In the previous posts we covered the basics of SageMaker and a few of its built-in algorithms. Today we'll look at training a custom model.
Why SageMaker helps here
SageMaker provides pre-built Docker images for the common machine learning frameworks (PyTorch, TensorFlow, MXNet, Scikit-learn, XGBoost, Hugging Face), so in most cases you only need to supply a training script rather than build your own container.
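If you're curious which container image a framework/version pair resolves to, the SDK can look it up. A quick sketch (the region and version here are just examples):

from sagemaker import image_uris

# Resolve the pre-built PyTorch training image for a given region and version
uri = image_uris.retrieve(
    framework='pytorch',
    region='us-east-1',
    version='1.12.0',
    py_version='py38',
    instance_type='ml.p3.2xlarge',
    image_scope='training',
)
print(uri)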
Step 1: Prepare the training script -> create train.py
import argparse
import json
import logging
import os
import sys
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
# Define a simple CNN model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        # Plain Dropout here: dropout2 is applied to the flattened features
        self.dropout2 = nn.Dropout(0.5)
        # 32x32 input -> conv1 -> 30x30 -> conv2 -> 28x28 -> maxpool -> 14x14
        self.fc1 = nn.Linear(64 * 14 * 14, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
def _get_train_data_loader(batch_size, training_dir):
    logger.info("Get train data loader")
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    dataset = datasets.ImageFolder(training_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)
def _get_test_data_loader(test_batch_size, test_dir):
    logger.info("Get test data loader")
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    dataset = datasets.ImageFolder(test_dir, transform=transform)
    return DataLoader(dataset, batch_size=test_batch_size, shuffle=False)
def train(args):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info("Device Type: {}".format(device))

    # Load the data
    train_loader = _get_train_data_loader(args.batch_size, args.data_dir)
    test_loader = _get_test_data_loader(args.test_batch_size, args.test_dir)

    # Initialize the model
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    # Training loop
    for epoch in range(1, args.epochs + 1):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % args.log_interval == 0:
                logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))
        # Evaluate after each epoch
        test(model, device, test_loader)

    # Save the final model
    save_model(model, args.model_dir)
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    logger.info('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
def save_model(model, model_dir):
    logger.info("Saving the model.")
    path = os.path.join(model_dir, 'model.pth')
    torch.save(model.cpu().state_dict(), path)
if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters (SageMaker passes these in as command-line arguments)
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                        help='how many batches to wait before logging training status')

    # Container environment
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--test-dir', type=str, default=os.environ['SM_CHANNEL_TESTING'])

    args = parser.parse_args()
    train(args)
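Before paying for a training instance, it's worth smoke-testing the script locally. A minimal sketch, assuming image folders exist under ./data/train and ./data/test (hypothetical paths):

import os
import subprocess

# Mimic the environment variables SageMaker injects into the training container
env = dict(os.environ,
           SM_MODEL_DIR='/tmp/model',
           SM_CHANNEL_TRAINING='./data/train',
           SM_CHANNEL_TESTING='./data/test')
os.makedirs('/tmp/model', exist_ok=True)

# One short epoch is enough to surface shape or path errors
subprocess.run(['python', 'train.py', '--epochs', '1', '--batch-size', '8'],
               env=env, check=True)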
Step 2: Configure the SageMaker training job
import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.inputs import TrainingInput
# Initialize the SageMaker session
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
# Define the data locations
train_input = TrainingInput(
    s3_data='s3://{}/training-data/'.format(bucket),
    content_type='application/x-image'
)
test_input = TrainingInput(
    s3_data='s3://{}/test-data/'.format(bucket),
    content_type='application/x-image'
)
# Create the PyTorch estimator
pytorch_estimator = PyTorch(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    framework_version='1.12.0',
    py_version='py38',
    hyperparameters={
        'epochs': 15,
        'batch-size': 32,
        'lr': 0.001,  # key must match the argparse flag in train.py ('--lr')
        'momentum': 0.9
    }
)
# Launch the training job
pytorch_estimator.fit({
    'training': train_input,
    'testing': test_input
})
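The dict keys passed to fit() ('training' and 'testing') become the channel names, which is exactly what SM_CHANNEL_TRAINING and SM_CHANNEL_TESTING refer to inside train.py. Once the job finishes, the packaged artifact can be found via the estimator:

# S3 URI of the model.tar.gz containing what save_model() wrote
print(pytorch_estimator.model_data)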
Step 3: Monitor training progress
# Get the name of the latest training job
training_job_name = pytorch_estimator.latest_training_job.job_name
print(f"Training job name: {training_job_name}")
# Inspect training metrics
from sagemaker.analytics import TrainingJobAnalytics
analytics = TrainingJobAnalytics(training_job_name)
df = analytics.dataframe()
print(df.head())
# Stream the training logs in real time
pytorch_estimator.logs()
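Beyond the SDK helpers, the raw job description from the SageMaker API is handy for checking status or billing. A small boto3 sketch:

import boto3

sm_client = boto3.client('sagemaker')
desc = sm_client.describe_training_job(TrainingJobName=training_job_name)
print(desc['TrainingJobStatus'], desc['SecondaryStatus'])
print('Billable seconds:', desc.get('BillableTimeInSeconds'))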
Distributed training - for larger datasets you can scale out across several instances. Note that 'parameter_server' is a TensorFlow/MXNet distribution option; for a PyTorch estimator (1.12+) use a PyTorch-native setting such as pytorchddp:

pytorch_estimator = PyTorch(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_type='ml.p3.8xlarge',
    instance_count=4,  # scale out to 4 instances
    distribution={'pytorchddp': {'enabled': True}},
    framework_version='1.12.0',
    py_version='py38'
)
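Note that instance_count alone doesn't make train.py distributed; the script itself has to join a process group. A minimal sketch of the extra wiring (a hypothetical addition to train.py, assuming the launcher exports the standard torchrun variables such as WORLD_SIZE):

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def wrap_model_for_ddp(model, device):
    # Only initialize distributed mode when launched with multiple workers
    if int(os.environ.get('WORLD_SIZE', '1')) > 1:
        backend = 'nccl' if torch.cuda.is_available() else 'gloo'
        dist.init_process_group(backend=backend)
        return DDP(model.to(device))
    return model.to(device)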
Cost saving - Spot instances can cut the training bill substantially; pair them with checkpointing so an interrupted job can resume:

pytorch_estimator = PyTorch(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    use_spot_instances=True,
    max_wait=86400,  # wait up to 24 hours in total, including Spot interruptions
    max_run=7200,    # at most 2 hours of actual training time
    checkpoint_s3_uri='s3://{}/checkpoints/'.format(bucket),
    framework_version='1.12.0',
    py_version='py38'
)
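One caveat: checkpoint_s3_uri only tells SageMaker where to sync checkpoints from; train.py still has to write them. A minimal sketch, assuming the default local checkpoint path /opt/ml/checkpoints:

import os
import torch

CHECKPOINT_DIR = '/opt/ml/checkpoints'  # contents get synced to checkpoint_s3_uri

def save_checkpoint(model, optimizer, epoch):
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, os.path.join(CHECKPOINT_DIR, 'checkpoint.pth'))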
Hyperparameter tuning - SageMaker can also search the hyperparameter space for you:

from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

# Define the hyperparameter ranges
hyperparameter_ranges = {
    'lr': ContinuousParameter(0.001, 0.1),
    'batch-size': CategoricalParameter([16, 32, 64, 128]),
    'epochs': IntegerParameter(10, 50)
}

# Set the objective metric; for script mode, SageMaker extracts the metric
# from the training logs, so a metric definition with a regex is required
objective_metric_name = 'accuracy'
objective_type = 'Maximize'
metric_definitions = [{
    'Name': 'accuracy',
    # Matches the "Accuracy: x/y (z%)" line that test() logs
    'Regex': r'Accuracy: \d+/\d+ \((\d+\.?\d*)%\)'
}]

# Create the tuner
tuner = HyperparameterTuner(
    pytorch_estimator,
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions=metric_definitions,
    objective_type=objective_type,
    max_jobs=20,
    max_parallel_jobs=3
)

# Launch the tuning job
tuner.fit({
    'training': train_input,
    'testing': test_input
})
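When tuning completes, you can pull out the winner and compare all the trials:

from sagemaker.analytics import HyperparameterTuningJobAnalytics

# Name of the best-performing training job
print(tuner.best_training_job())

# Per-trial hyperparameters and objective values as a DataFrame
results = HyperparameterTuningJobAnalytics(tuner.latest_tuning_job.name)
print(results.dataframe().sort_values('FinalObjectiveValue', ascending=False).head())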
A few common issues

Q1: Out of memory
A:

# Workaround: lower the batch size, or accumulate gradients over several batches
parser.add_argument('--gradient-accumulation-steps', type=int, default=1)
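The flag by itself does nothing; the training loop has to use it. A sketch of how the loop in train() could be adapted (a hypothetical modification, not part of the script above):

accumulation_steps = args.gradient_accumulation_steps

optimizer.zero_grad()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    # Scale the loss so the accumulated gradient matches one large batch
    loss = F.nll_loss(output, target) / accumulation_steps
    loss.backward()
    if (batch_idx + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()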
Q2: Training is slow
A: First confirm the job is actually running on a GPU instance and that the GPU is being used; after that, data loading is the usual bottleneck, so parallelize the DataLoader (see the sketch below) or scale out with the distributed setup shown earlier.
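A common quick win is parallelizing the input pipeline. A sketch of an adjusted loader (make_train_loader is a hypothetical variant of _get_train_data_loader above):

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

def make_train_loader(batch_size, training_dir, num_workers=4):
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    dataset = datasets.ImageFolder(training_dir, transform=transform)
    # num_workers parallelizes decoding/augmentation; pin_memory speeds up
    # host-to-GPU copies. Tune num_workers to the instance's vCPU count.
    return DataLoader(dataset, batch_size=batch_size, shuffle=True,
                      num_workers=num_workers, pin_memory=True)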
Q3: Cost control
A:

# Set a hard cap on training time
pytorch_estimator = PyTorch(
    # ... other parameters
    max_run=3600,  # stop automatically after 1 hour
)
That's it for today.