使用 pip 安裝
# 事件處理
pip install events
# 讓 dict 物件可用屬性的方式存取
pip install munch
因為資料來源可來自:
而歷史資料通常因為資料已存在可以連續不停留地灌入,但即時訊號則是間斷性地提供資料,故資料提供並不適合函數回傳的方式,而應使用註冊事件的方式提供資料。
首先定義一個抽象類別規範每個資料來源模組都應提供的事件與應實作的函數。
DataSource.py
# Python 提供抽象類別的實作庫
import abc
import events
import munch
# 繼承事件類別的
class DataSource(abc.ABC, events.Events):
# 定義事件 OnData,用於有新資料時提供給註冊該事件的對象
__events__ = ('OnData')
def __init__(self, name, option):
# 設定資料模組名稱
self.Name = name
# 接收傳入的參數設定,因不同資料來源有不同的設定,故整合於 option 物件
self.Option = munch.munchify(option)
# 待實作函數:設定並開啟資料源
@abc.abstractmethod
def Setup(self):
return NotImplemented
# 待實作函數:開始從資料源讀取或接收資料
@abc.abstractmethod
def Start(self):
return NotImplemented
建立一個繼承自基底類別的歷史檔案讀取類別,因歷史檔案多為 CSV 格式,故以此為示範。
CsvDataSource.py
import csv
from DataSource import *
class CsvDataSource(DataSource):
# 實作函數:設定並開啟資料源
def Setup(self):
# 開啟檔案
self.File = open(self.Option.FilePath)
# 將檔案設定給 CSV 讀取器
self.Reader = csv.DictReader(self.File)
# 實作函數:開始從資料源讀取或接收資料
def Start(self):
with self.File:
# 每從 CSV 讀取器取得一筆資料就用 OnData 事件提供給註冊該事件的對象
for row in self.Reader:
if self.OnData is not None:
self.OnData(self, row)
main.py
from CsvDataSource import *
def OnData(ds, data):
print(f'{ds.Name}\n{data}')
def main():
ds = CsvDataSource(
name='TXF-TICKS',
option={
'FilePath': 'TXF1-Ticks-2019-01-02~2019-09-20.csv'
}
)
ds.OnData = OnData
ds.Setup()
ds.Start()
if __name__ == '__main__':
main()
建立一個繼承自基底類別的歷史資料庫讀取類別,使用前日於 MongoDB 內已保存的 Tick 資料。
MongoDataSource.py
import csv
from DataSource import *
class MongoDataSource(DataSource):
# 實作函數:設定並開啟資料源
def Setup(self):
# 建立 MongoDB 客戶端
self.Client = pymongo.MongoClient(
f'mongodb://{self.Option.Username}:{self.Option.Password}@{self.Option.Host}:{self.Option.Port}/'
)
# 取得指定資料庫
self.Database = self.Client[self.Option.Database]
# 實作函數:開始從資料源讀取或接收資料
def Start(self):
# 每從 Cursor 取得一筆 Document 就用 OnData 事件提供給註冊該事件的對象
cursor = self.Database[self.Option.Collection].find().sort([
('created', 1)
])
for doc in cursor:
if self.OnData is not None:
self.OnData(self, doc)
main.py
from MongoDataSource import *
def OnData(ds, data):
print(f'{ds.Name}\n{data}')
def main():
ds = MongoDataSource(
name='TXF-TICKS',
option={
'Username': 'root',
'Password': 'root',
'Host': 'localhost',
'Port': 27017,
'Database': 'backtest',
'Collection': 'ticks'
}
)
ds.OnData = OnData
ds.Setup()
ds.Start()
if __name__ == '__main__':
main()
接下來我們可同時提供多組資料源,讓時序運行模組可依時間載入所需資料。
至於如何串接即時資料源,就請參考上述二種資料讀取模組的實作方式,再整合 Day-14 券商串接:串接元大期貨行情 API(一) 所學到的知識自行實作。
https://docs.python.org/3.6/library/abc.html
https://github.com/pyeve/events
https://github.com/Infinidat/munch
團隊系列文:
CSScoke - 金魚都能懂的這個網頁畫面怎麼切 - 金魚都能懂了你還怕學不會嗎
Clarence - LINE bot 好好玩 30 天玩轉 LINE API
Hina Hina - 陣列大亂鬥
King Tzeng - IoT沒那麼難!新手用JavaScript入門做自己的玩具
Vita Ora - 好 Js 不學嗎 !? JavaScript 入門中的入門。
TaTaMo - 用Python開發的網頁不能放到Github上?Lektor說可以!!