iT邦幫忙

2021 iThome 鐵人賽

DAY 27
0
永豐金融APIs

永豐金融APIs - 從零開始到放棄!?系列 第 27

資料取得 - 多重來源

說是多重來源,其實也就是本機和 shioaji 而已,我的想法是這樣子的,如果本機有資料的話,就從本機抓取,如果本機沒有的話就連線到 shioaji 抓取,如果是要跑半年線的資料的話,至少可以省下半年的時間,因為我的目標,可能一天要跑 20 支以上的股票,那這樣子省下來的時間就多了。以下是大概的架構。

取得資料流程

規畫的目錄架構如下:

project
│   main.py
│   service.py
│
└───repository
│   │   databse.py
│   │   models.py
│   │   database.sqlite3 (資料庫檔案)
│   │   api.py (shioaji api)
  • api.py 這部份之前已經使用很多次了,所以就不介紹了,程式碼如下
import shioaji as sj
import pandas as pd
from datetime import datetime

PERSON_ID = "身份證字號"
PASSWD = "密碼"

api = sj.Shioaji()

def getKbarsFromApi(stock_code, start, end):
    api.login(person_id=PERSON_ID, passwd=PASSWD)

    stock = api.Contracts.Stocks[stock_code]
    kbars = api.kbars(stock ,start=start, end=end)
    
    df = __kbarsConvertToDf(kbars)
    
    api.logout()

    return df


def __kbarsConvertToDf(kbars):
    dts = list(map(lambda x:datetime.utcfromtimestamp(x / 10**9), kbars.ts))

    df = pd.DataFrame(
        {
            "open": pd.Series(kbars.Open),
            "high": pd.Series(kbars.High),
            "low": pd.Series(kbars.Low),
            "close": pd.Series(kbars.Close),
            "volume": pd.Series(kbars.Volume),
        }
    )
    
    df.index = pd.Index(dts)
    
    return df
  • database.py 昨天也示範過了,一樣直接上程式碼
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///repository/database.sqlite3"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)

session = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()
  • models.py 昨天也有示範過,不過這邊加了一個 Offdays 的資料,是要紀錄休市的日期的,這樣子本機撈不到資料,可以判斷是不是休市,如果是的話,也不用去 Shioaji 再撈一次了
from sqlalchemy import Column, String, DateTime, Integer
from sqlalchemy.sql.sqltypes import Date, Float
from .database import Base


class Kbars(Base):
    __tablename__ = 'kbars'

    stock_code = Column(String, primary_key=True)
    datetime = Column(DateTime(timezone=False), primary_key=True)
    open = Column(Float(precision=2))
    high = Column(Float(precision=2))
    low = Column(Float(precision=2))
    close = Column(Float(precision=2))
    volume = Column(Integer)

    def __init__(self, stock_code, datetime, open, high, low, close, volume):
        self.stock_code = stock_code
        self.datetime = datetime
        self.open = open
        self.high = high
        self.low = low
        self.close = close
        self.volume = volume

    def __repr__(self):
        return "Stock: {} datetime: {} open:{}|high:{}|low:{}|close:{}|vol:{}".format(
            self.stock_code, self.datetime, self.open, self.high, self.low, self.close, self.volume)


class OffDays(Base):
    __tablename__ = "offDays"

    id = Column(Integer, primary_key=True)
    date = Column(Date)
    
    def __init__(self, date):
        self.date = date

    def __repr__(self):
        return self.date.strftime("%Y-%m-%d")
  • services.py 主要資料取得的邏輯在這個檔案,最有可能出錯的也是在這裡,所以各位參考就好,如果有問題還是要自己處理喔
from datetime import timedelta, datetime
from sqlalchemy.sql.expression import and_
from repository.api import getKbarsFromApi
from repository.models import Kbars, OffDays
from repository.database import session
from sqlalchemy import func
import pandas as pd


class RangeClass:
      '''紀錄要撈取時間範圍的class'''
    def __init__(self):
        self.start = None
        self.end = None
        
    def __repr__(self):
        return "{{start: {}, end: {}}}".format(
            self.start.strftime("%Y-%m-%d"), 
            "None" if self.end is None else self.end.strftime("%Y-%m-%d"))

