공부/그 외

[3년차 텐서플로우 이용자의 pytorch 사용기] 커스텀 데이터 셋 사용하기

비랑이 2024. 7. 4. 18:23

이전에 졸업 작품과 다른 연구에서 Eelectra 모델과 BERT를 이용하려고 pytorch를 사용했던 적이 있었는데

 

그때는 단일 모델에 CSV에 저장된 데이터를 사용했기 때문에 큰 어려운은 없었다

 

하지만 이번에 연구를 위해 CNN-LSTM 기반 모델을 사용하는데 데이터를 메모리에 로드하는 과정이 추가된것

 

코드를 간단하게 작성할 수 있는 파이토치 라이트닝을 사용할 예정이다. 

 

일반 파이토치는 훈련과정을 에포크 for문을 통해 작성을해야해서 여간 귀찮은게 아닌... 

 

일단 데이터셋 클래스를 작성해준다.

import numpy as np
from keras.preprocessing.image import img_to_array, load_img
from keras.applications.mobilenet_v2 import preprocess_input
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, image_paths, tfidf_paths, labels):
        super(CustomDataset, self).__init__()
        self.image_paths = image_paths
        self.tfidf_paths = tfidf_paths
        self.labels = labels
    
    def __len__(self):
        return len(self.image_paths)
    
    # 로컬에 저장된 이미지와 TFIDF로 변환된 데이터를 가져온다.
    def _load_image(self, img_path):
        # load image from path and convert to array
        img = load_img(img_path, color_mode="grayscale")
        st_data = img_to_array(img)
        img = preprocess_input(st_data)

        return img

    def _load_tfidf(self, tfidf_path):
        data = np.load(tfidf_path)

        return data
    
    def __getitem__(self, idx):
        # file 경로
        img = self._load_image(self.image_paths[idx])
        tfidf = self._load_tfidf(self.tfidf_paths[idx])
        # label 생성
        lbl = self.labels[idx]
        # image, label return
        return  img.astype(np.float32), tfidf.astype(np.float32), lbl

 

이제 모델을 작성한다.

 

class MalClassifierForFSL(pl.LightningModule):
  def __init__(self, hyper_parameter: dict):
    super().__init__()
    
    #파라미터
    self.MAX_LENGTH = hyper_parameter["max_length"] if ("max_length" in hyper_parameter) else 400
    self.LEARNING_RATE = hyper_parameter["lr"] if ("lr" in hyper_parameter) else 1e-6
    self.EPOCHS = hyper_parameter["epochs"] if ("epochs" in hyper_parameter) else 20
    self.OPTIMIZER = hyper_parameter["optimizer"] if ("optimizer" in hyper_parameter) else "adamw"
    self.GAMMA = hyper_parameter["gamma"] if ("gamma" in hyper_parameter) else 0.5
    self.BATCH_SIZE = hyper_parameter["batch_size"] if ("batch_size" in hyper_parameter) else 32
    self.DATAFRAME = hyper_parameter["DataFrame"] if ("DataFrame" in hyper_parameter) else pd.read_csv('./data_label_withBenign.csv')
    self.CLASS_NUM = hyper_parameter["class_num"] if ("class_num" in hyper_parameter) else 5
    self.STATIC_PATH = hyper_parameter["static_path"] if ("static_path" in hyper_parameter) else "../Integrated System/Data/prepro/datanumpy/images/"
    self.DYNAMIC_PATH = hyper_parameter["dynamic_path"] if ("dynamic_path" in hyper_parameter) else "../Integrated System/Data/prepro/datanumpy/tfidf/"
    self.IMG_TYPE = hyper_parameter["img_type"] if ("img_type" in hyper_parameter) else "orig"
    
    # 데이터 셋
    self._traindata =  self.DATAFRAME.loc[ self.DATAFRAME['train']=='train']
    self._valdata =  self.DATAFRAME.loc[ self.DATAFRAME['train']=='val']
    self._testdata =  self.DATAFRAME.loc[ self.DATAFRAME['train']=='test']    

    self.CNN_STATIC = torch.nn.Sequential(
        torch.nn.Conv2d(256, 32, kernel_size=3, stride=1, padding=2),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(kernel_size=2),
        torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=2),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(kernel_size=2),
        torch.nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=2),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(kernel_size=2),
    )
    
    self.LSTM_DYNAMIC = torch.nn.Sequential(
        torch.nn.LSTM(input_size=self.MAX_LENGTH, hidden_size=120,
                      num_layers=2, batch_first=True, bidirectional=True),
    )
    
    self.relu = torch.nn.ReLU()
    self.drop = torch.nn.Dropout(p=0.5)

    self.d2 = torch.nn.Linear(4464, 256)
    # self.d3 = torch.nn.Linear(4000, 256)
    self.d4 = torch.nn.Linear(256, 128)
    self.d5 = torch.nn.Linear(128, 64)
    # Fully connected 1 (readout)
    self.d6 = torch.nn.Linear(64, self.CLASS_NUM)
    self.softmax = torch.nn.Softmax(dim=1)
    self.lossF = torch.nn.CrossEntropyLoss()
        
        
        
  def forward(self, dy, st):
    x2,_ = self.LSTM_DYNAMIC(dy)
    x2 = x2.reshape(x2.size(0), -1)
    x1 = self.CNN_STATIC(st)
    x1 = x1.reshape(x1.size(0), -1)

    # Concatenate in dim1 (feature dimension)
    out = torch.cat((x1, x2), 1)
    out = self.drop(out)
    out = self.d2(out)
    # out = self.relu(out)
    # out = self.d3(out)
    out = self.relu(out)
    out = self.d4(out)
    out = self.relu(out)
    out = self.d5(out)
    out = self.relu(out)
    out = self.d6(out)
    out = self.softmax(out)
    return out

