
2023 iThome 鐵人賽

DAY 28
AI & Data

Learning ML/LLM Development with Databricks series, Day 28

Day28 - Databricks Feature Store


How Databricks Feature Store works

The machine learning workflow with Feature Store is as follows:

  1. Write code to transform raw data into features, and build a Spark DataFrame that contains the desired features.

  2. Write the DataFrame to a feature table in Unity Catalog. If your workspace is not enabled for Unity Catalog, write the DataFrame to a feature table in the Workspace Feature Store instead.

  3. Train the model with features from the feature store. In doing so, the model stores a specification of the features used for training. When the model is later used for inference, it automatically joins the features from the appropriate feature tables.

  4. Register the model in the Model Registry.

  • In batch scenarios, the model automatically retrieves the features it needs from the Feature Store.

https://ithelp.ithome.com.tw/upload/images/20231012/20091643pTeXJdfmqo.png

  • In real-time scenarios, the features must first be published to an online store (a code sketch follows the figure below).
  • At inference time, the model reads the precomputed features from the online store and joins them with the data sent in the client request to the model serving endpoint.

https://ithelp.ithome.com.tw/upload/images/20231012/20091643T9DbGzSxqD.png
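The publish step itself is not shown in this walkthrough. Below is a minimal sketch of what it can look like, assuming an Amazon DynamoDB online store and Databricks secret scopes that already hold the AWS credentials; the region, secret prefixes, and table name are all hypothetical.

from databricks import feature_store
from databricks.feature_store.online_store_spec import AmazonDynamoDBSpec

fs = feature_store.FeatureStoreClient()

# Hypothetical online store backed by DynamoDB; the read/write secret
# prefixes point at Databricks secret scopes that hold the credentials.
online_store = AmazonDynamoDBSpec(
    region="us-west-2",
    read_secret_prefix="feature-store-read/dynamo",
    write_secret_prefix="feature-store-write/dynamo",
)

# Publish the offline feature table (hypothetical name) to the online store.
# mode="merge" upserts only the rows that changed since the last publish.
fs.publish_table(
    name="ml.wine_db.wine_features",
    online_store=online_store,
    mode="merge",
)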

Basic example

This example is adapted from this document. It mainly demonstrates doing feature engineering with Databricks Unity Catalog and writing the features into the Feature Store.

  • Environment setup
import pandas as pd
 
from pyspark.sql.functions import monotonically_increasing_id, expr, rand
import uuid
 
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup
 
import mlflow
import mlflow.sklearn
 
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
  • Load data
raw_data = spark.read.load(
    "/databricks-datasets/wine-quality/winequality-red.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true",
)
 
def addIdColumn(dataframe, id_column_name):
    """Add id column to dataframe"""
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df[[id_column_name] + columns]
 
def renameColumns(df):
    """Rename columns to be compatible with Feature Store"""
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
    return renamed_df
 
# Run functions
renamed_df = renameColumns(raw_data)
df = addIdColumn(renamed_df, 'wine_id')
 
# Drop target column ('quality') as it is not included in the feature table
features_df = df.drop('quality')
display(features_df)
  • Create a new catalog or reuse an existing one
# Create a new catalog with:
# spark.sql("CREATE CATALOG IF NOT EXISTS ml")
# spark.sql("USE CATALOG ml")
 
# Or reuse existing catalog:
spark.sql("USE CATALOG ml")
  • Create a new schema
spark.sql("CREATE SCHEMA IF NOT EXISTS wine_db")
spark.sql("USE SCHEMA wine_db")
 
# Create a unique table name for each run. This prevents errors if you run the notebook multiple times.
table_name = "ml.wine_db.wine_db_" + str(uuid.uuid4())[:6]
print(table_name)
  • Create the feature table with FeatureStoreClient
fs = feature_store.FeatureStoreClient()
 
# You can get help in the notebook for feature store API functions:
# help(fs.<function_name>)
 
# For example:
# help(fs.create_table)

fs.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    df=features_df,
    schema=features_df.schema,
    description="wine features"
)
  • Alternatively, create_table can be called without a df to create an empty feature table, which write_table then populates; write_table supports modes such as overwrite and merge
fs.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    schema=features_df.schema,
    description="wine features"
)

fs.write_table(
    name=table_name,
    df=features_df,
    mode="overwrite"
)
  • Train a model with the feature store

## inference_data_df includes wine_id (primary key), quality (prediction target), and a real time feature
inference_data_df = df.select("wine_id", "quality", (10 * rand()).alias("real_time_measurement"))
display(inference_data_df)

def load_data(table_name, lookup_key):
    # In the FeatureLookup, if you do not provide the `feature_names` parameter, all features except primary keys are returned
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
 
    # fs.create_training_set looks up features in model_feature_lookups that match the primary key from inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns="wine_id")
    training_pd = training_set.load_df().toPandas()
 
    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set
 
# Create the train and test datasets
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()
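As the comment in load_data notes, omitting feature_names returns every feature in the table. Below is a minimal sketch of restricting the lookup to a subset instead; the three column names come from the wine feature table created earlier.

# Look up only a subset of features rather than all of them.
subset_lookups = [
    FeatureLookup(
        table_name=table_name,
        lookup_key="wine_id",
        feature_names=["alcohol", "pH", "volatile_acidity"],
    )
]

subset_training_set = fs.create_training_set(
    inference_data_df, subset_lookups, label="quality", exclude_columns="wine_id"
)
display(subset_training_set.load_df())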
from mlflow.tracking.client import MlflowClient
 
client = MlflowClient()
 
try:
    client.delete_registered_model("wine_model")  # Delete the model if it already exists
except Exception:
    pass  # The model has not been registered yet

Classic scikit-learn model training; it needs no further explanation.

# Disable MLflow autologging and instead log the model using Feature Store
mlflow.sklearn.autolog(log_models=False)
 
def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:
 
        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)
 
        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))
 
        fs.log_model(
            model=rf,
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )
 
train_model(X_train, X_test, y_train, y_test, training_set, fs)
  • As mentioned above, batch scoring is used here: score_batch applies the already-packaged feature store model. Only the primary key wine_id needs to be supplied; the remaining features are retrieved from the feature store automatically.
## For simplicity, this example uses inference_data_df as input data for prediction
batch_input_df = inference_data_df.drop("quality") # Drop the label column
 
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
                                  
display(predictions_df.select("wine_id", "prediction"))
  • Modify the feature table
## Modify the dataframe containing the features
so2_cols = ["free_sulfur_dioxide", "total_sulfur_dioxide"]
new_features_df = (features_df.withColumn("average_so2", expr("+".join(so2_cols)) / 2))
 
display(new_features_df)

Update the feature table with mode="merge":

fs.write_table(
    name=table_name,
    df=new_features_df,
    mode="merge"
)
  • Read the feature table
# Displays most recent version of the feature table
# Note that features that were deleted in the current version still appear in the table but with value = null.
display(fs.read_table(name=table_name))

Retrain the model with the updated feature table.


def load_data(table_name, lookup_key):
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
    
    # fs.create_training_set will look up features in model_feature_lookups with matched key from inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns="wine_id")
    training_pd = training_set.load_df().toPandas()
 
    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set
 
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()

The specified lookup key is used to find the matching rows in the feature table, which are then converted into a training dataset.

def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:
 
        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)
 
        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))
 
        fs.log_model(
            model=rf,
            artifact_path="feature-store-model",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )
 
train_model(X_train, X_test, y_train, y_test, training_set, fs)

Apply the already-packaged feature store model with score_batch. As before, supplying the primary key wine_id is enough; the other features are retrieved from the feature store automatically.

## For simplicity, this example uses inference_data_df as input data for prediction
batch_input_df = inference_data_df.drop("quality") # Drop the label column
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
display(predictions_df.select("wine_id", "prediction"))
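To close the loop on the real-time scenario described at the top: once the feature table is published to an online store and the model is deployed behind a Databricks Model Serving endpoint, the client only sends the lookup key plus any real-time features. A hedged sketch follows; the workspace URL, endpoint name, and secret scope/key are all assumptions.

import requests

workspace_url = "https://<workspace-url>"            # hypothetical
endpoint_name = "wine-model-endpoint"                # hypothetical
token = dbutils.secrets.get("ml", "serving-token")   # assumed secret scope/key

# Only wine_id and the real-time feature are sent; the endpoint looks up
# the remaining features from the online store before scoring.
payload = {"dataframe_records": [{"wine_id": 25, "real_time_measurement": 7.2}]}

response = requests.post(
    f"{workspace_url}/serving-endpoints/{endpoint_name}/invocations",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
print(response.json())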
