因為資料源可能以不同時序提供,例如 Tick 資料源提供的每筆資料時間間隔是毫秒級,分 K 或分 MA 的每筆資料時間間隔是分鐘級,故在回測時不同資料源的每筆資料可能都在毫秒級的間隔內丟出,但實際上代表的時間間隔卻是不同時間層級,如何讓各資料源之間丟出不同時間層級的資料,能夠在相同的時間序列上依序排列,正是時序運行模組的主要工作。
為進行時序比較,必須將資料的時間欄位統一命名,故訂為 time 欄位。
新增
加入 Sequence 用以保存各資料源已提供的資料。
DataSource.py
import abc
import events
import munch
class DataSource(abc.ABC, events.Events):
__events__ = ('OnData')
def __init__(self, name, option):
self.Name = name
self.Option = munch.munchify(option)
# 用於保存已提供資料
self.Sequence = []
@abc.abstractmethod
def Setup(self):
return NotImplemented
@abc.abstractmethod
def Start(self):
return NotImplemented
新增
將取得資料在重新處理時間欄位後存入 Sequence。
CsvDataSource.py
import csv
import datetime
import munch
class CsvDataSource(DataSource):
def Setup(self):
self.File = open(self.Option.FilePath)
self.Reader = csv.DictReader(self.File)
def Start(self):
with self.File:
for row in self.Reader:
# 將 Date 與 Time 欄位重新處理為 time 欄位
# 並移除 Date 與 Time 欄位後放入 Sequence
row = munch.munchify(row)
row.time = datetime.datetime.strptime(f'{row.Date} {row.Time}', '%Y/%m/%d %H:%M:%S.%f')
row.pop('Date', None)
row.pop('Time', None)
row.price = row.pop('Price')
row.volume = row.pop('Volume')
self.Sequence.append(row)
if self.OnData is not None:
self.OnData(self, row)
新增
將取得資料存入 Sequence。
MongoDataSource.py
import munch
import pymongo
class MongoDataSource(DataSource):
def Setup(self):
self.Client = pymongo.MongoClient(
f'mongodb://{self.Option.Username}:{self.Option.Password}@{self.Option.Host}:{self.Option.Port}/'
)
self.Database = self.Client[self.Option.Database]
def Start(self):
cursor = self.Database[self.Option.Collection].find().sort([('created', 1)])
for doc in cursor:
# 因存入 MongoDB 前已處理成 time 欄位
# 故僅須將資料存入 Sequence 即可
doc = munch.munchify(doc)
self.Sequence.append(doc)
if self.OnData is not None:
self.OnData(self, doc)
設計為將自動初始化加入的資料源,並於資料源完成載入後依序提取資料
TimeSeriesManager.py
import threading
class TimeSeriesManager(events.Events):
__events__ = ('OnData')
def __init__(self):
self.DataSources = []
self.Tasks = []
def Setup(self):
# 幫已加入管理的資料源進行設定與初始化
for ds in self.DataSources:
ds.Setup()
def Start(self):
# 幫已加入管理的資料源分配執行緒並開始讀取資料
for ds in self.DataSources:
self.Tasks.append(threading.Thread(target = ds.Start))
for task in self.Tasks:
task.start()
# 等待所有資料源讀取完成
for task in self.Tasks:
task.join()
# 依時序輸出各資料源的資料
while True:
# 直到所有已加入管理的資料源內資料全部輸出後停止
if sum([len(item.Sequence) for item in self.DataSources]) == 0:
break
ds = None
for item in self.DataSources:
if ds is None:
ds = item
continue
if ds.Sequence[0].time > item.Sequence[0].time:
ds = item
data = ds.Sequence.pop(0)
if self.OnData is not None:
self.OnData(ds, data)
main.py
from MongoDataSource import *
from TimeSeriesManager import *
def OnData(ds, data):
print(f'{ds.Name}\n{data}')
def main():
tsm = TimeSeriesManager()
tsm.DataSources.append(MongoDataSource(
name='TXF-1MINK',
option={
'Username': 'root',
'Password': 'root',
'Host': 'localhost',
'Port': 27017,
'Database': 'backtest',
'Collection': 'k1min'
}
))
tsm.DataSources.append(MongoDataSource(
name='TXF-5MINK',
option={
'Username': 'root',
'Password': 'root',
'Host': 'localhost',
'Port': 27017,
'Database': 'backtest',
'Collection': 'k5min'
}
))
tsm.OnData = OnData
tsm.Setup()
tsm.Start()
if __name__ == '__main__':
main()
目前已成功將資料來源模組與時序運行模組順利整合,可依時序輸出資料與指標,確保多重資料源下時序運行的正確性。