[3년차 텐서플로우 이용자의 pytorch 사용기] 커스텀 데이터 셋 사용하기
이전에 졸업 작품과 다른 연구에서 Eelectra 모델과 BERT를 이용하려고 pytorch를 사용했던 적이 있었는데
그때는 단일 모델에 CSV에 저장된 데이터를 사용했기 때문에 큰 어려운은 없었다
하지만 이번에 연구를 위해 CNN-LSTM 기반 모델을 사용하는데 데이터를 메모리에 로드하는 과정이 추가된것
코드를 간단하게 작성할 수 있는 파이토치 라이트닝을 사용할 예정이다.
일반 파이토치는 훈련과정을 에포크 for문을 통해 작성을해야해서 여간 귀찮은게 아닌...
일단 데이터셋 클래스를 작성해준다.
import numpy as np
from keras.preprocessing.image import img_to_array, load_img
from keras.applications.mobilenet_v2 import preprocess_input
from torch.utils.data import Dataset
class CustomDataset(Dataset):
def __init__(self, image_paths, tfidf_paths, labels):
super(CustomDataset, self).__init__()
self.image_paths = image_paths
self.tfidf_paths = tfidf_paths
self.labels = labels
def __len__(self):
return len(self.image_paths)
# 로컬에 저장된 이미지와 TFIDF로 변환된 데이터를 가져온다.
def _load_image(self, img_path):
# load image from path and convert to array
img = load_img(img_path, color_mode="grayscale")
st_data = img_to_array(img)
img = preprocess_input(st_data)
return img
def _load_tfidf(self, tfidf_path):
data = np.load(tfidf_path)
return data
def __getitem__(self, idx):
# file 경로
img = self._load_image(self.image_paths[idx])
tfidf = self._load_tfidf(self.tfidf_paths[idx])
# label 생성
lbl = self.labels[idx]
# image, label return
return img.astype(np.float32), tfidf.astype(np.float32), lbl
이제 모델을 작성한다.
class MalClassifierForFSL(pl.LightningModule):
def __init__(self, hyper_parameter: dict):
super().__init__()
#파라미터
self.MAX_LENGTH = hyper_parameter["max_length"] if ("max_length" in hyper_parameter) else 400
self.LEARNING_RATE = hyper_parameter["lr"] if ("lr" in hyper_parameter) else 1e-6
self.EPOCHS = hyper_parameter["epochs"] if ("epochs" in hyper_parameter) else 20
self.OPTIMIZER = hyper_parameter["optimizer"] if ("optimizer" in hyper_parameter) else "adamw"
self.GAMMA = hyper_parameter["gamma"] if ("gamma" in hyper_parameter) else 0.5
self.BATCH_SIZE = hyper_parameter["batch_size"] if ("batch_size" in hyper_parameter) else 32
self.DATAFRAME = hyper_parameter["DataFrame"] if ("DataFrame" in hyper_parameter) else pd.read_csv('./data_label_withBenign.csv')
self.CLASS_NUM = hyper_parameter["class_num"] if ("class_num" in hyper_parameter) else 5
self.STATIC_PATH = hyper_parameter["static_path"] if ("static_path" in hyper_parameter) else "../Integrated System/Data/prepro/datanumpy/images/"
self.DYNAMIC_PATH = hyper_parameter["dynamic_path"] if ("dynamic_path" in hyper_parameter) else "../Integrated System/Data/prepro/datanumpy/tfidf/"
self.IMG_TYPE = hyper_parameter["img_type"] if ("img_type" in hyper_parameter) else "orig"
# 데이터 셋
self._traindata = self.DATAFRAME.loc[ self.DATAFRAME['train']=='train']
self._valdata = self.DATAFRAME.loc[ self.DATAFRAME['train']=='val']
self._testdata = self.DATAFRAME.loc[ self.DATAFRAME['train']=='test']
self.CNN_STATIC = torch.nn.Sequential(
torch.nn.Conv2d(256, 32, kernel_size=3, stride=1, padding=2),
torch.nn.ReLU(),
torch.nn.MaxPool2d(kernel_size=2),
torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=2),
torch.nn.ReLU(),
torch.nn.MaxPool2d(kernel_size=2),
torch.nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=2),
torch.nn.ReLU(),
torch.nn.MaxPool2d(kernel_size=2),
)
self.LSTM_DYNAMIC = torch.nn.Sequential(
torch.nn.LSTM(input_size=self.MAX_LENGTH, hidden_size=120,
num_layers=2, batch_first=True, bidirectional=True),
)
self.relu = torch.nn.ReLU()
self.drop = torch.nn.Dropout(p=0.5)
self.d2 = torch.nn.Linear(4464, 256)
# self.d3 = torch.nn.Linear(4000, 256)
self.d4 = torch.nn.Linear(256, 128)
self.d5 = torch.nn.Linear(128, 64)
# Fully connected 1 (readout)
self.d6 = torch.nn.Linear(64, self.CLASS_NUM)
self.softmax = torch.nn.Softmax(dim=1)
self.lossF = torch.nn.CrossEntropyLoss()
def forward(self, dy, st):
x2,_ = self.LSTM_DYNAMIC(dy)
x2 = x2.reshape(x2.size(0), -1)
x1 = self.CNN_STATIC(st)
x1 = x1.reshape(x1.size(0), -1)
# Concatenate in dim1 (feature dimension)
out = torch.cat((x1, x2), 1)
out = self.drop(out)
out = self.d2(out)
# out = self.relu(out)
# out = self.d3(out)
out = self.relu(out)
out = self.d4(out)
out = self.relu(out)
out = self.d5(out)
out = self.relu(out)
out = self.d6(out)
out = self.softmax(out)
return out
모델의 파라미터는 기존에 사용하던것을 가져왔다.
이미지는 CNN에서 TFIDF는 LSTM에서 특징을 추출하고 Fullyconnected Layer을통해 클래스를 추출하는 형태의 모델이다.
그 다음은 위에서 작성했던 데이터셋 클래스를 사용하는 데이터 로더 부분이다.
def __dataloader(self, image_paths, tfidf_paths, class_labels, shuffle: bool = False):
dataset = CustomDataset(image_paths, tfidf_paths, class_labels)
return DataLoader(
dataset,
shuffle=shuffle,
batch_size=self.BATCH_SIZE
)
def train_dataloader(self):
self.image_paths, self.tfidf_paths, self.class_labels = self.__preprocessing(self._traindata)
return self.__dataloader(self.image_paths, self.tfidf_paths, self.class_labels, True)
def val_dataloader(self):
self.image_paths, self.tfidf_paths, self.class_labels = self.__preprocessing(self._valdata)
return self.__dataloader(self.image_paths, self.tfidf_paths, self.class_labels)
def test_dataloader(self):
self.image_paths, self.tfidf_paths, self.class_labels = self.__preprocessing(self._testdata)
return self.__dataloader(self.image_paths, self.tfidf_paths, self.class_labels)
훈련, 검증, 테스트 데이터에 대해 전처리를 진행하고 데이터로더를 통해 모델에 배치단위로 입력이 된다.
나중에 다시 사용할 때 편하려고 공통된 부분은 __dataloader를 통해 묶었다.
여기서 중요한 점은 로컬에 데이터가 클래스나 훈련, 검증, 테스트 데이터별로 분류가 되어있는것은 아니기 때문에
.CustomDataset 클래스에서 데이터를 로드하기 위해 데이터의 경로들을 가져오는 __preprocessing 과정이 필요하다.
def __preprocessing(self, data_list):
self.suffled_idx = np.arange(len(data_list))
np.random.shuffle(self.suffled_idx)
self.image_paths = []
self.tfidf_paths = []
self.class_labels = []
for idx, i in data_list.iterrows():
datanumpy_tfidf = str(i['hash'])+'.npy'
datanumpy_img = str(i['hash'])+'.png'
self.image_paths += [os.path.join(self.STATIC_PATH, self.IMG_TYPE, datanumpy_img)]
self.tfidf_paths += [os.path.join(self.DYNAMIC_PATH, str(self.MAX_LENGTH), datanumpy_tfidf)]
self.class_labels += [i['label']]
self.image_paths = np.array(self.image_paths)
self.tfidf_paths = np.array(self.tfidf_paths)
self.class_labels = np.array(self.class_labels)
st = self.image_paths[self.suffled_idx[:]]
dy = self.tfidf_paths[self.suffled_idx[:]]
labels = self.class_labels[self.suffled_idx[:]]
return st, dy, labels
데이터 프레임에 입력된 파일의 이름을 경로와 연결애 paths 리스트에 넣어주고
데이터를 섞어준다. 셔플과정이 없으면 훈련할 때 가중치 업데이트를 제대로 못하는것 같아서 넣어 주었다.
물론 Dataloader에서 shuffle이라는 파라미터가 있지만 왜인지 제대로 안되어서 따로 셔플과정을 추가했다.
다음은 훈련과정인 step이다.
def __step(self, batch, batch_idx, state="train"):
image_paths, tfidf_paths, class_labels = batch
output = self.forward(dy=tfidf_paths, st=image_paths)
logits = output
loss = self.lossF(logits, class_labels)
preds = logits.argmax(dim=-1)
logs = {'train_loss': loss}
self.log_dict({'loss': loss, 'acc': accuracy_score(class_labels, preds)}, on_step=True, on_epoch=True, prog_bar=True, logger=True)
if state == "train":
return loss
elif state == "val":
return {"loss": loss, "y_true": class_labels, "y_pred": preds}
elif state == "test":
return {"loss": loss, "y_true": class_labels, "y_pred": preds}
def training_step(self, batch, batch_idx):
return self.__step(batch, batch_idx, state="train")
def validation_step(self, batch, batch_idx):
return self.__step(batch, batch_idx, state="val")
def test_step(self, batch, batch_idx):
return self.__step(batch, batch_idx, state="test")
훈련과정을 위해 self.log_dict를 통해 텐서보드에서 사용할 훈련 로그를 기록해준다.
다음은 훈련이 종료될 때마다 호출되는 메서드이다.
def __epoch_end(self, outputs, state="train"):
loss = torch.tensor(0, dtype=torch.float64)
y_true, y_pred = [], []
if state == "train":
loss = outputs / len(outputs)
return loss
elif state == "val":
for i in outputs:
loss += i["loss"].cpu().detach()
y_true += i["y_true"]
y_pred += i["y_pred"]
loss = loss / len(outputs)
acc = accuracy_score(y_true, y_pred)
return {"loss": loss, "acc": acc}
elif state == "test":
for i in outputs:
loss += i["loss"].cpu().detach()
y_true += i["y_true"]
y_pred += i["y_pred"]
loss = loss / len(outputs)
acc = accuracy_score(y_true, y_pred)
return {"loss": loss, "acc": acc}
def _on_train_epoch_end(self, outputs):
self.__epoch_end(outputs, state="train")
def _on_validation_epoch_end(self, outputs):
self.__epoch_end(outputs, state="val")
def _on_test_epoch_end(self, outputs):
self.__epoch_end(outputs, state="test")
def configure_optimizers(self):
if self.OPTIMIZER == "adam":
optimizer = Adam(self.parameters(), lr=self.LEARNING_RATE)
elif self.OPTIMIZER == "adamw":
optimizer = AdamW(self.parameters(), lr=self.LEARNING_RATE)
elif self.OPTIMIZER == "sgd":
optimizer = SGD(self.parameters(), lr=self.LEARNING_RATE)
else:
raise NotImplementedError(f"'{self.OPTIMIZER}' is not available.")
scheduler = ExponentialLR(optimizer, gamma=self.GAMMA)
return {
"optimizer": optimizer,
"scheduler": scheduler
}
model = MalClassifierForFSL(hyper_parameter)
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
model = model.to(device)
위 코드를 통해 모델을 불러오고
훈련 로그를 기록해야하니 텐서보드로거를 이용하자
from pytorch_lightning.loggers import TensorBoardLogger
logger = TensorBoardLogger(save_dir='logs')
trainer = pl.Trainer(
# callbacks=[checkpoint_callback],
max_epochs=model.EPOCHS,
deterministic=torch.cuda.is_available(),
logger=logger,
log_every_n_steps=1,
# reload_dataloaders_every_n_epochs=1,
# devices=[0] if torch.cuda.is_available() else None,
#tpu_cores=1
)
트레이너를 선언하는데 주석처리 되어있는 파라미터들은 필요에 따라 추가하면 된다. callback은 조건에 따라 훈련 조기 종료, 에포크마다 모델 저장 등을 할 수 있고
reload_dataloaders_every_n_epochs는 N번째 에포크마다 데이터 로더를 재실행 해주는 것이다.
가끔씩 셔플을 추가했음에도 가중치 업데이트가 안되고 정확도가 변화안할 때 훈련 로그를 확인하면 에포크 마다 똑같은 loss, 정확도의 경향이 매치마다 똑같은 걸 볼 수 있는데 그때 이 파라미터를 추가해주면 해결할 수 있다.
devices, tpu_cores는 GPU나 코랩의 TPU를 사용할 때 추가하면 되는 파라미터이다.
trainer.fit(model=model)
trainer.test(model)
두 코드를 통해 훈련과 최종 테스트를 진행하면
훈련 정확도와 테스트 정확도가 비슷한것을 볼 수 있다 정확도가 낮은것은 테스트를 위해 데이터의 일부만 사용했기 때문이다.
텐서보드에서 훈련 과정을 확인해보자
파이토치를 오랜만에 다뤄본 느낌은
텐서플로우는 체계화가 잘되어있어서 쉽게 사용할 수 있는것 같고
파이토치는 어렵지만 커스텀을 하기에 용이해보인다.
사실 텐서플로우도 커스텀이 가능하고 파이토치도 더 쉽게 사용할 수 있어서
그냥 취향과 주로 사용하는 분야의 차이인것 같다.
자연어 처리나 LLM을 사용할거라면 필수이긴 하다.