iT邦幫忙

2024 iThome 鐵人賽

DAY 15
0

Source Code

import json
import redis
import datetime as dt

class Writer():

    def __init__(self):
        pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
        self.redis_client = redis.Redis(connection_pool=pool)
        now_ts = dt.datetime.now().timestamp()
        # print(now_ts)   # Reverse: dt.datetime.fromtimestamp(now_ts)
        self.next_ts_mircosec = (int(now_ts/300)+1)*300*1000000  # 跟 Fugle API 方便比較
        self.end_ts_mircosec = (int(now_ts/300)+2)*300*1000000  # 跟 Fugle API 方便比較
        # print(next_ts_mircosec)
    
    def write_messageq_to_file(self):
        quote_queue = []
        sub = self.redis_client.pubsub()
        sub.subscribe('mtx')
        for message in sub.listen():
            if message['type'] == 'message':
                print(message['data'])
                msg_data_json = json.loads(message['data'])
                if msg_data_json['time'] < self.next_ts_mircosec:
                    print('waiting to start ...')
                    continue
                elif msg_data_json['time'] > self.end_ts_mircosec:
                    print('end of listening ...')
                    break
                elif msg_data_json['time'] >= self.next_ts_mircosec:
                    print('gathering ...')
                    json_data = json.dumps(message['data'])
                    quote_queue.append(json_data)
            else:
                print('type is not message')
                print(message)
        ## message 解開來像是
        '''
        {
            'type': 'message', 
            'pattern': None, 
            'channel': 'mtx', 
            'data': "{
                'symbol': 'MXFJ4', 
                'type': 'FUTURE_AH', 
                'exchange': 'TAIFEX', 
                'trades': [{'price': 22868, 'size': 1, 'bid': 22866, 'ask': 22868}], 
                'total': {'tradeVolume': 63553, 'totalBidMatch': 47132, 'totalAskMatch': 46308}, 
                'time': 1727433572066000, 
                'serial': 77625
            }"
        }
        '''
        with open(f'./{self.next_ts_mircosec}.json', 'a') as f:
            quote_json = json.dumps(quote_queue, indent = 4)
            f.write(quote_json)
        print('file written')

解說

  • 從 Writer 產生一個實例後,先找出下一個整五分時戳。
  • 立即會開始收聽 Redis mtx 頻道放出來的訊息。
  • 看訊息中的時戳,當大於等於整五分時戳,開始把聽到的訊息,塞到 queue (python list)。
  • 一旦收聽到的時間大於結束時間,則結束收聽!!

結論

  • 這邊我認為還需要檢查,有沒有可能,比較後面的訊息,比較早被收聽到 ~ 如果有的話,則可能會遺漏部份行情。
  • 接下來想要開發動態計算「支撐」與「壓力」的功能,並且在行情發生爆量,或是反轉時,給出最新的提示。
  • 還有「選擇權」監控的功能。
  • 今天的 iThome 網站好像隨時都會掛點,先寫到這邊了。

上一篇
【Day 14】 設計一個 Writer 從 Redis 讀資料並篩選時間
下一篇
【Day 16】 選擇權的定價公式 / 本日盈虧
系列文
從一萬元開始交易:收割韭菜三十天,量化交易工具製作32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言