iT邦幫忙

第 11 屆 iT 邦幫忙鐵人賽

DAY 26
1

基礎需求

因為資料源可能以不同時序提供,例如 Tick 資料源提供的每筆資料時間間隔是毫秒級,分 K 或分 MA 的每筆資料時間間隔是分鐘級,故在回測時不同資料源的每筆資料可能都在毫秒級的間隔內丟出,但實際上代表的時間間隔卻是不同時間層級,如何讓各資料源之間丟出不同時間層級的資料,能夠在相同的時間序列上依序排列,正是時序運行模組的主要工作。

模組實作

為進行時序比較,必須將資料的時間欄位統一命名,故訂為 time 欄位。

基底類別

新增 加入 Sequence 用以保存各資料源已提供的資料。

DataSource.py

import abc

import events
import munch

class DataSource(abc.ABC, events.Events):
    __events__ = ('OnData')

    def __init__(self, name, option):
        self.Name = name
        self.Option = munch.munchify(option)
        # 用於保存已提供資料
        self.Sequence = []

    @abc.abstractmethod
    def Setup(self):
        return NotImplemented

    @abc.abstractmethod
    def Start(self):
        return NotImplemented

歷史檔案讀取類別

新增 將取得資料在重新處理時間欄位後存入 Sequence。

CsvDataSource.py

import csv
import datetime

import munch

class CsvDataSource(DataSource):

    def Setup(self):
        self.File = open(self.Option.FilePath)
        self.Reader = csv.DictReader(self.File)

    def Start(self):
        with self.File:
            for row in self.Reader:
                # 將 Date 與 Time 欄位重新處理為 time 欄位
                # 並移除 Date 與 Time 欄位後放入 Sequence
                row = munch.munchify(row)
                row.time = datetime.datetime.strptime(f'{row.Date} {row.Time}', '%Y/%m/%d %H:%M:%S.%f')
                row.pop('Date', None)
                row.pop('Time', None)
                row.price = row.pop('Price')
                row.volume = row.pop('Volume')
                self.Sequence.append(row)
                if self.OnData is not None:
                    self.OnData(self, row)

歷史資料庫讀取類別

新增 將取得資料存入 Sequence。

MongoDataSource.py

import munch
import pymongo

class MongoDataSource(DataSource):

    def Setup(self):
        self.Client = pymongo.MongoClient(
            f'mongodb://{self.Option.Username}:{self.Option.Password}@{self.Option.Host}:{self.Option.Port}/'
        )
        self.Database = self.Client[self.Option.Database]

    def Start(self):
        cursor = self.Database[self.Option.Collection].find().sort([('created', 1)])
        for doc in cursor:
            # 因存入 MongoDB 前已處理成 time 欄位
            # 故僅須將資料存入 Sequence 即可
            doc = munch.munchify(doc)
            self.Sequence.append(doc)
            if self.OnData is not None:
                self.OnData(self, doc)

時序運行管理器類別

設計為將自動初始化加入的資料源,並於資料源完成載入後依序提取資料

TimeSeriesManager.py

import threading

class TimeSeriesManager(events.Events):
    __events__ = ('OnData')

    def __init__(self):
        self.DataSources = []
        self.Tasks = []
    
    def Setup(self):
        # 幫已加入管理的資料源進行設定與初始化
        for ds in self.DataSources:
            ds.Setup()
    
    def Start(self):
        # 幫已加入管理的資料源分配執行緒並開始讀取資料
        for ds in self.DataSources:
            self.Tasks.append(threading.Thread(target = ds.Start))
        for task in self.Tasks:
            task.start()
        # 等待所有資料源讀取完成
        for task in self.Tasks:
            task.join()
        # 依時序輸出各資料源的資料
        while True:
            # 直到所有已加入管理的資料源內資料全部輸出後停止
            if sum([len(item.Sequence) for item in self.DataSources]) == 0:
                break
            ds = None
            for item in self.DataSources:
                if ds is None:
                    ds = item
                    continue
                if ds.Sequence[0].time > item.Sequence[0].time:
                    ds = item
            data = ds.Sequence.pop(0)
            if self.OnData is not None:
                self.OnData(ds, data)

main.py

from MongoDataSource import *
from TimeSeriesManager import *

def OnData(ds, data):
    print(f'{ds.Name}\n{data}')

def main():
    tsm = TimeSeriesManager()
    tsm.DataSources.append(MongoDataSource(
        name='TXF-1MINK',
        option={
            'Username': 'root',
            'Password': 'root',
            'Host': 'localhost',
            'Port': 27017,
            'Database': 'backtest',
            'Collection': 'k1min'
        }
    ))
    tsm.DataSources.append(MongoDataSource(
        name='TXF-5MINK',
        option={
            'Username': 'root',
            'Password': 'root',
            'Host': 'localhost',
            'Port': 27017,
            'Database': 'backtest',
            'Collection': 'k5min'
        }
    ))
    tsm.OnData = OnData
    tsm.Setup()
    tsm.Start()

if __name__ == '__main__':
    main()

執行結果

目前已成功將資料來源模組與時序運行模組順利整合,可依時序輸出資料與指標,確保多重資料源下時序運行的正確性。


上一篇
Day-25 回測系統:資料來源模組
下一篇
Day-27 回測系統:策略運行模組
系列文
Python 程式交易 30 天新手入門30

尚未有邦友留言

立即登入留言