iT邦幫忙

2023 iThome 鐵人賽

DAY 25
0

Django Channels 是 Django 的一個擴充功能,主要用來處理即時通訊 (real-time communication) 的需要,像是廣播(broadcasting)和非同步更新等等。

Django 本身是一個很強大的框架, 它的主要目標是使複雜的網頁開發變得簡單且快速,但其設計初期主要處理的是 HTTP 類型的請求,這對於處理即時通訊的 WebSocket 請求來說,可能並不是最適合的選擇。而 Django Channels 的出現,就是為了填補這一部分。

那 Django Channels 跟 GraphQL 又有什麼關係呢?

那就是要提到 GraphQL Subscription,它的功能有點類似查詢,它是查詢(Query)、變更(Mutation)之外的第三個操作(Operations),主要用於即時資料更新,這就是為什麼會需要在 GraphQL 中用到 Django Channels,因為 Django Channels 可以處理 WebSocket 請求,並且 Strawberry 有提供 Django Channels 的整合。

但其實 GraphQL Subscription 不一定要用 WebSocket 來實作,當然也能用 Server-Sent Events(SSE)之類的實作,但是大部分客戶端 GraphQL 套件都是支援 WebSocket。

那下面我們就先安裝與設定 Django Channels,再開始實作吧。

首先是安裝 Django Channels:

$ poetry add "strawberry-graphql[channels]"
$ poetry add daphne

這邊我們透過 Strawberry 依賴安裝 Django Channels,以及安裝 Django 官方的 ASGI 非同步處理 Web Server。

修改server/settings.py Django設定:

# ... 省略
INSTALLED_APPS = [
+   "daphne",
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "django_extensions",
    "strawberry_django",
    "server.app.blog",
    "server.app.authentication",
]
# ... 省略
WSGI_APPLICATION = "server.wsgi.application"
+ASGI_APPLICATION = "server.asgi.application"
# ... 省略
+CHANNEL_LAYERS = {
+    "default": {
+        "BACKEND": "channels.layers.InMemoryChannelLayer",
+    },
+}

+POSTS_CHANNEL = "posts_notifications"

這邊把daphne加進 Django 應用程式中,daphne可以處理非同步程式,原本 Django runserver 只能處理同步程式,現在這樣runserver也能透過daphne處理非同步程式,那現在我們要執行非同步程式,所以要設定 ASGI 應用程式執行進入點,最後設定 Channel Layers,它是用來跨服務即時交換資訊的服務,開發階段會使用InMemoryChannelLayer,到正式環境的時候要改成用RedisChannelLayer

最後的POSTS_CHANNEL是我自己新增的常數,用來設定文章通知的頻道名稱。

Django Channels 的前置設定就先到這邊,我們先建立 GraphQL Subscription 的程式,再繼續完成 ASGI 的設定。

這裡我們實作一個新發佈文章的通知功能,一開始先加一個新的型態:

# server/app/blog/graph/types.py
# ... 省略

+@strawberry.type
+class PostNotification(relay.Node):
+    id: relay.NodeID[uuid.UUID]  # noqa: A003
+    title: str
+    publish_at: datetime.datetime

假設我們通知訊息會顯示標題、發布時間以及連結的 ID。

接著建立放 Subscription 相關程式的 Python 檔:

$ touch server/app/blog/graph/subscriptions.py

編輯subscriptions.py

import typing

import strawberry
from channels.db import database_sync_to_async
from django.conf import settings
from strawberry.types import Info

from server.app.blog import models as blog_models
from server.app.blog.graph import types as blog_types

@database_sync_to_async
def _get_post(id: str) -> blog_models.Post | None:  # noqa: A002
    try:
        return blog_models.Post.objects.get(id=id)
    except blog_models.Post.DoesNotExist:
        return None

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def post_notify(
        self,
        info: Info,
    ) -> typing.AsyncGenerator[blog_types.PostNotification, None]:
        ws = info.context["ws"]
        channel_layer = ws.channel_layer
        await channel_layer.group_add(settings.POSTS_CHANNEL, ws.channel_name)

        async with ws.listen_to_channel(
            "chat.message",
            groups=[settings.POSTS_CHANNEL],
        ) as cm:
            async for message in cm:
                post = await _get_post(message["post_id"])
                if post:
                    yield blog_types.PostNotification(
                        id=post.pk,
                        title=post.title,
                        publish_at=post.published_at,
                    )