모델의 파라미터는 기존에 사용하던것을 가져왔다. 

 

이미지는 CNN에서 TFIDF는 LSTM에서 특징을 추출하고 Fullyconnected Layer을통해 클래스를 추출하는 형태의 모델이다.

 

그 다음은 위에서 작성했던 데이터셋 클래스를 사용하는 데이터 로더 부분이다. 

 

def __dataloader(self, image_paths, tfidf_paths, class_labels, shuffle: bool = False):
    dataset = CustomDataset(image_paths, tfidf_paths, class_labels)
    return DataLoader(
        dataset,
        shuffle=shuffle,
        batch_size=self.BATCH_SIZE
    )

def train_dataloader(self):
    self.image_paths, self.tfidf_paths, self.class_labels = self.__preprocessing(self._traindata)
    return self.__dataloader(self.image_paths, self.tfidf_paths, self.class_labels, True)

def val_dataloader(self):
    self.image_paths, self.tfidf_paths, self.class_labels = self.__preprocessing(self._valdata)
    return self.__dataloader(self.image_paths, self.tfidf_paths, self.class_labels)

def test_dataloader(self):
    self.image_paths, self.tfidf_paths, self.class_labels = self.__preprocessing(self._testdata)
    return self.__dataloader(self.image_paths, self.tfidf_paths, self.class_labels)

 

훈련, 검증, 테스트 데이터에 대해 전처리를 진행하고 데이터로더를 통해 모델에 배치단위로 입력이 된다.

 

나중에 다시 사용할 때 편하려고 공통된 부분은 __dataloader를 통해 묶었다.

 

여기서 중요한 점은 로컬에 데이터가 클래스나 훈련, 검증, 테스트 데이터별로 분류가 되어있는것은 아니기 때문에

.CustomDataset 클래스에서 데이터를 로드하기 위해 데이터의 경로들을 가져오는 __preprocessing 과정이 필요하다.

 

def __preprocessing(self, data_list):
    self.suffled_idx = np.arange(len(data_list))
    np.random.shuffle(self.suffled_idx)
    self.image_paths = []
    self.tfidf_paths = []
    self.class_labels = []
    for idx, i in data_list.iterrows():
        datanumpy_tfidf = str(i['hash'])+'.npy'
        datanumpy_img = str(i['hash'])+'.png'
        self.image_paths += [os.path.join(self.STATIC_PATH, self.IMG_TYPE, datanumpy_img)]
        self.tfidf_paths += [os.path.join(self.DYNAMIC_PATH, str(self.MAX_LENGTH), datanumpy_tfidf)]
        
        self.class_labels += [i['label']]
    self.image_paths = np.array(self.image_paths)
    self.tfidf_paths = np.array(self.tfidf_paths)
    self.class_labels = np.array(self.class_labels)

    st = self.image_paths[self.suffled_idx[:]]
    dy = self.tfidf_paths[self.suffled_idx[:]]
    labels = self.class_labels[self.suffled_idx[:]]
    return st, dy, labels

데이터 프레임에 입력된 파일의 이름을 경로와 연결애 paths 리스트에 넣어주고 

데이터를 섞어준다. 셔플과정이 없으면 훈련할 때 가중치 업데이트를 제대로 못하는것 같아서 넣어 주었다.

 

물론 Dataloader에서 shuffle이라는 파라미터가 있지만 왜인지 제대로 안되어서 따로 셔플과정을 추가했다.

 

다음은 훈련과정인 step이다.

def __step(self, batch, batch_idx, state="train"):
    image_paths, tfidf_paths, class_labels = batch
    output = self.forward(dy=tfidf_paths, st=image_paths)

    logits = output
    loss = self.lossF(logits, class_labels)

    preds = logits.argmax(dim=-1)
    
    logs = {'train_loss': loss}
    self.log_dict({'loss': loss, 'acc': accuracy_score(class_labels, preds)}, on_step=True, on_epoch=True, prog_bar=True, logger=True)
    if state == "train":
      return loss
    elif state == "val":
      return {"loss": loss, "y_true": class_labels, "y_pred": preds}
    elif state == "test":
      return {"loss": loss, "y_true": class_labels, "y_pred": preds}
  
  def training_step(self, batch, batch_idx):
    return self.__step(batch, batch_idx, state="train")

  def validation_step(self, batch, batch_idx):
    return self.__step(batch, batch_idx, state="val")

  def test_step(self, batch, batch_idx):
    return self.__step(batch, batch_idx, state="test")

