iT邦幫忙

2024 iThome 鐵人賽

DAY 12
0

為了縮短 IO 造成的延遲,所以需要使用 Redis 來加速。

安裝 Redis

brew services start redis
brew services info redis
redis-cli

設計

考慮到之後我們需要同時從 Fugle 行情 API 拿回四項不同種類的資料,(交易、K現、五檔、聚合),所以至少我也會將資料分成四個不同 channels。 不過當前我只要拿回台指期、之後還要拿選擇權。 這兩個行情理論上放在不同 channel 之後在整理上會比較方便。

初始化

  • 在 MTXMonitor 的建構子中,加入 redis 的連線資訊,建立 connection pool
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
self.redis_client = redis.Redis(connection_pool=pool)

塞入行情

  • 修改 handle_message 函式
def handle_message(self, message):
    msg_dict = json.loads(message)
    print(msg_dict)
    if 'event' in msg_dict and msg_dict['event'] == 'data':
        self.redis_client.publish('mtx', msg_dict['data'])
    elif 'event' in msg_dict and msg_dict['event'] == 'snapshot':
        self.redis_client.publish('mtx', msg_dict['data'])
    else:
        print(msg_dict)

從 Redis 讀出訊息寫檔案

def write_messageq_to_file(self):
    quote_queue = []
    sub = self.redis_client.pubsub()
    sub.subscribe('mtx')
    for message in sub.listen():
        if message['type'] == 'message':
            print(message['data'])
            self.redis_client.rpush('mtx', message['data'])
            json_data = json.dumps(message['data'])
            quote_queue.append(json_data)
        elif message['type'] == 'subscribe':
            print('subscribed')

    file_timemark = dt.now().strftime('%Y%m%d_%H%M%S')
    with open(f'./{file_timemark}.json', 'a') as f:
        f.write(str(quote_queue))
    print('file written')

結論

今日盈虧

  • 今天夜盤行情背離小那太多,感覺會崩,等等再來看怎麼復活
  • 後續再修改持股量價在這裡

上一篇
【Day 11】 行情暫存區 / 淺談訊息佇列 (Message queue) / 本日損益
系列文
從一萬元開始交易:收割韭菜三十天,量化交易工具製作12
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言