iT邦幫忙

2023 iThome 鐵人賽

DAY 27
0
AI & Data

已 django + channels 來取得即時股價資料的推播系列 第 27

[Day 27] 實戰應用 - 聊天機器人 - 今日收盤行情(下)

  • 分享至 

  • xImage
  •  

建立模型

# chat/models.py
from django.db import models

# Create your models here.

class Quote(models.Model):
    datetime = models.DateTimeField()
    symbol = models.CharField(max_length=30)
    name = models.CharField(max_length=30)
    referencePrice = models.FloatField()
    openPrice = models.FloatField()
    highPrice = models.FloatField()
    lowPrice = models.FloatField()
    closePrice = models.FloatField()
    avgPrice = models.FloatField()
    lastSize = models.IntegerField()

    class Meta:
        db_table = 'quote'

    def __str__(self):
        return f'{self.datetime},{self.symbol},{self.name}'

python3 manage.py makemigrations

預期看到
https://ithelp.ithome.com.tw/upload/images/20231005/201627242rhmznWwrS.png

python3 manage.py migrate

預期看到
https://ithelp.ithome.com.tw/upload/images/20231005/20162724jZ8U1mnOrf.png

Rest API

#chat/fugle.py
import time

from fugle_marketdata import WebSocketClient, RestClient
import asyncio
from asgiref.sync import async_to_sync, sync_to_async
import channels.layers
import django, sys
from os.path import join, dirname, abspath
from os import environ
from datetime import datetime


# ---------------------- django setting --------------------------
PROJECT_DIR = dirname(dirname(abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)

#  Set the correct path to you settings module
environ.setdefault("DJANGO_SETTINGS_MODULE", "django_channels.settings")

# All django stuff has to come after the setup:
django.setup()

# ---------------------- django setting --------------------------
channel_layer = channels.layers.get_channel_layer()

YOUR_API_KEY = 'XXX'



from chat.models import Quote
def main():
    from fugle_marketdata import RestClient

    client = RestClient(api_key=YOUR_API_KEY)
    stock = client.stock
    _dt = None
    while True:
        time.sleep(5)
        data = stock.intraday.quote(symbol='2330')
        dt = datetime.fromtimestamp(data['closeTime']/ 10**6)
        if dt != _dt:
            _dt = dt
            Quote.objects.create(
                datetime=dt, symbol=data['symbol'], name=data['name'],
                referencePrice=data['referencePrice'], openPrice=data['openPrice'], highPrice=data['highPrice'],
                lowPrice=data['lowPrice'], closePrice=data['closePrice'], avgPrice=data['avgPrice'],
                lastSize=data['lastSize']
                                 ).save()



if __name__ == '__main__':
    main()

python3 chat/fugle.py

預期上述要能達到的功能是定時(5秒) 呼叫一次查詢股價的功能,並且存至 Quote 這個模型裡

# chat/consumers.py
import json
from datetime import datetime
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from chat.models import Product, Quote

from fugle_marketdata import RestClient

api_key = 'XXX'
#

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        client = RestClient(api_key=api_key)
        self.stock = client.stock

        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name, self.channel_name
        )

        self.accept()

        message = f"""
歡迎來到 {self.room_name} 股票即時推播聊天室, 請輸入"查詢目前價格" 關鍵字
            """
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )
        # 新加入
        obj = Quote.objects.filter(symbol=self.room_name).order_by('-datetime').first()

        message = '--------------------------------\n'
        message += f"{obj.datetime}, {self.room_name} {obj.name}, 現在價格: {obj.closePrice} 量: {obj.lastSize}\n"
        message += '--------------------------------\n'

        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )


    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name, self.channel_name
        )

    # Receive message from WebSocket
    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        if text_data_json['message'] == '查詢目前價格':
            obj = Quote.objects.filter(symbol=self.room_name).order_by('-datetime').first()
            message = f'查詢字串: {text_data_json["message"]}\n'
            message += f"{obj.datetime}, {self.room_name} {obj.name}, 現在價格: {obj.closePrice} 量: {obj.lastSize}\n"
            self.send(text_data=json.dumps({"message": message}))
python3 manage.py runserver 8000