훈련과정을 위해 self.log_dict를 통해 텐서보드에서 사용할 훈련 로그를 기록해준다.

 

다음은 훈련이 종료될 때마다 호출되는 메서드이다.

def __epoch_end(self, outputs, state="train"):
    loss = torch.tensor(0, dtype=torch.float64)
    y_true, y_pred = [], []

    if state == "train":
      loss = outputs / len(outputs)
      return loss

    elif state == "val":
      for i in outputs:
        loss += i["loss"].cpu().detach()
        y_true += i["y_true"]
        y_pred += i["y_pred"]

        loss = loss / len(outputs)
        acc = accuracy_score(y_true, y_pred)
      return {"loss": loss, "acc": acc}

    elif state == "test":
      for i in outputs:
        loss += i["loss"].cpu().detach()
        y_true += i["y_true"]
        y_pred += i["y_pred"]

        loss = loss / len(outputs)
        acc = accuracy_score(y_true, y_pred)
      return {"loss": loss, "acc": acc}
      
def _on_train_epoch_end(self, outputs):
	self.__epoch_end(outputs, state="train")

def _on_validation_epoch_end(self, outputs):
    self.__epoch_end(outputs, state="val")

def _on_test_epoch_end(self, outputs):
    self.__epoch_end(outputs, state="test")

 

def configure_optimizers(self):
    if self.OPTIMIZER == "adam":
        optimizer = Adam(self.parameters(), lr=self.LEARNING_RATE)
    elif self.OPTIMIZER == "adamw":
        optimizer = AdamW(self.parameters(), lr=self.LEARNING_RATE)
    elif self.OPTIMIZER == "sgd":
        optimizer = SGD(self.parameters(), lr=self.LEARNING_RATE)
    else:
        raise NotImplementedError(f"'{self.OPTIMIZER}' is not available.")

    scheduler = ExponentialLR(optimizer, gamma=self.GAMMA)

    return {
        "optimizer": optimizer,
        "scheduler": scheduler
    }

 

model = MalClassifierForFSL(hyper_parameter)

device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

model = model.to(device)

위 코드를 통해 모델을 불러오고

 

훈련 로그를 기록해야하니 텐서보드로거를 이용하자

from pytorch_lightning.loggers import TensorBoardLogger

logger = TensorBoardLogger(save_dir='logs')

 

trainer = pl.Trainer(
    # callbacks=[checkpoint_callback],
    max_epochs=model.EPOCHS,
    deterministic=torch.cuda.is_available(),
    logger=logger,
    log_every_n_steps=1,
    # reload_dataloaders_every_n_epochs=1,
    # devices=[0] if torch.cuda.is_available() else None,
    #tpu_cores=1
)

트레이너를 선언하는데 주석처리 되어있는 파라미터들은 필요에 따라 추가하면 된다. callback은 조건에 따라 훈련 조기 종료, 에포크마다 모델 저장 등을 할 수 있고

 

reload_dataloaders_every_n_epochs는 N번째 에포크마다 데이터 로더를 재실행 해주는 것이다. 

가끔씩 셔플을 추가했음에도 가중치 업데이트가 안되고 정확도가 변화안할 때 훈련 로그를 확인하면 에포크 마다 똑같은 loss, 정확도의 경향이 매치마다 똑같은 걸 볼 수 있는데 그때 이 파라미터를 추가해주면 해결할 수 있다.

devices, tpu_cores는 GPU나 코랩의 TPU를 사용할 때 추가하면 되는 파라미터이다.

 

 

trainer.fit(model=model)

trainer.test(model)

두 코드를 통해 훈련과 최종 테스트를 진행하면 

 

 

 

훈련 정확도와 테스트 정확도가 비슷한것을 볼 수 있다 정확도가 낮은것은 테스트를 위해 데이터의 일부만 사용했기 때문이다.

 

텐서보드에서 훈련 과정을 확인해보자

 

 

 

파이토치를 오랜만에 다뤄본 느낌은

텐서플로우는 체계화가 잘되어있어서 쉽게 사용할 수 있는것 같고

파이토치는 어렵지만 커스텀을 하기에 용이해보인다.

 

사실 텐서플로우도 커스텀이 가능하고 파이토치도 더 쉽게 사용할 수 있어서

그냥 취향과 주로 사용하는 분야의 차이인것 같다.

 

자연어 처리나 LLM을 사용할거라면 필수이긴 하다.