今天我們要來撰寫上雲的最後一步:使用模型完成預測
導入需要的庫
import joblib
import pandas as pd
from itertools import product
import talib as ta
我們先做一個函式將資料按照開高低收製作出KD指標。
# Function: compute the Stochastic Oscillator (KD) indicator
def calculate_stoch_by_talib(dataframe, fastk_period, slowk_period, slowd_period):
    """Compute the slow %K / slow %D lines with TA-Lib's ``STOCH``.

    Args:
        dataframe: Object indexable by ``'high'``, ``'low'`` and ``'close'``;
            those three entries are handed to ``ta.STOCH`` in that order.
        fastk_period: Forwarded to ``ta.STOCH`` as ``fastk_period``.
        slowk_period: Forwarded to ``ta.STOCH`` as ``slowk_period``.
        slowd_period: Forwarded to ``ta.STOCH`` as ``slowd_period``.

    Returns:
        The ``(slowk, slowd)`` pair returned by ``ta.STOCH``.
    """
    stoch_settings = {
        'fastk_period': fastk_period,
        'slowk_period': slowk_period,
        # Only the %K moving-average type is set explicitly; slowd_matype is
        # not passed, so TA-Lib's own default applies to it.
        'slowk_matype': 0,
        'slowd_period': slowd_period,
    }
    high, low, close = (dataframe[name] for name in ('high', 'low', 'close'))
    stoch_k, stoch_d = ta.STOCH(high, low, close, **stoch_settings)
    return stoch_k, stoch_d
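將KD指標的函式包入生成技術指標的函式中,這個函式只做一件事:針對 fastk_period(5 到 10)、slowk_period(3 到 5)、slowd_period(3 到 5)的每一種參數組合各計算一次 KD 值,並把所有結果整理成一個特徵 DataFrame。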
# Function: generate technical-indicator features
def generate_technical_indicators(dataframe):
    """Build a DataFrame of KD features for a grid of period settings.

    Every combination of ``fastk_period`` in 5..10, ``slowk_period`` in 3..5
    and ``slowd_period`` in 3..5 is passed to ``calculate_stoch_by_talib``.
    Each call contributes two columns, named
    ``slowk_<fastk>_<slowk>_<slowd>`` and ``slowd_<fastk>_<slowk>_<slowd>``.

    Args:
        dataframe: Price data forwarded unchanged to
            ``calculate_stoch_by_talib``.

    Returns:
        A new ``pandas.DataFrame`` holding only the generated columns.
    """
    fastk_choices = [5, 6, 7, 8, 9, 10]
    smoothing_choices = [3, 4, 5]

    indicator_columns = {}
    # Nested loops visit the grid in the same order as a Cartesian product:
    # fastk varies slowest, slowd fastest.
    for fastk in fastk_choices:
        for slowk_n in smoothing_choices:
            for slowd_n in smoothing_choices:
                suffix = f"{fastk}_{slowk_n}_{slowd_n}"
                k_values, d_values = calculate_stoch_by_talib(
                    dataframe, fastk, slowk_n, slowd_n
                )
                indicator_columns[f"slowk_{suffix}"] = k_values
                indicator_columns[f"slowd_{suffix}"] = d_values

    return pd.DataFrame(indicator_columns)
做一個函式將指標遞延 1 到 5 天,讓當天的資料列中可以同時看到前一天、前兩天……一直到前五天的指標值;遞延後含有缺值的資料列會被移除。
# Function: create lag features
def create_lag_features(dataframe, columns, max_lag=5):
    """Return a new frame with lagged copies of ``columns`` appended.

    For every name in ``columns`` and every lag ``i`` in ``1..max_lag`` a
    column ``f'{col}_lag_{i}'`` containing ``dataframe[col].shift(i)`` is
    added. Rows with a missing value in *any* column (not only the lag
    columns) are then dropped.

    The input frame is left untouched. All lag columns are attached with a
    single ``pd.concat`` rather than one assignment per column, which avoids
    the fragmentation that repeated column insertion causes.

    Args:
        dataframe: Source ``pandas.DataFrame``.
        columns: Iterable of column names to lag.
        max_lag: Largest shift to generate; lags ``1..max_lag`` are created.
            Defaults to 5.

    Returns:
        A new ``pandas.DataFrame`` with the original columns followed by the
        lag columns, without the rows that contain missing values.

    Raises:
        KeyError: If a name in ``columns`` is not a column of ``dataframe``.
    """
    lagged = {}
    for col in columns:
        source = dataframe[col]
        for i in range(1, max_lag + 1):
            lag_name = f'{col}_lag_{i}'
            lagged[lag_name] = source.shift(i).rename(lag_name)

    # If a lag column name already exists, replace it instead of producing a
    # duplicate column label.
    clashing = [name for name in dataframe.columns if name in lagged]
    base = dataframe.drop(columns=clashing)

    return pd.concat([base, *lagged.values()], axis=1).dropna()
導入資料並且開始預測。
# Load the existing labelled data
df = pd.read_csv('label_data.csv')

# Generate the technical indicators
# NOTE(review): the indicators (and the lags below) are computed over the rows
# in file order. If label_data.csv stacks several tickers, the rolling windows
# and shifts would span ticker boundaries -- confirm this matches how the model
# was trained before changing it.
new_features_df = generate_technical_indicators(df)

# Attach the new features to the original DataFrame, column-wise
df_combined = pd.concat([df, new_features_df], axis=1)

# Indicator columns from which the lag features are built
columns_to_use = ['slowk_5_3_3', 'slowd_5_3_3', 'slowd_5_3_4', 'slowd_5_3_5', 'slowk_5_4_3',
                  'slowd_5_4_4', 'slowd_5_4_5', 'slowk_5_5_3', 'slowd_5_5_5', 'slowk_6_3_3',
                  'slowd_6_3_3', 'slowd_6_3_4', 'slowd_6_3_5', 'slowk_6_4_3', 'slowd_6_4_4',
                  'slowd_6_4_5', 'slowk_6_5_3', 'slowd_6_5_5', 'slowk_7_3_3', 'slowd_7_3_3',
                  'slowd_7_3_4', 'slowd_7_3_5', 'slowk_7_4_3', 'slowd_7_4_4', 'slowd_7_4_5',
                  'slowk_7_5_3', 'slowd_7_5_5', 'slowk_8_3_3', 'slowd_8_3_3', 'slowd_8_3_4',
                  'slowk_8_4_3', 'slowd_8_4_5', 'slowk_8_5_3', 'slowd_8_5_5', 'slowk_9_3_3',
                  'slowd_9_3_3', 'slowk_9_4_3', 'slowk_9_5_3', 'slowd_9_5_5', 'slowk_10_3_3',
                  'slowd_10_3_3', 'slowd_10_3_4', 'slowd_10_3_5', 'slowk_10_4_3', 'slowd_10_4_4',
                  'slowd_10_4_5', 'slowk_10_5_3', 'slowd_10_5_5']
# Model inputs: lags 1..5 of every column above, in this exact order
features_to_use = [f'{col}_lag_{i}' for col in columns_to_use for i in range(1, 6)]

# Create the lag features
# NOTE(review): create_lag_features drops every row that has a missing value
# in any column, so rows lacking e.g. a label would disappear before the
# "latest date" is picked below -- confirm that is intended.
df_combined = create_lag_features(df_combined, columns_to_use)

# Load the pre-trained Random Forest model
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files from a trusted source.
loaded_model = joblib.load('random_forest_model.pkl')

# Find the most recent date
latest_date = df_combined['日期'].max()

# Keep the rows of every stock on that date
latest_data_df = df_combined[df_combined['日期'] == latest_date]

# Predict all of the latest rows in one batched call instead of one
# model call per row; the result maps ticker -> predicted label.
predictions = {}
if not latest_data_df.empty:  # predict() cannot be called on zero samples
    feature_matrix = latest_data_df[features_to_use].to_numpy()
    predicted_labels = loaded_model.predict(feature_matrix)
    predictions = dict(zip(latest_data_df['股票代號'], predicted_labels))

# Print the results
print("最新一天各個股票的預測標籤:")
for ticker, label in predictions.items():
    print(f"股票代碼: {ticker}, 預測標籤(label): {label}")
明天我們修改此程式碼將其結果上傳至BigQuery完成儲存。這樣整個上雲過程就結束了。