import json
import redis
import datetime as dt
class Writer():
    """Capture one window of quotes from the Redis ``mtx`` channel to a file.

    On construction the writer builds a Redis client for ``localhost:6379``
    (with ``decode_responses=True`` so payloads arrive as ``str``) and derives
    the capture window from the current local clock:

    * ``next_ts_mircosec`` -- start of the next 300-second bucket.
    * ``end_ts_mircosec``  -- start of the bucket after that.

    Both are expressed in microseconds since the epoch so they can be compared
    directly with the ``time`` field of incoming messages (this unit was chosen
    for easy comparison with the Fugle API).
    """

    def __init__(self):
        """Create the Redis client and compute the capture window."""
        pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
        self.redis_client = redis.Redis(connection_pool=pool)
        now_ts = dt.datetime.now().timestamp()
        # Index of the 300-second bucket containing "now".
        # (Reverse conversion for debugging: dt.datetime.fromtimestamp(now_ts).)
        current_bucket = int(now_ts / 300)
        # Seconds -> microseconds, for easy comparison with the Fugle API.
        self.next_ts_mircosec = (current_bucket + 1) * 300 * 1000000
        self.end_ts_mircosec = (current_bucket + 2) * 300 * 1000000

    def write_messageq_to_file(self):
        """Listen on ``mtx`` and dump the quotes inside the window to JSON.

        Messages whose ``time`` is before ``next_ts_mircosec`` are ignored,
        messages with ``next_ts_mircosec <= time <= end_ts_mircosec`` are
        collected, and the first message with ``time > end_ts_mircosec`` ends
        the capture.  The collected quotes are then written, as a JSON array
        of objects, to ``./<next_ts_mircosec>.json``.

        Each collected entry is the *parsed* payload.  Re-encoding the raw
        payload string before dumping the list would double-encode it and
        leave the file holding escaped strings instead of quote objects.

        Payloads that are not valid JSON, or that lack a ``time`` field, are
        reported and skipped so that one bad message does not discard the
        whole capture.

        A message delivered by the subscription looks like::

            {
                'type': 'message',
                'pattern': None,
                'channel': 'mtx',
                'data': "{
                    'symbol': 'MXFJ4',
                    'type': 'FUTURE_AH',
                    'exchange': 'TAIFEX',
                    'trades': [{'price': 22868, 'size': 1, 'bid': 22866, 'ask': 22868}],
                    'total': {'tradeVolume': 63553, 'totalBidMatch': 47132, 'totalAskMatch': 46308},
                    'time': 1727433572066000,
                    'serial': 77625
                }"
            }
        """
        quote_queue = []
        sub = self.redis_client.pubsub()
        try:
            sub.subscribe('mtx')
            for message in sub.listen():
                # Subscribe confirmations and other control frames carry no quote.
                if message['type'] != 'message':
                    print('type is not message')
                    print(message)
                    continue

                print(message['data'])
                try:
                    msg_data_json = json.loads(message['data'])
                    msg_time = msg_data_json['time']
                except (ValueError, KeyError, TypeError):
                    # Invalid JSON, missing 'time', or a non-object payload.
                    print('skipping malformed message')
                    continue

                if msg_time < self.next_ts_mircosec:
                    print('waiting to start ...')
                    continue
                if msg_time > self.end_ts_mircosec:
                    print('end of listening ...')
                    break

                # Here next_ts_mircosec <= msg_time <= end_ts_mircosec.
                print('gathering ...')
                quote_queue.append(msg_data_json)
        finally:
            # Always release the subscription's connection, even on error.
            sub.close()

        # NOTE: the file is opened in append mode; if a file for the same
        # window already exists, the new array is appended after its content.
        with open(f'./{self.next_ts_mircosec}.json', 'a') as f:
            quote_json = json.dumps(quote_queue, indent=4)
            f.write(quote_json)
        print('file written')
# Messages published on the 'mtx' channel.