DAY 27
1
Software Development

# 模組實作

## 基底類別

`Strategy.py`

``````import abc

import events

class Strategy(abc.ABC, events.Events):
__events__ = ('OnBuy', 'OnSell')

# 待實作函數：提供指標給策略
@abc.abstractmethod
def Feed(self, data):
return NotImplemented
``````

`Buy3RK1BKAndSell3BK1RK.py`

``````from Strategy import *

# 策略目標
# 買進：連續 3 隻紅 K 後出現 1 隻黑 K
# 賣出：連續 3 隻黑 K 後出現 1 隻紅 K

def __init__(self):
# 存放過去 3 隻 K 棒的類型
self.Bars = []

# 實作函數：提供指標給策略
def Feed(self, data):
# 開小於收為 +1（黑 K）
# 開收相等為 0
# 開大於收為 -1（紅 K）
status = 0
# 判斷黑 K
if data.open > data.close:
status = +1
# 判斷紅 K
if data.open < data.close:
status = -1
# 判斷買進條件符合並發出事件
if sum(self.Bars) == -3 and status == +1:
if self.OnBuy is not None:
# 判斷賣出條件符合並發出事件
if sum(self.Bars) == +3 and status == -1:
if self.OnSell is not None:
self.OnSell(self, data)
# 保存 K 棒類型
self.Bars.append(status)
# 若保存超過 3 隻 K 棒，則移除第 1 隻 K 棒
if len(self.Bars) > 3:
self.Bars.pop(0)
``````

`DataSource.py`

``````import abc

import events
import munch

class DataSource(abc.ABC, events.Events):
__events__ = ('OnData')

def __init__(self, name, option):
self.Name = name
self.Option = munch.munchify(option)
self.Sequence = []

@abc.abstractmethod
def Setup(self):
return NotImplemented

@abc.abstractmethod
def Start(self):
return NotImplemented
``````

`MongoDataSource.py`

``````import munch
import pymongo

class MongoDataSource(DataSource):

def Setup(self):
self.Client = pymongo.MongoClient(
)
self.Database = self.Client[self.Option.Database]

def Start(self):
cursor = self.Database[self.Option.Collection].find().sort([('created', 1)])
for doc in cursor:
doc = munch.munchify(doc)
self.Sequence.append(doc)
if self.OnData is not None:
self.OnData(self, doc)
``````

`main.py`

``````from MongoDataSource import *
from TimeSeriesManager import *
from Buy3RK1BKAndSell3BK1RK import *

def main():

def OnData(ds, data):
# 若為 1 分 K 訊號，則提供給策略執行模組
if ds.Name == 'TXF-1MINK':
strategy.Feed(data)

def OnSell(ds, data):
print(f'[SELL]\n{data}\n')

# 建立策略執行模組並轉接買賣訊號事件
strategy.OnSell += OnSell

tsm = TimeSeriesManager()
tsm.DataSources.append(MongoDataSource(
name='TXF-1MINK',
option={
'Host': 'localhost',
'Port': 27017,
'Database': 'backtest',
'Collection': 'k1min'
}
))
tsm.OnData = OnData
tsm.Setup()
tsm.Start()

if __name__ == '__main__':
main()
``````

CSScoke - 金魚都能懂的這個網頁畫面怎麼切 - 金魚都能懂了你還怕學不會嗎
Clarence - LINE bot 好好玩 30 天玩轉 LINE API
Hina Hina - 陣列大亂鬥
King Tzeng - IoT沒那麼難！新手用JavaScript入門做自己的玩具
Vita Ora - 好 Js 不學嗎 !? JavaScript 入門中的入門。
TaTaMo - 用Python開發的網頁不能放到Github上？Lektor說可以！！