首先我們來到GCP的BigQuery頁面,點擊新增
點擊本機檔案
點擊空白資料表
建立設定大致長這樣
設定好之後就可以建立資料表,而後我們要按照一樣的方式建立另外一個表格如下。
from shioaji import TickSTKv1, Exchange, Shioaji, constant, BidAskSTKv1
from threading import Event
from datetime import datetime
from google.cloud import bigquery
def init_api():
"""初始化API"""
return Shioaji(simulation=True)
def login(api, api_key, secret_key):
"""登入API"""
return api.login(api_key=api_key, secret_key=secret_key)
def is_market_open():
"""檢查市場是否開放"""
now = datetime.now()
return 9 <= now.hour < 15
def place_order(api, stock_id, action, price, accounts):
"""下單"""
order = api.Order(
price=price,
quantity=1,
action=action,
price_type='LMT', # 限價單
order_type='ROD', # 僅今日有效
order_cond='Cash', # 現股交易
order_lot='Common', # 整股
first_sell='false',
account=accounts[0]
)
api.place_order(
contract=api.Contracts.Stocks[stock_id],
order=order,
timeout=5000
)
def subscribe_stock(api, stock_id):
"""訂閱股票報價"""
api.quote.subscribe(
api.Contracts.Stocks[stock_id],
quote_type=constant.QuoteType.Tick,
version=constant.QuoteVersion.v1
)
def fetch_stock_data():
"""從BigQuery獲取股票資料"""
client = bigquery.Client()
# 指定 BigQuery 資料集和資料表
dataset_id = 'trade-397602.ithelp'
table_id = 'stock_rowdata'
# 撰寫查詢
query = f"""
SELECT id, up_price, down_price
FROM `{dataset_id}.{table_id}`
"""
# 執行查詢
query_job = client.query(query)
# 獲取查詢結果
results = query_job.result()
# 初始化資料字典
sell_price_data = {}
# 將查詢結果轉換為字典格式
for row in results:
stock_id = row['id']
up_price = row['up_price']
down_price = row['down_price']
sell_price_data[stock_id] = {'high': up_price, 'low': down_price}
return sell_price_data
def quote_callback(exchange: Exchange, tick: TickSTKv1, sell_price_data):
"""報價回調函數"""
print(f"Exchange: {exchange}, Tick: {tick}")
if tick.code in sell_price_data:
if tick.close >= sell_price_data[tick.code]['high']:
place_order(api, tick.code, 'Sell', api.Contracts.Stocks[tick.code].limit_down, accounts)
elif tick.close <= sell_price_data[tick.code]['low']:
place_order(api, tick.code, 'Sell', api.Contracts.Stocks[tick.code].limit_down, accounts)
if __name__ == "__main__":
# 獲取股票資料
sell_price_data = fetch_stock_data()
# 初始化 API
api = init_api()
accounts = login(api, "9EwVF2HtBmfeexfLBYUExWvFbaL6f1jenPfxESjSo4x5", "5BW5ZxGc3Vz186y5EzhCeg7b25qC3bD2kk823t9L7kyx")
if not is_market_open():
for stock_id in sell_price_data.keys():
place_order(api, stock_id, 'Buy', api.Contracts.Stocks[stock_id].limit_up, accounts) # 預掛開盤買進的委託單
subscribe_stock(api, stock_id) # 訂閱開盤買進的個股
else:
print("市場已經開盤。")
for stock_id in sell_price_data.keys():
subscribe_stock(api, stock_id) # 訂閱盤中需要監控的個股
api.on_tick_stk_v1(quote_callback, sell_price_data) # 傳遞股價資料到回調函數
Event().wait() # 維持程式運作,等待交易資料送入
api.logout() # 登出API
新增以下這個函數用於抓取資料表的資料:
def fetch_stock_data():
"""從BigQuery獲取股票資料"""
client = bigquery.Client()
# 指定 BigQuery 資料集和資料表
dataset_id = 'trade-397602.ithelp'
table_id = 'stock_rowdata'
# 撰寫查詢
query = f"""
SELECT id, up_price, down_price
FROM `{dataset_id}.{table_id}`
"""
# 執行查詢
query_job = client.query(query)
# 獲取查詢結果
results = query_job.result()
# 初始化資料字典
sell_price_data = {}
# 將查詢結果轉換為字典格式
for row in results:
stock_id = row['id']
up_price = row['up_price']
down_price = row['down_price']
sell_price_data[stock_id] = {'high': up_price, 'low': down_price}
return sell_price_data
並且改寫執行步驟:
if __name__ == "__main__":
# 獲取股票資料
sell_price_data = fetch_stock_data()
# 初始化 API
api = init_api()
accounts = login(api, "", "")
if not is_market_open():
for stock_id in sell_price_data.keys():
place_order(api, stock_id, 'Buy', api.Contracts.Stocks[stock_id].limit_up, accounts) # 預掛開盤買進的委託單
subscribe_stock(api, stock_id) # 訂閱開盤買進的個股
else:
print("市場已經開盤。")
for stock_id in sell_price_data.keys():
subscribe_stock(api, stock_id) # 訂閱盤中需要監控的個股
api.on_tick_stk_v1(quote_callback, sell_price_data) # 傳遞股價資料到回調函數
Event().wait() # 維持程式運作,等待交易資料送入
api.logout() # 登出API
今天先到這裡