底下會以 reproducible-machine-learning 以及這個 MLOps Workflow 來逐一說明。
以下為 Notebook 的內容,為了方便閱讀,改以 Python 格式呈現。
import mlflow
import mlflow.sklearn
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import pyspark.sql.functions as F
from delta.tables import *
import numpy as np
# Read in Wine Data From DBFS
## Load the ';'-separated CSV, using the header row and letting Spark infer column types
path = '/databricks-datasets/wine-quality/winequality-white.csv'
csv_options = {'header': 'true', 'inferSchema': 'true', 'sep': ';'}
wine_df = spark.read.options(**csv_options).csv(path)

## Replace spaces in the column names with underscores
renamed_columns = [F.col(name).alias(name.replace(' ', '_')) for name in wine_df.columns]
wine_df_clean = wine_df.select(renamed_columns)
display(wine_df_clean)
# Create a Directory
## `%fs mkdirs ...` is a notebook-only magic and is not valid Python syntax;
## dbutils.fs.mkdirs is the function that magic is shorthand for.
dbutils.fs.mkdirs('/tmp/reproducible_ml_blog')
# Write out Data as Delta
## Save the cleaned DataFrame in Delta format, overwriting whatever is already at the path
write_path = 'dbfs:/tmp/reproducible_ml_blog/wine_quality_white.delta'
delta_writer = wine_df_clean.write.format('delta').mode('overwrite')
delta_writer.save(write_path)
# Add a New Row
## Build a one-row DataFrame and append it to the cleaned data
new_row_values = [7, 0.27, 0.36, 1.6, 0.045, 45, 170, 1.001, 3, 0.45, 8.8, 6]
new_row = spark.createDataFrame([new_row_values])
## NOTE(review): new_row is created without column names, so this relies on
## union pairing columns by position -- confirm the value order matches wine_df_clean.
wine_df_extra_row = wine_df_clean.union(new_row)
display(wine_df_extra_row)
# Overwrite Delta Table and Update Schema
## Replace the table at write_path with the extended data, with overwriteSchema enabled
overwrite_writer = wine_df_extra_row.write.format('delta').mode('overwrite')
overwrite_writer.option('overwriteSchema', 'true').save(write_path)
# View Delta Table History
## Import only the name this cell needs instead of repeating a wildcard import
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, write_path)
## Get the full history of the table so a version can be picked from it
fullHistoryDF = deltaTable.history()
display(fullHistoryDF)
# Specify Data Version For Training
## Pin the Delta table version used for model training and load that snapshot into pandas
version = 1
versioned_reader = spark.read.format('delta').option('versionAsOf', version)
wine_df_delta = versioned_reader.load(write_path).toPandas()
display(wine_df_delta)
# Split Data
## 75% of the rows go to training and the rest to testing; the fixed seed makes the split repeatable
seed = 1111
train, test = train_test_split(wine_df_delta, train_size=0.75, random_state=seed)

## "quality" is the target column (a scalar from [3, 9]); all other columns are features
X_train = train.drop(columns=['quality'])
X_test = test.drop(columns=['quality'])
## The targets are kept as one-column DataFrames
y_train = train.loc[:, ['quality']]
y_test = test.loc[:, ['quality']]
# Build Model with MLflow
## Everything below runs inside one MLflow run: the parameters (including the
## data version), the evaluation metrics and the fitted model are logged to it.
with mlflow.start_run() as run:
    ## Log your params
    n_estimators = 10000
    max_features = 'sqrt'
    params = {
        'data_version': version,
        'n_estimators': n_estimators,
        'max_features': max_features,
    }
    mlflow.log_params(params)

    ## Train the model
    rf = RandomForestRegressor(
        n_estimators=n_estimators,
        max_features=max_features,
        random_state=seed,
    )
    ## y_train is selected as a one-column frame (train[['quality']]); flatten it
    ## so the regressor receives a 1-D target rather than a column vector.
    rf.fit(X_train, np.ravel(y_train))

    ## Predict on the test data
    preds = rf.predict(X_test)

    ## Generate metrics
    rmse = np.sqrt(mean_squared_error(y_test, preds))
    mae = mean_absolute_error(y_test, preds)
    r2 = r2_score(y_test, preds)
    metrics = {
        'rmse': rmse,
        'mae': mae,
        'r2': r2,
    }

    ## Log Metrics
    mlflow.log_metrics(metrics)

    ## Log the model
    mlflow.sklearn.log_model(rf, 'model')
有關 Commit code,可以參考前幾篇提到的 Databricks Repos。
Reference: