iT邦幫忙

2021 iThome 鐵人賽

DAY 30
0
永豐金融APIs

深入解析 Shioaji API系列 第 30

Day 30 - 每日產生觀察名單

本篇重點

  • 每日下載及更新Kbar資料
  • 每日產生觀察名單
  • 結論

每日下載及更新Kbar資料

每天下載及更新K線資料,實際上做起來不比之前的程式簡單,因為還要考慮已存在DB中的資料不抓,以免資料重覆導致計算錯誤。另外要說明,這裡INSERT資料到資料庫的動作,一樣是用pandas的to_sql,因為之前在建立Table時已經有產生index,所以在這裡必須傳入index=False這個參數,如果沒有這個參數,會因為DB中已經存在index而產生錯誤。

下載及更新1分K資料

首先說明每天下載1分K資料並儲存至DB中,請注意此動作請在收盤後再做,因為在盤中是抓得到當天的kbar資料,下列的程式並沒有考慮收盤前抓資料的情況。
程式範例如下:

import os, datetime
import shioaji as sj
from shioaji.constant import Exchange
from dotenv import load_dotenv
import pandas as pd
import re #匯入re模組
import sqlite3

update_date = None

load_dotenv('D:/python/shioaji/.env')
conn = sqlite3.connect('C:/shioaji.db')

api = sj.Shioaji()
api.login(
    person_id=os.getenv('YOUR_PERSON_ID'), 
    passwd=os.getenv('YOUR_PASSWORD')
)
print(api.Contracts.Stocks) #等待Contract.Stocks Fetch完成
# 從Contracts中,抓到目前最新的update_date日期
def get_update_date():
    global update_date
    for exchange in api.Contracts.Stocks:
        for stock in exchange:
            if stock.exchange in (Exchange.TSE, Exchange.OTC):   
                if stock.category == '00' or stock.category == '' or stock.update_date == '':
                    pass
                else:
                    if update_date is None:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
                    elif datetime.datetime.strptime(stock.update_date, '%Y/%m/%d') > update_date:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')

#傳入股票代碼,抓kbar資料並存至DB中
def update_stock_kbar(stock_code):
    #預設start_date為2018-12-07
    start_date = datetime.datetime.strptime('2018-12-07', '%Y-%m-%d')
    cursor = conn.cursor()
    # 先從1分K的Table中,抓該檔股票最後的ts
    cursor.execute('SELECT ts FROM stocks_1min_kbars WHERE Code = ? ORDER BY ts DESC LIMIT 1', (stock_code, ))
    query_result = cursor.fetchone()
    if query_result is not None:
        start_date = datetime.datetime.strptime(query_result[0], '%Y-%m-%d %H:%M:%S')
        start_date = start_date.date()+datetime.timedelta(days=1)
    kbars = api.kbars(stock, start=start_date.strftime("%Y-%m-%d"), 
        end=update_date.strftime("%Y-%m-%d"))
    df = pd.DataFrame({**kbars})
    df.ts = pd.to_datetime(df.ts)
    df['Code'] = stock.code
    df.to_sql('stocks_1min_kbars', conn, if_exists='append', index=False)

get_update_date() #先執行一次get_update_date,取得最近一次的update_date

for exchange in api.Contracts.Stocks:
    for stock in exchange:
        if stock.exchange in (Exchange.TSE, Exchange.OTC):   
            if stock.category == '00' or stock.category == '':
                pass
            elif stock.update_date != update_date.strftime("%Y/%m/%d"):
                pass
            elif re.search('[A-Z]', stock.code) is None:
                print(f'start download {stock.code} {stock.name} kbar data...')
                update_stock_kbar(stock_code=stock.code)
                print(f'{stock.code} {stock.name} kbar data is stored to sqlite')

api.logout()

更新日K資料

抓完1分K,接著是就從1分K的資料產生出日K資料。這裡要注意的是在讀取1分K資料時,有先將dataFrame的index設為ts欄位,這樣才可以把資料resample為日K資料,所以在執行to_sql前,要再記得執行reset_index重設index,這樣to_sql後ts欄位資料才會正常寫入資料庫中。
程式範例如下:

import pandas as pd
import sqlite3
import datetime

conn = sqlite3.connect('C:/shioaji.db')
cursor = conn.cursor() #產生cursor物件

