今天就簡單把BigQuery加入程式碼,我們就可以功成身退了~~~
import joblib
import pandas as pd
from itertools import product
import talib as ta
from google.cloud import bigquery
import pandas as pd
pd.set_option('display.max_rows', None)
# 函數:計算Stochastic Oscillator指標
def calculate_stochastic_oscillator(dataframe, fastk_period, slowk_period, slowd_period):
"""
計算Stochastic Oscillator指標。
Args:
dataframe (pd.DataFrame): 包含股票數據的DataFrame。
fastk_period (int): 快速K期數。
slowk_period (int): 慢速K期數。
slowd_period (int): 慢速D期數。
Returns:
pd.Series: 包含計算結果的DataFrame。
"""
slowk, slowd = ta.STOCH(dataframe['high'], dataframe['low'], dataframe['close'],
fastk_period=fastk_period,
slowk_period=slowk_period,
slowk_matype=0,
slowd_period=slowd_period)
return slowk, slowd
# 函數:生成技術指標特徵
def generate_technical_indicators(dataframe):
"""
生成技術指標特徵。
Args:
dataframe (pd.DataFrame): 包含股票數據的DataFrame。
Returns:
pd.DataFrame: 包含生成特徵的DataFrame。
"""
feature_dict = {}
param_values = [5, 6, 7, 8, 9, 10]
param_combinations = product(param_values, [3, 4, 5], [3, 4, 5])
for fastk_period, slowk_period, slowd_period in param_combinations:
slowk, slowd = calculate_stochastic_oscillator(dataframe, fastk_period, slowk_period, slowd_period)
slowk_name = f"slowk_{fastk_period}_{slowk_period}_{slowd_period}"
slowd_name = f"slowd_{fastk_period}_{slowk_period}_{slowd_period}"
feature_dict[slowk_name] = slowk
feature_dict[slowd_name] = slowd
feature_df = pd.DataFrame(feature_dict)
return feature_df
# 函數:創建lag特徵
def create_lag_features(dataframe, columns):
"""
創建lag特徵。
Args:
dataframe (pd.DataFrame): 包含股票數據的DataFrame。
columns (list): 需要創建lag特徵的列名列表。
Returns:
pd.DataFrame: 包含創建lag特徵的DataFrame。
"""
lagged_columns = [] # 存儲創建的lag特徵列
for col in columns:
for i in range(1, 6): # 創建1到5天的lag
lagged_col_name = f'{col}_lag_{i}'
lagged_columns.append(dataframe[col].shift(i).rename(lagged_col_name))
# 使用pd.concat連接所有lag特徵列
dataframe = pd.concat([dataframe] + lagged_columns, axis=1)
return dataframe.dropna()
# 函數:進行股票預測
def predict_stocks(dataframe, model):
"""
進行股票預測。
Args:
dataframe (pd.DataFrame): 包含股票數據的DataFrame。
model: 預訓練的機器學習模型。
Returns:
dict: 包含股票預測結果的字典。
"""
predictions = {}
for index, row in dataframe.iterrows():
ticker = row['id'] # 使用BigQuery資料表中的股票代號欄位
features = row[features_to_use].values.reshape(1, -1)
predicted_label = model.predict(features)[0]
predictions[ticker] = predicted_label
print(row[features_to_use])
break
return predictions
# 主程序
if __name__ == "__main__":
# 初始化BigQuery客戶端
client = bigquery.Client()
# 查詢BigQuery資料表,替換成你的專案ID
project_id = "your-project-id"
table_id = "trade-397602.ithelp.stock_index"
# 查詢SQL
query = f"""
SELECT * FROM `{project_id}.{table_id}`
"""
# 執行查詢
df = client.query(query).to_dataframe()
# 生成技術指標
new_features_df = generate_technical_indicators(df)
# 合併原始DataFrame和新特徵
df_combined = pd.concat([df, new_features_df], axis=1)
# 定義用於創建lag特徵的列
columns_to_use = ['slowk_5_3_3', 'slowd_5_3_3', 'slowd_5_3_4', 'slowd_5_3_5', 'slowk_5_4_3',
'slowd_5_4_4', 'slowd_5_4_5', 'slowk_5_5_3', 'slowd_5_5_5', 'slowk_6_3_3',
'slowd_6_3_3', 'slowd_6_3_4', 'slowd_6_3_5', 'slowk_6_4_3', 'slowd_6_4_4',
'slowd_6_4_5', 'slowk_6_5_3', 'slowd_6_5_5', 'slowk_7_3_3', 'slowd_7_3_3',
'slowd_7_3_4', 'slowd_7_3_5', 'slowk_7_4_3', 'slowd_7_4_4', 'slowd_7_4_5',
'slowk_7_5_3', 'slowd_7_5_5', 'slowk_8_3_3', 'slowd_8_3_3', 'slowd_8_3_4',
'slowk_8_4_3', 'slowd_8_4_5', 'slowk_8_5_3', 'slowd_8_5_5', 'slowk_9_3_3',
'slowd_9_3_3', 'slowk_9_4_3', 'slowk_9_5_3', 'slowd_9_5_5', 'slowk_10_3_3',
'slowd_10_3_3', 'slowd_10_3_4', 'slowd_10_3_5', 'slowk_10_4_3', 'slowd_10_4_4',
'slowd_10_4_5', 'slowk_10_5_3', 'slowd_10_5_5']
features_to_use = [f'{col}_lag_{i}' for col in columns_to_use for i in range(1, 6)]
# 創建lag特徵
df_combined = create_lag_features(df_combined, columns_to_use)
# 讀取預先訓練的Random Forest模型
loaded_model = joblib.load('random_forest_model.pkl')
# 獲取最新日期
latest_date = df_combined['date'].max()
# 篩選出最新日期的所有股票數據
latest_data_df = df_combined[df_combined['date'] == latest_date]
# 進行股票預測
predictions = predict_stocks(latest_data_df, loaded_model)
# 輸出結果
print("最新一天各個股票的預測標籤:")
for ticker, label in predictions.items():
print(f"股票代碼: {ticker}, 預測標籤(label): {label}")
我們將輸入改成使用BigQuery。
首先建立BigQuery客戶端
# 初始化BigQuery客戶端
client = bigquery.Client()
建立查詢,將查詢的資料轉成DataFrame。
# 查詢BigQuery資料表,替換成你的專案ID
project_id = "your-project-id"
table_id = "trade-397602.ithelp.stock_index"
# 查詢SQL
query = f"""
SELECT * FROM `{project_id}.{table_id}`
"""
執行他。
# 執行查詢
df = client.query(query).to_dataframe()
至此我們已經完成所有的上雲交易任務,接下來會開始優化各部分的程式碼以及數據處理。
水了幾天還是要開始面對了QAQ