Subscription 相關的程式一定是非同步的,但是我們資料庫的查詢是跑在同步的,Django Channels 提供ㄧ個裝飾器讓我們在資料庫查詢的地方轉成非同步的,就像上面_get_post取得文章的查詢使用database_sync_to_async,把它變成非同步的函式。

post_notify是我們主要要實作的 Subscription 功能,一開始透過 GraphQL Context 拿到 Django Channels 的 Channel Layer,接著將當前的請求加入我們設定文章通知頻道,後面就是等待頻道訊息的更新,接收到新的訊息就傳送新文章的通知。

下面幫發布文章的功能加上發送通知訊息:

# server/app/blog/graph/mutations.py
# ... 省略
+from asgiref.sync import async_to_sync
+from channels.layers import get_channel_layer
+from django.conf import settings
# ... 省略

+def notify_new_post(post: blog_models.Post) -> None:
+    channel_layer = get_channel_layer()
+    group_send = async_to_sync(channel_layer.group_send)  # type: ignore
+    group_send(
+        settings.POSTS_CHANNEL,
+        {
+            "type": "chat.message",
+            "post_id": post.pk,
+        },
+    )

# ... 省略
@strawberry.type
class Mutation:
		# ... 省略
    @strawberry_django.input_mutation(
        handle_django_errors=True,
        extensions=[
            HasPerm(["blog.publish_post", "blog.view_post"], any_perm=False),
            IsStaff(),
        ],
    )
    def publish_post(self, id: uuid.UUID) -> blog_types.Post:  # noqa: A002
        post = blog_models.Post.objects.get(pk=id)
        if not post.published:
            post.published = True
            post.published_at = timezone.now()
            post.save()

+       notify_new_post(post)
        return typing.cast(blog_types.Post, post)

主要也是先拿到 Django Channels 的 Channel Layer,對指定頻道群組發送訊息,在發布文章的地方呼叫發送訊息的函式。

再接下來是設定 Subscription 到 Schema 中:

# server/schema.py
# ... 省略
from server.app.authentication.graph import queries as auth_queries
from server.app.blog.graph import mutations as blog_mutations
from server.app.blog.graph import queries as blog_queries
+from server.app.blog.graph import subscriptions as blog_subscriptions

# ... 省略

+subscription = strawberry.tools.merge_types(
+    "Subscription",
+    (blog_subscriptions.Subscription,),
+)

schema = strawberry.Schema(query=query, mutation=mutation)
+ws_schema = strawberry.Schema(query=query, subscription=subscription)

由於 strawberry_django 跟 strawberry 的 Django Channels 的 GraphQL Context 格式目前不相容,會導致 strawberry_django 有些功能錯誤,所以將兩個 Schema 分開,至於ws_schema為什麼還要設定查詢,是因為查詢在 Schema 是必填的。

接著設定 ASGI 的設定:

# server/asgi.py
# ... 省略
import os

from django.core.asgi import get_asgi_application
+from strawberry.channels import GraphQLProtocolTypeRouter

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")

-application = get_asgi_application()
+django_asgi_app = get_asgi_application()

+from server.schema import ws_schema  # noqa: E402

+application = GraphQLProtocolTypeRouter(
+    ws_schema,
+    django_application=django_asgi_app,  # type: ignore
+    url_pattern="^wsgraphql",
+)

最後啟動 Django runserver:

$ python manage.py runserver
Watching for file changes with StatReloader
Performing system checks...

System check identified no issues (0 silenced).
Django version 4.2.5, using settings 'server.settings'
Starting ASGI/Daphne version 4.0.0 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /wsgraphql/ [127.0.0.1:53135]
WebSocket CONNECT /wsgraphql/ [127.0.0.1:53135]

可以看到 runserver 是透過 Daphne 執行的,然後我們分別開啟 Subscription 的http://127.0.0.1:8000/wsgraphql/http://127.0.0.1:8000/graphql/原本一般的 GraphQL:

https://ithelp.ithome.com.tw/upload/images/20231010/20161957ixqjvSXSJo.png

https://ithelp.ithome.com.tw/upload/images/20231010/201619570xWH8PFyId.png

執行 Subscription 的時候,會持續顯示執行中,等待新的訊息。

這次修改內容可以參考 Git commit:

參考資料


上一篇
Day 24:Strawberry Django 權限
下一篇
Day 26:Strawberry Django 查詢優化
系列文
Django 與 Strawberry GraphQL:探索現代 API 開發之路30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言