def getKbars(stock_code, start, end):
    # 建立 db 連線
    db = session()
    
    # 把 db 裡有資料的日期取出來
    dbDates = db.query(func.date(Kbars.datetime))\
        .filter(and_(func.date(Kbars.datetime) >= start, 
                     func.date(Kbars.datetime) <= end, 
                     Kbars.stock_code == stock_code)).distinct().all()

    # 將日期的 string 轉成 date 並存成 list
    dbDates = [datetime.strptime(value, "%Y-%m-%d").date() for value, in dbDates]
    
    # 把資料庫裡的 offDays 資料取出
    offDays = db.query(OffDays.date).filter(and_(OffDays.date >= start, OffDays.date <= end)).all()
    
    # 存成 list
    offDays = [value for value, in offDays]

    # 建立日期的 list
    dateRange = pd.date_range(start = start, end = end).to_pydatetime().tolist()
    
    # 要從 shioaji 取得的 RangeClass tuple
    apiRange = ()
    # db 有資料的 RangeClass tuple (可以不用)
    dbRange = ()

    # api 的 RangeClass
    apiRangeObject = RangeClass()
    # db 的 RangeClass
    dbRangeObject = RangeClass()

    # 檢查日期列表,是否在 db 中
    for date in dateRange:

        if date.date() in dbDates:
        # 在 db 裡
            if dbRangeObject.start is None:
                # 如果 db 的 RangeClass 開始是空的,就設定開始日期
                # 因為日期是一個區間,所以只要設一次就可了
                dbRangeObject.start = date.date()

                if apiRangeObject.start is not None:
                    # 如果 api 的 RangeClass 開始不是空的, 就設定結束日期, 並放入 apiRange tuple 中
                    apiRangeObject.end = date.date() - timedelta(days = 1)
                    apiRange += (apiRangeObject,)

                    # 重置 api 的 RangeClass
                    apiRangeObject = RangeClass()
        
        else:
        # 不在 db 裡,要從 shioaji 取得
            if apiRangeObject.start is None:
                # api RangeClass 開始是空的,設定開始日期
                if date.date() not in offDays:
                    # 判斷當天是不是休市,不是休市才進行設定
                    apiRangeObject.start = date.date()

                    if dbRangeObject.start is not None:
                        # 如果 db 的 RangeClass 有開始資料,就設定結束資料,並放入 dbRange tuple 中
                        dbRangeObject.end = date.date() - timedelta(days = 1)
                        dbRange += (dbRangeObject,)
                        
                        # 重置 db 的 RangeClass
                        dbRangeObject = RangeClass()


    # 結束後,檢查 db, api 的 RangeClass, 
    # 如果有還沒有結束的,就把它結束掉,並放入 tuple 中
    if dbRangeObject.start is not None:
        dbRangeObject.end = dateRange[-1].date()
        dbRange += (dbRangeObject,)
                    
    if apiRangeObject.start is not None:
        apiRangeObject.end = dateRange[-1].date()
        apiRange += (apiRangeObject,)

    # api 的 tuple 有資料
    if len(apiRange) > 0:
        for range in apiRange:
            # 呼叫 shioaji 進行抓取
            kbars = getKbarsFromApi(stock_code, range.start.strftime('%Y-%m-%d'), range.end.strftime('%Y-%m-%d')) 
            
            # 沒有資料,把日期存入 offDays
            if len(kbars.index) == 0:
                db.add(OffDays(range.start))
                nextDay = range.start
                while nextDay < range.end:
                    nextDay += timedelta(days=1)
                    db.add(OffDays(nextDay))
            else:
            # 有資料,把資料存到 db
                for index, row in kbars.iterrows():
                    kbar = Kbars(
                        stock_code, index, row["open"], row["high"], row["low"], row["close"], row["volume"]
                    )
                    
                    db.add(kbar)
        

    db.commit()
    
    # 從 db 撈資料並轉成 pd.dataframe
    statement = db.query(Kbars.datetime, Kbars.open, Kbars.high, Kbars.low, Kbars.close, Kbars.volume)\
        .filter(and_(func.date(Kbars.datetime) >= start, 
                     func.date(Kbars.datetime) <= end, 
                     Kbars.stock_code == stock_code)).statement
        
        
    df = pd.read_sql(statement, db.bind)
    
    df = df.set_index("datetime")
    
    db.close()

    return df
  • main.py 因為取資料的邏輯都在 services.py,所以 main.py 只要呼叫就可以了
from repository import models
from repository.database import engine
import services

# 這一個是初始化 db 的, 如果是第一次執行的話不能省
models.Base.metadata.create_all(bind=engine)

# 直接呼叫取得資料
kbars = services.getKbars("2412", "2021-10-01", "2021-10-11")

這樣就可以取得我們的資料,之後我希望每天進行當日的資料分析,所以明天我們來實做一下如何進行排程


上一篇
資料儲存 - SqlAlchemy
下一篇
自動化工作 - APScheduler
系列文
永豐金融APIs - 從零開始到放棄!?30

尚未有邦友留言

立即登入留言