Django Channels 是 Django 的一個擴充功能,主要用來處理即時通訊 (real-time communication) 的需要,像是廣播(broadcasting)和非同步更新等等。
Django 本身是一個很強大的框架, 它的主要目標是使複雜的網頁開發變得簡單且快速,但其設計初期主要處理的是 HTTP 類型的請求,這對於處理即時通訊的 WebSocket 請求來說,可能並不是最適合的選擇。而 Django Channels 的出現,就是為了填補這一部分。
那 Django Channels 跟 GraphQL 又有什麼關係呢?
那就是要提到 GraphQL Subscription,它的功能有點類似查詢,它是查詢(Query)、變更(Mutation)之外的第三個操作(Operations),主要用於即時資料更新,這就是為什麼會需要在 GraphQL 中用到 Django Channels,因為 Django Channels 可以處理 WebSocket 請求,並且 Strawberry 有提供 Django Channels 的整合。
但其實 GraphQL Subscription 不一定要用 WebSocket 來實作,當然也能用 Server-Sent Events(SSE)之類的實作,但是大部分客戶端 GraphQL 套件都是支援 WebSocket。
那下面我們就先安裝與設定 Django Channels,再開始實作吧。
首先是安裝 Django Channels:
$ poetry add "strawberry-graphql[channels]"
$ poetry add daphne
這邊我們透過 Strawberry 依賴安裝 Django Channels,以及安裝 Django 官方的 ASGI 非同步處理 Web Server。
修改server/settings.py
Django設定:
# ... 省略
INSTALLED_APPS = [
+ "daphne",
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"django_extensions",
"strawberry_django",
"server.app.blog",
"server.app.authentication",
]
# ... 省略
WSGI_APPLICATION = "server.wsgi.application"
+ASGI_APPLICATION = "server.asgi.application"
# ... 省略
+CHANNEL_LAYERS = {
+ "default": {
+ "BACKEND": "channels.layers.InMemoryChannelLayer",
+ },
+}
+POSTS_CHANNEL = "posts_notifications"
這邊把daphne
加進 Django 應用程式中,daphne
可以處理非同步程式,原本 Django runserver
只能處理同步程式,現在這樣runserver
也能透過daphne
處理非同步程式,那現在我們要執行非同步程式,所以要設定 ASGI 應用程式執行進入點,最後設定 Channel Layers,它是用來跨服務即時交換資訊的服務,開發階段會使用InMemoryChannelLayer
,到正式環境的時候要改成用RedisChannelLayer
。
最後的POSTS_CHANNEL
是我自己新增的常數,用來設定文章通知的頻道名稱。
Django Channels 的前置設定就先到這邊,我們先建立 GraphQL Subscription 的程式,再繼續完成 ASGI 的設定。
這裡我們實作一個新發佈文章的通知功能,一開始先加一個新的型態:
# server/app/blog/graph/types.py
# ... 省略
+@strawberry.type
+class PostNotification(relay.Node):
+ id: relay.NodeID[uuid.UUID] # noqa: A003
+ title: str
+ publish_at: datetime.datetime
假設我們通知訊息會顯示標題、發布時間以及連結的 ID。
接著建立放 Subscription 相關程式的 Python 檔:
$ touch server/app/blog/graph/subscriptions.py
編輯subscriptions.py
:
import typing
import strawberry
from channels.db import database_sync_to_async
from django.conf import settings
from strawberry.types import Info
from server.app.blog import models as blog_models
from server.app.blog.graph import types as blog_types
@database_sync_to_async
def _get_post(id: str) -> blog_models.Post | None: # noqa: A002
try:
return blog_models.Post.objects.get(id=id)
except blog_models.Post.DoesNotExist:
return None
@strawberry.type
class Subscription:
@strawberry.subscription
async def post_notify(
self,
info: Info,
) -> typing.AsyncGenerator[blog_types.PostNotification, None]:
ws = info.context["ws"]
channel_layer = ws.channel_layer
await channel_layer.group_add(settings.POSTS_CHANNEL, ws.channel_name)
async with ws.listen_to_channel(
"chat.message",
groups=[settings.POSTS_CHANNEL],
) as cm:
async for message in cm:
post = await _get_post(message["post_id"])
if post:
yield blog_types.PostNotification(
id=post.pk,
title=post.title,
publish_at=post.published_at,
)
Subscription 相關的程式一定是非同步的,但是我們資料庫的查詢是跑在同步的,Django Channels 提供ㄧ個裝飾器讓我們在資料庫查詢的地方轉成非同步的,就像上面_get_post
取得文章的查詢使用database_sync_to_async
,把它變成非同步的函式。
post_notify
是我們主要要實作的 Subscription 功能,一開始透過 GraphQL Context 拿到 Django Channels 的 Channel Layer,接著將當前的請求加入我們設定文章通知頻道,後面就是等待頻道訊息的更新,接收到新的訊息就傳送新文章的通知。
下面幫發布文章的功能加上發送通知訊息:
# server/app/blog/graph/mutations.py
# ... 省略
+from asgiref.sync import async_to_sync
+from channels.layers import get_channel_layer
+from django.conf import settings
# ... 省略
+def notify_new_post(post: blog_models.Post) -> None:
+ channel_layer = get_channel_layer()
+ group_send = async_to_sync(channel_layer.group_send) # type: ignore
+ group_send(
+ settings.POSTS_CHANNEL,
+ {
+ "type": "chat.message",
+ "post_id": post.pk,
+ },
+ )
# ... 省略
@strawberry.type
class Mutation:
# ... 省略
@strawberry_django.input_mutation(
handle_django_errors=True,
extensions=[
HasPerm(["blog.publish_post", "blog.view_post"], any_perm=False),
IsStaff(),
],
)
def publish_post(self, id: uuid.UUID) -> blog_types.Post: # noqa: A002
post = blog_models.Post.objects.get(pk=id)
if not post.published:
post.published = True
post.published_at = timezone.now()
post.save()
+ notify_new_post(post)
return typing.cast(blog_types.Post, post)
主要也是先拿到 Django Channels 的 Channel Layer,對指定頻道群組發送訊息,在發布文章的地方呼叫發送訊息的函式。
再接下來是設定 Subscription 到 Schema 中:
# server/schema.py
# ... 省略
from server.app.authentication.graph import queries as auth_queries
from server.app.blog.graph import mutations as blog_mutations
from server.app.blog.graph import queries as blog_queries
+from server.app.blog.graph import subscriptions as blog_subscriptions
# ... 省略
+subscription = strawberry.tools.merge_types(
+ "Subscription",
+ (blog_subscriptions.Subscription,),
+)
schema = strawberry.Schema(query=query, mutation=mutation)
+ws_schema = strawberry.Schema(query=query, subscription=subscription)
由於 strawberry_django 跟 strawberry 的 Django Channels 的 GraphQL Context 格式目前不相容,會導致 strawberry_django 有些功能錯誤,所以將兩個 Schema 分開,至於ws_schema
為什麼還要設定查詢,是因為查詢在 Schema 是必填的。
接著設定 ASGI 的設定:
# server/asgi.py
# ... 省略
import os
from django.core.asgi import get_asgi_application
+from strawberry.channels import GraphQLProtocolTypeRouter
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
-application = get_asgi_application()
+django_asgi_app = get_asgi_application()
+from server.schema import ws_schema # noqa: E402
+application = GraphQLProtocolTypeRouter(
+ ws_schema,
+ django_application=django_asgi_app, # type: ignore
+ url_pattern="^wsgraphql",
+)
最後啟動 Django runserver:
$ python manage.py runserver
Watching for file changes with StatReloader
Performing system checks...
System check identified no issues (0 silenced).
Django version 4.2.5, using settings 'server.settings'
Starting ASGI/Daphne version 4.0.0 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
WebSocket HANDSHAKING /wsgraphql/ [127.0.0.1:53135]
WebSocket CONNECT /wsgraphql/ [127.0.0.1:53135]
可以看到 runserver 是透過 Daphne 執行的,然後我們分別開啟 Subscription 的http://127.0.0.1:8000/wsgraphql/
與http://127.0.0.1:8000/graphql/
原本一般的 GraphQL:
執行 Subscription 的時候,會持續顯示執行中,等待新的訊息。
這次修改內容可以參考 Git commit: