iT邦幫忙

2023 iThome 鐵人賽

DAY 21
0

前面二十天,我們看過了 Ktor 怎麼建立專案,怎麼作為 server 回傳資訊,我們看過了怎麼回傳純文字,以及怎麼透過一系列函數生成 HTML 畫面。另外,也可以透過 ContentNegotiation,將我們自定義的物件轉換成特定格式。

我們也看過了怎麼將自己當作一個 client 端,傳輸資料到其他 server 內。我們可以透過 GET 取得內容,也可以透過 POST 傳送純文字,也可以用 Form 的方式一次傳輸多個內容,或者傳輸圖片之類的檔案。

除了這些需求以外,還有一個功能,也是 server 服務很常見的,那就是透過 webSocket 的方式,建立一個長期連線來傳輸資料。

根據官方文件,server 內加入 webSocket 的作法如下

fun Application.configureSerialization() {
    install(WebSockets)
}

然後我們就可以建立一個 webSocket 的 session

routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

我們來看看 Ktor 是怎麼實作這段的。

首先,我們先來看看 WebSockets 類別的註解跟簽名

/**
 * WebSockets support plugin. It is required to be installed first before binding any websocket endpoints
 *
 * ```
 * install(WebSockets)
 *
 * install(Routing) {
 *     webSocket("/ws") {
 *          incoming.consumeForEach { ... }
 *     }
 * }
 * ```
 *
 * @param pingIntervalMillis duration between pings or `null` to disable pings.
 * @param timeoutMillis write/ping timeout after that a connection will be closed.
 * @param maxFrameSize maximum frame that could be received or sent.
 * @param masking whether masking need to be enabled (useful for security).
 * @param extensionsConfig is configuration for WebSocket extensions.
 */
public class WebSockets private constructor(
    public val pingIntervalMillis: Long,
    public val timeoutMillis: Long,
    public val maxFrameSize: Long,
    public val masking: Boolean,
    public val extensionsConfig: WebSocketExtensionsConfig,
    public val contentConverter: WebsocketContentConverter?
) : CoroutineScope 

可以看到我們能設置的各種參數

接著我們來看看實際使用時的 webSocket 函數實作

/**
 * Bind WebSocket at the current route + [path] optionally checking for the WebSocket [protocol] (ignored if `null`)
 * Requires [WebSockets] plugin to be installed.
 *
 * [DefaultWebSocketSession.incoming] will never contain any control frames and no fragmented frames could be found.
 * Default WebSocket implementation is handling ping/pongs, timeouts, close frames and reassembling fragmented frames.
 *
 * When a websocket session is created, a [handler] lambda will be called with WebSocket session instance on receiver.
 * Once [handler] function returns, the WebSocket termination sequence will be scheduled so you shouldn't use
 * [DefaultWebSocketSession] anymore. However, WebSocket could live for a while until close sequence completed or
 * a timeout exceeds.
 */
public fun Route.webSocket(
    path: String,
    protocol: String? = null,
    handler: suspend DefaultWebSocketServerSession.() -> Unit
) {
    webSocketRaw(path, protocol, negotiateExtensions = true) {
        proceedWebSocket(handler)
    }
}

這邊的 Route.webSocket 實作

/**
 * Bind RAW WebSocket at the current route + [path] optionally checking for the WebSocket [protocol] (ignored if `null`)
 * Requires [WebSockets] plugin to be installed.
 *
 * Unlike regular (default) [webSocket], a raw WebSocket is not handling any ping/pongs, timeouts or close frames.
 * So [WebSocketSession.incoming] channel will contain all low-level control frames and all fragmented frames need
 * to be reassembled.
 *
 * When a WebSocket session is created, a [handler] lambda will be called with WebSocket session instance on receiver.
 * Once [handler] function returns, the WebSocket connection will be terminated immediately. For RAW WebSockets
 * it is important to perform close sequence properly.
 *
 * @param negotiateExtensions indicates if the server should negotiate installed WebSocket extensions.
 */
public fun Route.webSocketRaw(
    path: String,
    protocol: String? = null,
    negotiateExtensions: Boolean = false,
    handler: suspend WebSocketServerSession.() -> Unit
) {
    application.plugin(WebSockets) // early require

    route(path, HttpMethod.Get) {
        webSocketRaw(protocol, negotiateExtensions, handler)
    }
}

可以看到,會透過 Route.route 建立一個 GET 的路徑

再往下的 Route.webSocketRaw 則是

/**
 * Bind RAW WebSocket at the current route optionally checking for the WebSocket [protocol] (ignored if `null`)
 * Requires [WebSockets] plugin to be installed.
 *
 * Unlike regular (default) [webSocket], a raw WebSocket is not handling any ping/pongs, timeouts or close frames.
 * So [WebSocketSession]'s incoming channel will contain all low-level control frames and all fragmented frames need
 * to be reassembled.
 *
 * When a WebSocket session is created, a [handler] lambda will be called with WebSocket session instance on receiver.
 * Once [handler] function returns, the WebSocket connection will be terminated immediately. For RAW WebSocket
 * it is important to perform close sequence properly.
 *
 * @param negotiateExtensions indicates if the server should negotiate installed WebSocket extensions.
 */
public fun Route.webSocketRaw(
    protocol: String? = null,
    negotiateExtensions: Boolean = false,
    handler: suspend WebSocketServerSession.() -> Unit
) {
    application.plugin(WebSockets) // early require

    header(HttpHeaders.Connection, "Upgrade") {
        header(HttpHeaders.Upgrade, "websocket") {
            webSocketProtocol(protocol) {
                handle {
                    call.respondWebSocketRaw(protocol, negotiateExtensions) {
                        toServerSession(call).handler()
                    }
                }
            }
        }
    }
}

可以看到加上的各種 HttpHeaders

我們先打在這邊,改看看 proceedWebSocket 的實作

proceedWebSocket 則是

@OptIn(InternalAPI::class)
private suspend fun WebSocketServerSession.proceedWebSocket(handler: suspend DefaultWebSocketServerSession.() -> Unit) {
    val webSockets = application.plugin(WebSockets)

    val session = DefaultWebSocketSession(
        this,
        webSockets.pingIntervalMillis,
        webSockets.timeoutMillis
    ).apply {
        val extensions = call.attributes[WebSockets.EXTENSIONS_KEY]
        start(extensions)
    }

    session.handleServerSession(call, handler)
    session.joinSession()
}

這邊我們可以看到 DefaultWebSocketSession 的建立以及運作。

今天我們先粗略地看到這邊,明天繼續追入 call.respondWebSocketRaw 的細節實作。


上一篇
Day 20:用 MultiPartFormDataContent 實作 multipart/form-data 請求
下一篇
Day 22:call.respondWebSocketRaw 的實作以及 RawWebSocket
系列文
深入解析 Kotlin 專案 Ktor 的程式碼,探索 Ktor 的強大功能30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言