# TODO: This file implements the first half of the pipeline — generating the data
# and annotating the column types. A separate task will download the serialized
# data back from S3 and perform the split.
import flytekit as fl
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset
from typing import Annotated, Any, Optional, Tuple
# Column schema for the generated StructuredDataset. All four feature columns
# come from float-valued NumPy samplers (normal/uniform/exponential) in
# create_sample_data, so they must be declared float — the previous
# str/int declarations did not match the actual DataFrame dtypes.
# The binary classification target is an int (0/1).
all_cols = fl.kwtypes(feature1=float, feature2=float, feature3=float, feature4=float, target=int)
class CustomDataset(Dataset):
    """Torch dataset over a feature DataFrame and a target Series.

    Features are held as a float32 tensor and targets as an int64 tensor.
    An optional callable ``transform`` is applied to each feature row when
    the sample is fetched.
    """

    def __init__(self, features: pd.DataFrame, targets: pd.Series, transform: Optional[Any] = None) -> None:
        # Convert once up front so __getitem__ is a cheap tensor index.
        self.features: torch.Tensor = torch.FloatTensor(features.values)
        self.targets: torch.Tensor = torch.LongTensor(targets.values)
        self.transform: Optional[Any] = transform

    def __len__(self) -> int:
        # One sample per row of the original DataFrame.
        return self.features.shape[0]

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        sample, label = self.features[idx], self.targets[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample, label
@fl.task()
def create_sample_data(n_samples: int = 1000) -> Annotated[fl.StructuredDataset, all_cols]:
    """Generate a synthetic binary-classification dataset of ``n_samples`` rows.

    Four feature columns are drawn from fixed distributions; the target is 1
    when a linear combination of the features plus small Gaussian noise
    exceeds its median, which yields a balanced 0/1 label split.
    """
    np.random.seed(42)  # fixed seed: every run produces the identical dataset

    # NOTE: the sampler call order below must stay fixed — it determines the
    # exact values drawn from the seeded RNG.
    f1 = np.random.normal(0, 1, n_samples)
    f2 = np.random.normal(0, 1.5, n_samples)
    f3 = np.random.uniform(-2, 2, n_samples)
    f4 = np.random.exponential(1, n_samples)

    # Linear signal with noise; thresholding at the median balances the classes.
    score = 0.5 * f1 + 0.3 * f2 - 0.2 * f3 + 0.1 * f4 + np.random.normal(0, 0.1, n_samples)
    labels = (score > np.median(score)).astype(int)

    frame = pd.DataFrame(
        {
            'feature1': f1,
            'feature2': f2,
            'feature3': f3,
            'feature4': f4,
            'target': labels,
        }
    )
    return fl.StructuredDataset(dataframe=frame)
@fl.task()
def preprocess_data(
    df: Annotated[fl.StructuredDataset, all_cols],
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, StandardScaler]:
    """Standardize the features and split them into train/test sets.

    Returns ``(X_train, X_test, y_train, y_test, scaler)``. The fitted
    ``StandardScaler`` is returned so downstream tasks can apply the same
    transformation to new data.

    NOTE(review): the scaler is fit on the full dataset before splitting, so
    statistics from the test rows leak into the scaling. For a strict
    evaluation, fit on X_train only — kept as-is here to preserve the
    existing pipeline behavior.

    Fix: ``train_test_split`` was called without ever being imported, which
    raised ``NameError`` at task execution time; the import is now added at
    the top of the file.
    """
    # Materialize the StructuredDataset into a concrete DataFrame without
    # shadowing the task parameter.
    data = df.open(pd.DataFrame).all()
    features: pd.DataFrame = data.drop('target', axis=1)
    targets: pd.Series = data['target']

    scaler = StandardScaler()
    features_df = pd.DataFrame(scaler.fit_transform(features), columns=features.columns)

    # Stratify on the label so both splits keep the same class balance.
    X_train, X_test, y_train, y_test = train_test_split(
        features_df, targets, test_size=0.2, random_state=42, stratify=targets
    )
    return X_train, X_test, y_train, y_test, scaler
@fl.workflow()
def ml_wf(n_samples: int = 100):
    """End-to-end pipeline: generate synthetic data, then scale and split it."""
    dataset = create_sample_data(n_samples)
    x_train, x_test, y_train, y_test, scaler = preprocess_data(dataset)