iT邦幫忙

2023 iThome 鐵人賽

DAY 14
0
AI & Data

從零到英雄:用GCP建立AI交易體系系列 第 14

Day14 趕火車之上雲(中)

  • 分享至 

  • xImage
  •  

建立BigQuery表格

首先我們來到GCP的BigQuery頁面,點擊新增

點擊本機檔案

點擊空白資料表

建立設定大致長這樣

設定好之後就可以建立資料表,而後我們要按照一樣的方式建立另外一個表格如下。

程式抓取

from shioaji import TickSTKv1, Exchange, Shioaji, constant, BidAskSTKv1
from threading import Event
from datetime import datetime
from google.cloud import bigquery

def init_api():
    """初始化API"""
    return Shioaji(simulation=True)

def login(api, api_key, secret_key):
    """登入API"""
    return api.login(api_key=api_key, secret_key=secret_key)

def is_market_open():
    """檢查市場是否開放"""
    now = datetime.now()
    return 9 <= now.hour < 15

def place_order(api, stock_id, action, price, accounts):
    """下單"""
    order = api.Order(
        price=price,
        quantity=1,
        action=action,
        price_type='LMT',  # 限價單
        order_type='ROD',  # 僅今日有效
        order_cond='Cash', # 現股交易
        order_lot='Common', # 整股
        first_sell='false', 
        account=accounts[0]
    )
    api.place_order(
        contract=api.Contracts.Stocks[stock_id],
        order=order,
        timeout=5000
    )

def subscribe_stock(api, stock_id):
    """訂閱股票報價"""
    api.quote.subscribe(
        api.Contracts.Stocks[stock_id], 
        quote_type=constant.QuoteType.Tick,
        version=constant.QuoteVersion.v1
    )

def fetch_stock_data():
    """從BigQuery獲取股票資料"""
    client = bigquery.Client()

    # 指定 BigQuery 資料集和資料表
    dataset_id = 'trade-397602.ithelp'
    table_id = 'stock_rowdata'

    # 撰寫查詢
    query = f"""
        SELECT id, up_price, down_price
        FROM `{dataset_id}.{table_id}`
    """

    # 執行查詢
    query_job = client.query(query)

    # 獲取查詢結果
    results = query_job.result()

    # 初始化資料字典
    sell_price_data = {}

    # 將查詢結果轉換為字典格式
    for row in results:
        stock_id = row['id']
        up_price = row['up_price']
        down_price = row['down_price']
        sell_price_data[stock_id] = {'high': up_price, 'low': down_price}

    return sell_price_data

def quote_callback(exchange: Exchange, tick: TickSTKv1, sell_price_data):
    """報價回調函數"""
    print(f"Exchange: {exchange}, Tick: {tick}")
    if tick.code in sell_price_data:
        if tick.close >= sell_price_data[tick.code]['high']:
            place_order(api, tick.code, 'Sell', api.Contracts.Stocks[tick.code].limit_down, accounts)
        elif tick.close <= sell_price_data[tick.code]['low']:
            place_order(api, tick.code, 'Sell', api.Contracts.Stocks[tick.code].limit_down, accounts)

if __name__ == "__main__":
    # 獲取股票資料
    sell_price_data = fetch_stock_data()

    # 初始化 API
    api = init_api()
    accounts = login(api, "9EwVF2HtBmfeexfLBYUExWvFbaL6f1jenPfxESjSo4x5", "5BW5ZxGc3Vz186y5EzhCeg7b25qC3bD2kk823t9L7kyx")

    if not is_market_open():
        for stock_id in sell_price_data.keys():
            place_order(api, stock_id, 'Buy', api.Contracts.Stocks[stock_id].limit_up, accounts)  # 預掛開盤買進的委託單
            subscribe_stock(api, stock_id)  # 訂閱開盤買進的個股
    else:
        print("市場已經開盤。")
    
    for stock_id in sell_price_data.keys():
        subscribe_stock(api, stock_id)  # 訂閱盤中需要監控的個股

    api.on_tick_stk_v1(quote_callback, sell_price_data)  # 傳遞股價資料到回調函數
    Event().wait()  # 維持程式運作,等待交易資料送入
    api.logout()  # 登出API

新增以下這個函數用於抓取資料表的資料:

def fetch_stock_data():
    """從BigQuery獲取股票資料"""
    client = bigquery.Client()

    # 指定 BigQuery 資料集和資料表
    dataset_id = 'trade-397602.ithelp'
    table_id = 'stock_rowdata'

    # 撰寫查詢
    query = f"""
        SELECT id, up_price, down_price
        FROM `{dataset_id}.{table_id}`
    """

    # 執行查詢
    query_job = client.query(query)

    # 獲取查詢結果
    results = query_job.result()

    # 初始化資料字典
    sell_price_data = {}

    # 將查詢結果轉換為字典格式
    for row in results:
        stock_id = row['id']
        up_price = row['up_price']
        down_price = row['down_price']
        sell_price_data[stock_id] = {'high': up_price, 'low': down_price}

    return sell_price_data

並且改寫執行步驟:

if __name__ == "__main__":
    # 獲取股票資料
    sell_price_data = fetch_stock_data()

    # 初始化 API
    api = init_api()
    accounts = login(api, "", "")

    if not is_market_open():
        for stock_id in sell_price_data.keys():
            place_order(api, stock_id, 'Buy', api.Contracts.Stocks[stock_id].limit_up, accounts)  # 預掛開盤買進的委託單
            subscribe_stock(api, stock_id)  # 訂閱開盤買進的個股
    else:
        print("市場已經開盤。")
    
    for stock_id in sell_price_data.keys():
        subscribe_stock(api, stock_id)  # 訂閱盤中需要監控的個股

    api.on_tick_stk_v1(quote_callback, sell_price_data)  # 傳遞股價資料到回調函數
    Event().wait()  # 維持程式運作,等待交易資料送入
    api.logout()  # 登出API

今天先到這裡


上一篇
Day13 API上雲(上)
下一篇
Day 15 上雲(下)
系列文
從零到英雄:用GCP建立AI交易體系34
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言