iT邦幫忙

2025 iThome 鐵人賽

DAY 12
0

前言

先前我們用 SageMaker 的基本觀念以及內建的一些算法,今天我們要來訓練自定義模型的部分

參考部分::
參考資料 - 算法選擇
參考資料 - 訓練的部分
官方的 Overview

關於訓練

sageMaker 提供的好處

  • 彈性擴展:根據需求自動調整運算資源
  • 成本優化:只為實際使用的訓練時間付費
  • 框架支援:支援 TensorFlow、PyTorch、scikit-learn 等主流框架
  • 分散式訓練:支援大規模分散式訓練任務
  • 實驗管理:自動追蹤和比較不同的訓練實驗

創建工作(traning job)

raining Job 是 SageMaker 中執行模型訓練的基本單位

  • 演算法規格:指定使用的框架和版本
  • 輸入資料配置:定義訓練資料的位置和格式
  • 輸出配置:指定模型儲存位置
  • 資源配置:設定運算實例類型和數量

Training Image(訓練映像)

SageMaker 提供預建的 Docker 映像,支援常見的機器學習框架:

  • TensorFlow
  • PyTorch
  • XGBoost
  • scikit-learn
  • Hugging Face

開始訓練

step 1 : 準備訓練腳本 -> 創建腳本 tran.py

import argparse
import json
import logging
import os
import sys
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import boto3

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

# 定義簡單的 CNN 模型
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(64 * 6 * 6, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

def _get_train_data_loader(batch_size, training_dir):
    logger.info("Get train data loader")
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    dataset = datasets.ImageFolder(training_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

def _get_test_data_loader(test_batch_size, training_dir):
    logger.info("Get test data loader")
    transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    dataset = datasets.ImageFolder(training_dir, transform=transform)
    return DataLoader(dataset, batch_size=test_batch_size, shuffle=False)

def train(args):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info("Device Type: {}".format(device))

    # 載入資料
    train_loader = _get_train_data_loader(args.batch_size, args.data_dir)
    test_loader = _get_test_data_loader(args.test_batch_size, args.test_dir)

    # 初始化模型
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    # 訓練循環
    for epoch in range(1, args.epochs + 1):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            
            if batch_idx % args.log_interval == 0:
                logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))

        # 驗證
        test(model, device, test_loader)

    # 保存模型
    save_model(model, args.model_dir)

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    logger.info('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

def save_model(model, model_dir):
    logger.info("Saving the model.")
    path = os.path.join(model_dir, 'model.pth')
    torch.save(model.cpu().state_dict(), path)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # SageMaker 環境變數
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                        help='how many batches to wait before logging training status')
    
    # Container environment
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--test-dir', type=str, default=os.environ['SM_CHANNEL_TESTING'])

    args = parser.parse_args()

    train(args)

step 2 : 設定 sageMaker 訓練作業

import boto3
import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.inputs import TrainingInput

# 初始化 SageMaker session
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

# 定義資料位置
train_input = TrainingInput(
    s3_data='s3://{}/training-data/'.format(bucket),
    content_type='application/x-image'
)

test_input = TrainingInput(
    s3_data='s3://{}/test-data/'.format(bucket),
    content_type='application/x-image'
)

# 創建 PyTorch estimator
pytorch_estimator = PyTorch(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    framework_version='1.12.0',
    py_version='py38',
    hyperparameters={
        'epochs': 15,
        'batch-size': 32,
        'learning-rate': 0.001,
        'momentum': 0.9
    }
)

# 開始訓練
pytorch_estimator.fit({
    'training': train_input,
    'testing': test_input
})

step 3 : 監控訓練進度

# 取得訓練作業狀態
training_job_name = pytorch_estimator.latest_training_job.job_name
print(f"Training job name: {training_job_name}")

# 查看訓練指標
from sagemaker.analytics import TrainingJobAnalytics

analytics = TrainingJobAnalytics(training_job_name)
df = analytics.dataframe()
print(df.head())

# 即時監控訓練日誌
pytorch_estimator.logs()

其他補充

  1. 分散式訓練
pytorch_estimator = PyTorch(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_type='ml.p3.8xlarge',
    instance_count=4,  # 使用 4 個實例
    distribution={'parameter_server': {'enabled': True}},
    framework_version='1.12.0',
    py_version='py38'
)
  1. 節約成本 - Spot Instance
pytorch_estimator = PyTorch(
    entry_point='train.py',
    source_dir='.',
    role=role,
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    use_spot_instances=True,
    max_wait=86400,  # 24小時
    max_run=7200,    # 2小時
    checkpoint_s3_uri='s3://{}/checkpoints/'.format(bucket),
    framework_version='1.12.0',
    py_version='py38'
)
  1. 自動模型調參(Hyperparameter tuning)
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

# 定義超參數範圍
hyperparameter_ranges = {
    'lr': ContinuousParameter(0.001, 0.1),
    'batch-size': CategoricalParameter([16, 32, 64, 128]),
    'epochs': IntegerParameter(10, 50)
}

# 設定目標指標
objective_metric_name = 'accuracy'
objective_type = 'Maximize'

# 創建調參器
tuner = HyperparameterTuner(
    pytorch_estimator,
    objective_metric_name,
    hyperparameter_ranges,
    objective_type=objective_type,
    max_jobs=20,
    max_parallel_jobs=3
)

# 開始調參
tuner.fit({
    'training': train_input,
    'testing': test_input
})

幾個常見問題
Q1 : 記憶體不足
A:

# 解決方案:調整批次大小或使用梯度累積
parser.add_argument('--gradient-accumulation-steps', type=int, default=1)

Q2 : 訓練速度慢
A:

  • 使用更強大的 GPU 實例(如 ml.p4d.24xlarge)
  • 實施資料並行處理
  • 優化資料載入管道

Q3: 成本控制

# 設定最大訓練時間
pytorch_estimator = PyTorch(
    # ... 其他參數
    max_run=3600,  # 1小時後自動停止
)

今天就探討到這


上一篇
SageMaker Data Wrangler:資料預處理實戰
系列文
從零開始的AWS AI之路:用Bedrock與SageMaker打造智慧應用的30天實戰12
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言