在 Consumer.py 裡時做登入時回應現在股價,以及查詢現在價格功能
https://ithelp.ithome.com.tw/upload/images/20231005/20162724Qz9y4aNGhC.png

Websocket API

不想要查詢,只想登入後看到推播,只能從 Websocket 來著手
要看到效果限制在盤中時段
使用下圖功能

https://ithelp.ithome.com.tw/upload/images/20231005/20162724N5IZsxhyKk.png

#chat/fugle_websocket.py
import time

from fugle_marketdata import WebSocketClient
import asyncio
from asgiref.sync import async_to_sync, sync_to_async
import channels.layers
import django, sys
from os.path import join, dirname, abspath
from os import environ
import json

# ---------------------- django setting --------------------------
PROJECT_DIR = dirname(dirname(abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)

#  Set the correct path to you settings module
environ.setdefault("DJANGO_SETTINGS_MODULE", "django_channels.settings")

# All django stuff has to come after the setup:
django.setup()

# ---------------------- django setting --------------------------
channel_layer = channels.layers.get_channel_layer()

YOUR_API_KEY = 'xxx'

def handle_message(message):
    print(message)
    message = json.loads(message)
    if message['event'] in ['data']:
        data = message['data']
        symbol = data['symbol']

        async_to_sync(channel_layer.group_send)(f"chat_{symbol}",
                                                {'type': 'trades.message', "message": message})


async def main():
    client = WebSocketClient(api_key=YOUR_API_KEY)
    stock = client.stock
    stock.on('message', handle_message)
    await stock.connect()
    stock.subscribe({
        'channel': 'trades',
        'symbol': '2330'
    })

if __name__ == '__main__':
    asyncio.run(main())


python3 chat/fugle_websocket.py

預期看到
https://ithelp.ithome.com.tw/upload/images/20231005/20162724knoZhQD6Nb.png

# chat/consumers.py
import json
from datetime import datetime
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from chat.models import Product, Quote

from fugle_marketdata import RestClient

api_key = 'xxx'
#

class ChatConsumer(WebsocketConsumer):
    def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        client = RestClient(api_key=api_key)
        self.stock = client.stock

        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name, self.channel_name
        )

        self.accept()

        message = f"""
歡迎來到 {self.room_name} 股票即時推播聊天室, 請輸入"查詢目前價格" 關鍵字
            """
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )
        # 新加入
        obj = Quote.objects.filter(symbol=self.room_name).order_by('-datetime').first()

        message = '--------------------------------\n'
        message += f"{obj.datetime}, {self.room_name} {obj.name}, 現在價格: {obj.closePrice} 量: {obj.lastSize}\n"
        message += '--------------------------------\n'

        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )


    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name, self.channel_name
        )

    # Receive message from WebSocket
    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        if text_data_json['message'] == '查詢目前價格':
            obj = Quote.objects.filter(symbol=self.room_name).order_by('-datetime').first()
            message = f'查詢字串: {text_data_json["message"]}\n'
            message += f"{obj.datetime}, {self.room_name} {obj.name}, 現在價格: {obj.closePrice} 量: {obj.lastSize}\n"
            self.send(text_data=json.dumps({"message": message}))

    def trades_message(self, event):
        data = event["message"]

        # Send message to WebSocket
        dt = datetime.fromtimestamp(data['data']['time'] / 10**6)
        msg = f"{dt}, {self.room_name} 現在價格: {data['data']['price']} 總量: {data['data']['volume']}\n"
        self.send(text_data=json.dumps({"message": msg}))

注意看可發現這裡加上 trades_message 的函式,用來接受即時訊息

python3 manage.py runserver 8000

在 Consumer.py 裡時做登入時回應現在股價,以及查詢現在價格功能 + 接受即時推播資訊並且顯示

https://ithelp.ithome.com.tw/upload/images/20231005/201627248Akyeq5GMK.png

完成


上一篇
[Day 26] 實戰應用 - 聊天機器人 - 今日收盤行情(上)
下一篇
[Day 28] 實戰應用 - 聊天機器人 - 漲幅排行榜(上)
系列文
已 django + channels 來取得即時股價資料的推播30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言