# 傳入股票代碼,產生日K資料並存至資料庫
def update_day_kbar(stock_code):
    print(f'update day kbars for stock:{stock_code}')
    start_date = datetime.datetime.strptime('2018-12-07', '%Y-%m-%d') #預設start_date為2018-12-07
    # 從day_kbars中,找該檔股票最後的ts
    cursor.execute('SELECT ts FROM stocks_day_kbars WHERE Code = ? ORDER BY ts DESC LIMIT 1', (stock_code, ))
    query_result = cursor.fetchone()
    if query_result is not None:
        start_date = datetime.datetime.strptime(query_result[0], '%Y-%m-%d %H:%M:%S')
        start_date = start_date.date()+datetime.timedelta(days=1)
    df = pd.read_sql('''SELECT Code, Open, High, Low, Close, Volume, ts 
                FROM stocks_1min_kbars WHERE Code = ? AND ts > ?''', 
                conn, parse_dates=['ts'], index_col=['ts'], params=(stock_code, start_date)) 
    df_day_kbar = df.resample(rule='1D').agg({
        'Code': 'first',
        'Open': 'first', 
        'High': 'max', 
        'Low': 'min', 
        'Close': 'last',
        'Volume': 'sum'})
    df_day_kbar.dropna(axis=0, inplace=True)
    df_day_kbar.reset_index(inplace=True) #重設index
    df_day_kbar.to_sql('stocks_day_kbars', conn, if_exists='append', index=False)
    print(f'stock:{stock_code}, day kbars is update to DB...')

codes = cursor.execute('SELECT DISTINCT code FROM stocks_1min_kbars').fetchall()
for code in codes:
    update_day_kbar(code[0])

conn.close() #關閉資料庫連線

重新計算均線相關資料

產生完日K資料後,要再重新計算一次均線相關資料。原本是想要寫在上面的產生日K資料的程式裡,但發現程式會變得太複雜,所以索性把這部份拆成另一個程式。
程式範例如下:

import pandas as pd
import sqlite3
from statistics import mean

conn = sqlite3.connect('C:/shioaji.db') #建立資料庫連線
cursor = conn.cursor() #產生cursor物件

# 更新均線相關資料
def update_MA_data(stock_code):
    cursor.execute('''SELECT ts FROM stocks_day_kbars 
                    WHERE Code = ? AND MA5 IS NOT NULL 
                    ORDER BY ts DESC LIMIT 1''', (stock_code, ))
    query_results = cursor.fetchone()
    if query_results is not None:
        start_ts = query_results[0]
        # 計算MA5
        cursor.execute('''SELECT ts FROM stocks_day_kbars 
                        WHERE Code = ? AND ts > ? AND MA5 IS NULL''', (stock_code, start_ts))
        for data in cursor.fetchall():
            cursor.execute('''SELECT ts, Close, MA5 
                            FROM stocks_day_kbars 
                            WHERE Code = ? AND ts <= ? 
                            ORDER BY ts DESC LIMIT 5''', (stock_code, data[0]))
            close_data = [] #收盤價list
            last_ma5 = None
            for ma_data in cursor.fetchall():
                close_data.append(ma_data[1])
                if last_ma5 is None:
                    last_ma5 = ma_data[2]
            ma5 = round(mean(close_data), 2) #用statistics.mean計算均價,並取至小數點後第二位
            ma5_diff = round(ma5-last_ma5, 2) #計算MA5的差異數,並取至小數點後第二位
            cursor.execute('''UPDATE stocks_day_kbars SET MA5 = ?, MA5_diff = ? 
                            WHERE Code = ? AND ts = ?''', (ma5, ma5_diff, stock_code, data[0]))
            conn.commit() #執行上面的update sql
        #計算MA20
        cursor.execute('''SELECT ts FROM stocks_day_kbars
                        WHERE Code = ? AND ts > ? AND MA20 IS NULL''', (stock_code, start_ts))
        for data in cursor.fetchall():
            cursor.execute('''SELECT ts, Close, MA20 FROM stocks_day_kbars 
                WHERE Code = ? AND ts <= ? ORDER BY ts DESC LIMIT 20''', (stock_code, data[0]))
            close_data = []
            last_ma20 = None
            for ma_data in cursor.fetchall():
                close_data.append(ma_data[1])
                if last_ma20 is None:
                    last_ma20 = ma_data[2]
            ma20 = round(mean(close_data), 2)
            ma20_diff = round(ma20-last_ma20, 2)
            cursor.execute('''UPDATE stocks_day_kbars SET MA20 = ?, MA20_diff = ? 
                        WHERE Code = ? AND ts = ?''', (ma20, ma20_diff, stock_code, data[0]))
            conn.commit() #執行上面的update sql

