到目前為止,我們已經有了不同模型:
問題:
👉 不同模型的輸出格式不一致,導致後續比較或 API 整合困難。
今天的目標:
統一推論輸出格式:
{
  "input": "Naruto",
  "recommendations": ["Bleach", "One Piece", ...]
}
把推薦範例存成 Artifacts,方便在 MLflow UI 查看與下載。
使用 MLflow 的 log_artifact() 與 log_dict() 完成紀錄。
我們新增一個 pipeline_v3.py。
/usr/mlflow/src/pipeline/pipeline_v3.py   # 新版 Pipeline,統一推論格式 + 存 artifacts
/usr/mlflow/run_pipeline_v3.py            # 執行入口
📂 路徑:/usr/mlflow/src/pipeline/pipeline_v3.py
import os
import json
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import mlflow
DATA_DIR = "/usr/mlflow/data"
class AnimePipelineV3:
    def __init__(self, sample_size=1000):
        self.sample_size = sample_size
    def load_data(self):
        """載入動畫資料,只取部分樣本確保 3 分鐘內可跑完"""
        anime = pd.read_csv(os.path.join(DATA_DIR, "anime_clean.csv"))
        ratings_train = pd.read_csv(os.path.join(DATA_DIR, "ratings_train.csv"))
        anime = anime.sample(self.sample_size, random_state=42).reset_index(drop=True)
        return anime, ratings_train
    def train_model(self, anime, max_features=1000, ngram_range=(1,1), min_df=2, use_type=True):
        """用 TF-IDF 訓練 item-based 模型,可以選擇是否加入 type 特徵"""
        if use_type:
            anime["features"] = anime["genre"].fillna("") + " " + anime["type"].fillna("")
        else:
            anime["features"] = anime["genre"].fillna("")
        vectorizer = TfidfVectorizer(
            stop_words="english",
            max_features=max_features,
            ngram_range=ngram_range,
            min_df=min_df
        )
        tfidf = vectorizer.fit_transform(anime["features"])
        sim_matrix = cosine_similarity(tfidf)
        return sim_matrix
    def predict(self, anime, sim_matrix, title, top_k=10):
        """統一推論格式"""
        if title not in anime["name"].values:
            return {"input": title, "recommendations": []}
        idx = anime[anime["name"] == title].index[0]
        sim_scores = list(enumerate(sim_matrix[idx]))
        sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
        top_idx = [i for i, _ in sim_scores[1:top_k+1]]
        recs = anime.iloc[top_idx]["name"].tolist()
        return {"input": title, "recommendations": recs}
    def evaluate_and_log(self, anime, sim_matrix, params):
        """測試 Precision@10,並存推論範例到 MLflow artifacts"""
        def precision_at_k(recommended, relevant, k=10):
            return len(set(recommended[:k]) & set(relevant)) / k
        test_idx = np.random.choice(len(anime), 30, replace=False)
        scores = []
        examples = []
        for idx in test_idx[:5]:  # 只存 5 筆範例,避免 artifacts 太大
            sim_scores = list(enumerate(sim_matrix[idx]))
            sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
            top_idx = [i for i, _ in sim_scores[1:11]]
            recommended = anime.iloc[top_idx]["name"].tolist()
            relevant = anime[anime["genre"] == anime.iloc[idx]["genre"]]["name"].tolist()
            if len(relevant) > 1:
                scores.append(precision_at_k(recommended, relevant, k=10))
            # 存成統一格式
            examples.append({
                "input": anime.iloc[idx]["name"],
                "recommendations": recommended
            })
        avg_precision = np.mean(scores)
        with mlflow.start_run(run_name="pipeline-v3") as run:
            mlflow.log_params(params)
            mlflow.log_metric("precision_at_10", avg_precision)
            result_path = "recommendations.json"
            with open(result_path, "w", encoding="utf-8") as f:
                json.dump(examples, f, ensure_ascii=False, indent=2)
            mlflow.log_artifact(result_path)
            # mlflow.log_dict(),直接把 dict 存成 JSON
            mlflow.log_dict({"examples": examples}, "recommendations_dict.json")
            print("Run ID:", run.info.run_id)
            print("Artifact URI:", run.info.artifact_uri)
        return avg_precision
📂 路徑:/usr/mlflow/day15_run_pipeline_v3.py
import mlflow
from src.pipeline.pipeline_v3 import AnimePipelineV3
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("anime-recsys-pipeline-v3")
def main():
    pipeline = AnimePipelineV3(sample_size=1000)
    anime, ratings_train = pipeline.load_data()
    params = {
        "max_features": 1000,
        "ngram_range": (1,1),
        "min_df": 2,
        "use_type": True
    }
    sim_matrix = pipeline.train_model(anime, **params)
    score = pipeline.evaluate_and_log(anime, sim_matrix, params)
    print(f"Pipeline V3 完成 ✅ Precision@10 = {score:.4f}")
if __name__ == "__main__":
    main()
mlflow.log_artifact(path).json、.csv、.png。mlflow.log_dict(dict_obj, artifact_file)這兩個 API 適合存「推薦範例」、「推論結果」、「測試輸出」。
在容器中執行:
python day15_run_pipeline_v3.py
執行結果:
Console 會印出 Run ID、Artifact URI、Precision@10。
MLflow UI → Experiments → anime-recsys-pipeline-v3 → 點進 run → Artifacts,你會看到:
recommendations.json
recommendations_dict.json


AnimePipelineV3
   │
   ├── load_data() → 抽樣資料
   ├── train_model() → TF-IDF 訓練
   ├── predict() → 統一輸出格式
   └── evaluate_and_log()
           ├── log_params / log_metric
           ├── mlflow.log_artifact() → recommendations.json
           └── mlflow.log_dict() → recommendations_dict.json
我們新增了 AnimePipelineV3,讓所有推薦輸出都有統一格式。
使用 MLflow 的 Artifacts 保存推論範例,方便比較不同 run 的輸出。
新增的 API:
mlflow.log_artifact() → 存本地檔案mlflow.log_dict() → 直接存 dict整個流程抽樣 1000 筆動畫 + 測試 30 筆,保證 3 分鐘內完成。
👉 下一步(Day 16),我們會嘗試 模型解釋 (SHAP/Feature Importance),並把解釋結果存到 MLflow。