
2025 iThome 鐵人賽

DAY 10
Cloud Native

Emerging k8s workflow Flyte and MLOps series, part 10

Day 10: Annotating dataframe column types with StructuredDataset

Today covers the first half: one task generates the data and annotates its column types, and a second task downloads the serialized data back from S3 and splits it.

import flytekit as fl
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from typing import Annotated, Optional, Any, Tuple
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Column schema used to annotate the StructuredDataset: all features are floats, target is an int label
all_cols = fl.kwtypes(feature1=float, feature2=float, feature3=float, feature4=float, target=int)

class CustomDataset(Dataset):
    def __init__(self, features: pd.DataFrame, targets: pd.Series, transform: Optional[Any] = None) -> None:
        self.features: torch.Tensor = torch.FloatTensor(features.values)
        self.targets: torch.Tensor = torch.LongTensor(targets.values)
        self.transform: Optional[Any] = transform
    
    def __len__(self) -> int:
        return len(self.features)
    
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        feature: torch.Tensor = self.features[idx]
        target: torch.Tensor = self.targets[idx]
        
        if self.transform:
            feature = self.transform(feature)
            
        return feature, target
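
# Note: CustomDataset is not exercised in today's half of the pipeline. A minimal
# sketch (assuming the X_train / y_train frames produced by preprocess_data below)
# of how it would feed the already-imported DataLoader during training:
#
#     train_ds = CustomDataset(X_train, y_train)
#     train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
#     for batch_features, batch_targets in train_loader:
#         ...  # forward/backward pass goes in the next part of the series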

@fl.task()
def create_sample_data(n_samples: int = 1000) -> Annotated[fl.StructuredDataset, all_cols]:
    np.random.seed(42)
    feature1: np.ndarray = np.random.normal(0, 1, n_samples)
    feature2: np.ndarray = np.random.normal(0, 1.5, n_samples)
    feature3: np.ndarray = np.random.uniform(-2, 2, n_samples)
    feature4: np.ndarray = np.random.exponential(1, n_samples)
    
    target_score: np.ndarray = 0.5 * feature1 + 0.3 * feature2 - 0.2 * feature3 + 0.1 * feature4 + np.random.normal(0, 0.1, n_samples)
    target: np.ndarray = (target_score > np.median(target_score)).astype(int)
    
    df = pd.DataFrame({
        'feature1': feature1,
        'feature2': feature2,
        'feature3': feature3,
        'feature4': feature4,
        'target': target
    })
    # flytekit serializes the dataframe (parquet by default) and uploads it to the
    # configured blob store (S3 on a cluster) so the next task can read it back
    return fl.StructuredDataset(dataframe=df)

@fl.task()
def preprocess_data(df: Annotated[fl.StructuredDataset, all_cols]) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, StandardScaler]:
    # Download the serialized dataset from blob storage and materialize it as a pandas DataFrame
    df = df.open(pd.DataFrame).all()
    features: pd.DataFrame = df.drop('target', axis=1)
    targets: pd.Series = df['target']
    
    scaler: StandardScaler = StandardScaler()
    features_scaled: np.ndarray = scaler.fit_transform(features)
    features_df: pd.DataFrame = pd.DataFrame(features_scaled, columns=features.columns)
    
    X_train, X_test, y_train, y_test = train_test_split(
        features_df, targets, test_size=0.2, random_state=42, stratify=targets
    )
    
    return X_train, X_test, y_train, y_test, scaler
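
# Aside (a sketch, not part of the original pipeline): StructuredDataset also supports
# column subsetting. A downstream task may annotate only the columns it needs, and when
# the dataset is stored as parquet, flytekit reads back just those columns.
subset_cols = fl.kwtypes(feature1=float, target=int)

@fl.task()
def count_rows(sd: Annotated[fl.StructuredDataset, subset_cols]) -> int:
    # Only feature1 and target are materialized here
    return len(sd.open(pd.DataFrame).all())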

@fl.workflow()
def ml_wf(n_samples: int = 100):
    # Wire the two tasks together; keyword arguments are the safest way to pass
    # promises between tasks inside a Flyte workflow
    df = create_sample_data(n_samples=n_samples)
    x_train, x_test, y_train, y_test, scaler = preprocess_data(df=df)
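
To sanity-check the pipeline before registering it on a cluster, the workflow can be run locally with plain Python. A minimal sketch, assuming the code above is saved as day10.py (the filename is only illustrative):

if __name__ == "__main__":
    # Local execution: flytekit runs both tasks in-process and keeps the
    # StructuredDataset on the local filesystem instead of uploading it to S3
    ml_wf(n_samples=1000)

The same workflow can also be launched from the CLI with something like pyflyte run day10.py ml_wf --n_samples 1000; adding --remote sends it to the cluster, where the intermediate dataset actually lands in S3.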

Previous
Day 9: Flyte task parameter overrides
Next
Day 10: Retrieving k8s secrets in the workflow
Series
Emerging k8s workflow Flyte and MLOps (11 articles)