codes = cursor.execute('SELECT DISTINCT code FROM stocks_1min_kbars').fetchall()
for code in codes:
    print(f'update {code[0]} MA data...')
    update_MA_data(code[0])

conn.close() #關閉資料庫連線

每日產生觀察名單

最後,就是從日K資料中,產生觀察名單。第一步我會先用DB Browser for SQLite去撰寫SQL並看取出來的資料是不是我要的。撰寫完後,按下上方的執行按鈕,若SQL語法沒有問題,下方就會跑出結果。
https://ithelp.ithome.com.tw/upload/images/20211014/20140827IdZAuxykWf.png
有了初步的資料後,再使用python讓產生出來的資料更加完整。程式範例如下:

import os, datetime
import shioaji as sj
from shioaji.constant import Exchange
from dotenv import load_dotenv
import pandas as pd
import sqlite3

update_date = None

load_dotenv('D:/python/shioaji/.env')
conn = sqlite3.connect('C:/shioaji.db') #在D槽底下建立資料庫

api = sj.Shioaji()
api.login(
    person_id=os.getenv('YOUR_PERSON_ID'), 
    passwd=os.getenv('YOUR_PASSWORD')
)
print(api.Contracts.Stocks) #等待Contract.Stocks Fetch完成

def get_update_date():
    global update_date
    for exchange in api.Contracts.Stocks:
        for stock in exchange:
            if stock.exchange in (Exchange.TSE, Exchange.OTC):   
                if stock.category == '00' or stock.category == '' or stock.update_date == '':
                    pass
                else:
                    if update_date is None:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')
                    elif datetime.datetime.strptime(stock.update_date, '%Y/%m/%d') > update_date:
                        update_date = datetime.datetime.strptime(stock.update_date, '%Y/%m/%d')

get_update_date()
# 依照我們所要的條件,使用read_sql取出查詢結果
df = pd.read_sql('''SELECT ts, Code, Open, High, Low, Close, Volume, 
                    MA5, MA20, MA5_diff, MA20_diff FROM stocks_day_kbars 
                    WHERE ts >= ? AND MA5 > MA20 AND Close >= MA20 AND MA20_diff > 0''',
                    conn, parse_dates=['ts'], params=(update_date, ))
stock_name = [] #建立股票名稱list
# 歷遍df,透過shioaji取得股票名稱並加入list中
for index, row in df.iterrows():
    stock_name.append(api.Contracts.Stocks[row['Code']].name)
df['Name'] = stock_name #增加Name欄位,將欄位值設為stock_name
df = df[['ts', 'Code', 'Name', 'Open', 'High', 'Low', 'Close', 'Volume', 'MA5', 'MA20', 'MA5_diff', 'MA20_diff']] #設定欄位順序
df.to_excel('recommand.xlsx', encoding="utf_8_sig", index=False) #將結果匯出成Excel

api.logout()

之前介紹DataFrame匯出檔案時,都只用to_csv匯出成CSV檔,前幾天發現其實可以直接用to_excel匯出成xlsx檔,不過這個功能會用到openpyxl套件,若沒有先安裝會導致匯出失敗,若要使用to_excel,可以先執行pip install openpyxl進行安裝。
上面這個篩選條件,是用來產生多頭排列的股票清單,這裡只有用MA5跟MA20做判斷,並且只選出MA20_diff這個欄位值大於0且當日收盤價大於等於MA20的股票清單,並且透過shioaji抓股票的名稱,最後產生的資料如下:
https://ithelp.ithome.com.tw/upload/images/20211015/20140827mL0JwAirfw.png

結論

shioaji在報價及下單這方面功能算是齊全,但還是有一些資料是要使用其它套件來計算及產生。原本想說30天鐵人賽有點困難,但實際寫完後才對Shioaji有更深入的了解,當然還有好大一部份需要花時間去研究(例如盤中交易策略及即時K線資料產生)。
而目前shioaji還沒有盤後資料,這部份只能再找看看是否有其它的套件可以提供,或是直接抓證交所的資料。結論就是要做到真正的程式交易,還有很長的一段路要走,身為初學者的我們,可以先從盤後的資料下手,再一步步進展到盤中的資料。


上一篇
Day 29 - 計算均線資料
系列文
深入解析 Shioaji API30

尚未有邦友留言

立即登入留言