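Over the past twenty days, we have seen how to set up a Ktor project and how to use it as a server that returns data: plain text, and HTML pages generated through a series of builder functions. We also saw how ContentNegotiation converts our own objects into a specific format.
We also looked at acting as a client that sends data to other servers. We can fetch content with GET, send plain text with POST, submit several values at once as a form, or upload files such as images.
Beyond these, there is one more feature that server applications commonly need: using WebSocket to keep a long-lived connection open for transferring data.
According to the official documentation, WebSocket support is added to a server like this: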
// Named configureSockets here because the function installs WebSockets, not serialization
fun Application.configureSockets() {
    install(WebSockets)
}
After that, we can define a WebSocket session:
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}
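To try the /echo handler, one option is the Ktor client's own WebSockets plugin. The following is only a minimal sketch: it assumes the server above is listening on 127.0.0.1:8080 (the host and port are assumptions, not something stated in this article) and that the ktor-client-cio and ktor-client-websockets artifacts are on the classpath.

```kotlin
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // The client has its own WebSockets plugin, separate from the server one
    val client = HttpClient(CIO) { install(WebSockets) }

    // Host and port are assumptions; adjust them to wherever the server runs
    client.webSocket(method = HttpMethod.Get, host = "127.0.0.1", port = 8080, path = "/echo") {
        // The server speaks first with its prompt
        val prompt = incoming.receive() as? Frame.Text
        println(prompt?.readText())

        // Send a name and read the greeting
        send("Ktor")
        val greeting = incoming.receive() as? Frame.Text
        println(greeting?.readText())

        // Ask the server to close the session
        send("bye")
    }
    client.close()
}
```

The session lambda on the client mirrors the one on the server: both get an incoming channel and a send function.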
Let's see how Ktor implements this.
First, here are the KDoc and signature of the WebSockets class:
/**
 * WebSockets support plugin. It is required to be installed first before binding any websocket endpoints
 *
 * ```
 * install(WebSockets)
 *
 * install(Routing) {
 *     webSocket("/ws") {
 *         incoming.consumeForEach { ... }
 *     }
 * }
 * ```
 *
 * @param pingIntervalMillis duration between pings or `null` to disable pings.
 * @param timeoutMillis write/ping timeout after that a connection will be closed.
 * @param maxFrameSize maximum frame that could be received or sent.
 * @param masking whether masking need to be enabled (useful for security).
 * @param extensionsConfig is configuration for WebSocket extensions.
 */
public class WebSockets private constructor(
    public val pingIntervalMillis: Long,
    public val timeoutMillis: Long,
    public val maxFrameSize: Long,
    public val masking: Boolean,
    public val extensionsConfig: WebSocketExtensionsConfig,
    public val contentConverter: WebsocketContentConverter?
) : CoroutineScope
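The constructor shows the settings we can configure: ping interval, timeout, maximum frame size, masking, extensions, and a content converter.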
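Since the constructor is private, these values are normally supplied through the configuration block of install. Below is a minimal sketch, assuming the WebSocketOptions properties pingPeriodMillis, timeoutMillis, maxFrameSize and masking; the function name configureSocketsWithOptions and the concrete values are made up for illustration.

```kotlin
import io.ktor.server.application.*
import io.ktor.server.websocket.*

// Hypothetical module function; the values are examples, not recommendations
fun Application.configureSocketsWithOptions() {
    install(WebSockets) {
        pingPeriodMillis = 15_000      // send a ping every 15 seconds
        timeoutMillis = 15_000         // close the connection if a write or ping takes longer than this
        maxFrameSize = Long.MAX_VALUE  // no practical limit on frame size
        masking = false                // do not mask outgoing frames
    }
}
```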
Next, let's look at the implementation of the webSocket function that we actually call:
/**
 * Bind WebSocket at the current route + [path] optionally checking for the WebSocket [protocol] (ignored if `null`)
 * Requires [WebSockets] plugin to be installed.
 *
 * [DefaultWebSocketSession.incoming] will never contain any control frames and no fragmented frames could be found.
 * Default WebSocket implementation is handling ping/pongs, timeouts, close frames and reassembling fragmented frames.
 *
 * When a websocket session is created, a [handler] lambda will be called with WebSocket session instance on receiver.
 * Once [handler] function returns, the WebSocket termination sequence will be scheduled so you shouldn't use
 * [DefaultWebSocketSession] anymore. However, WebSocket could live for a while until close sequence completed or
 * a timeout exceeds.
 */
public fun Route.webSocket(
    path: String,
    protocol: String? = null,
    handler: suspend DefaultWebSocketServerSession.() -> Unit
) {
    webSocketRaw(path, protocol, negotiateExtensions = true) {
        proceedWebSocket(handler)
    }
}
Route.webSocket delegates to Route.webSocketRaw, which is implemented as follows:
/**
 * Bind RAW WebSocket at the current route + [path] optionally checking for the WebSocket [protocol] (ignored if `null`)
 * Requires [WebSockets] plugin to be installed.
 *
 * Unlike regular (default) [webSocket], a raw WebSocket is not handling any ping/pongs, timeouts or close frames.
 * So [WebSocketSession.incoming] channel will contain all low-level control frames and all fragmented frames need
 * to be reassembled.
 *
 * When a WebSocket session is created, a [handler] lambda will be called with WebSocket session instance on receiver.
 * Once [handler] function returns, the WebSocket connection will be terminated immediately. For RAW WebSockets
 * it is important to perform close sequence properly.
 *
 * @param negotiateExtensions indicates if the server should negotiate installed WebSocket extensions.
 */
public fun Route.webSocketRaw(
    path: String,
    protocol: String? = null,
    negotiateExtensions: Boolean = false,
    handler: suspend WebSocketServerSession.() -> Unit
) {
    application.plugin(WebSockets) // early require
    route(path, HttpMethod.Get) {
        webSocketRaw(protocol, negotiateExtensions, handler)
    }
}
As we can see, it uses Route.route to register a route for the given path that only matches GET requests.
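As the KDoc points out, a raw session leaves ping/pong and close handling to us. The sketch below shows roughly what that means in a handler. It is only an illustration: the /raw-echo path and the rawEchoModule name are made up, and fragmented frames are deliberately not reassembled.

```kotlin
import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*

// Hypothetical module; path and name are placeholders for illustration
fun Application.rawEchoModule() {
    install(WebSockets)
    routing {
        webSocketRaw("/raw-echo") {
            for (frame in incoming) {
                when (frame) {
                    // Echo text back (a fragmented message would be echoed piece by piece)
                    is Frame.Text -> send(Frame.Text(frame.readText()))
                    // In raw mode nobody answers pings for us
                    is Frame.Ping -> send(Frame.Pong(frame.data))
                    // Reply to the peer's close frame, then leave the loop
                    is Frame.Close -> {
                        send(Frame.Close())
                        break
                    }
                    else -> Unit
                }
            }
        }
    }
}
```

With the regular webSocket function, none of the control-frame branches are needed, because the default session handles them before frames reach incoming.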
One level further down, the Route.webSocketRaw overload that takes no path looks like this:
/**
 * Bind RAW WebSocket at the current route optionally checking for the WebSocket [protocol] (ignored if `null`)
 * Requires [WebSockets] plugin to be installed.
 *
 * Unlike regular (default) [webSocket], a raw WebSocket is not handling any ping/pongs, timeouts or close frames.
 * So [WebSocketSession]'s incoming channel will contain all low-level control frames and all fragmented frames need
 * to be reassembled.
 *
 * When a WebSocket session is created, a [handler] lambda will be called with WebSocket session instance on receiver.
 * Once [handler] function returns, the WebSocket connection will be terminated immediately. For RAW WebSocket
 * it is important to perform close sequence properly.
 *
 * @param negotiateExtensions indicates if the server should negotiate installed WebSocket extensions.
 */
public fun Route.webSocketRaw(
    protocol: String? = null,
    negotiateExtensions: Boolean = false,
    handler: suspend WebSocketServerSession.() -> Unit
) {
    application.plugin(WebSockets) // early require
    header(HttpHeaders.Connection, "Upgrade") {
        header(HttpHeaders.Upgrade, "websocket") {
            webSocketProtocol(protocol) {
                handle {
                    call.respondWebSocketRaw(protocol, negotiateExtensions) {
                        toServerSession(call).handler()
                    }
                }
            }
        }
    }
}
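Here we can see the HttpHeaders conditions stacked onto the route: it only matches requests that carry Connection: Upgrade and Upgrade: websocket, optionally narrowed further by webSocketProtocol.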
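To make the effect of these nested conditions concrete, here is a simplified, self-contained model of the check. It does not use Ktor and is not Ktor's actual matching code; UpgradeRequest, hasHeaderToken and looksLikeWebSocketUpgrade are hypothetical names introduced only for this illustration.

```kotlin
// A toy request model: method plus header names mapped to their raw values
data class UpgradeRequest(val method: String, val headers: Map<String, List<String>>)

// True when any value of header [name] contains [expected] as a
// comma-separated token, compared case-insensitively
fun UpgradeRequest.hasHeaderToken(name: String, expected: String): Boolean =
    headers.entries
        .filter { it.key.equals(name, ignoreCase = true) }
        .flatMap { it.value }
        .flatMap { it.split(',') }
        .any { it.trim().equals(expected, ignoreCase = true) }

// Mirrors the nesting: GET route, then Connection: Upgrade, then Upgrade: websocket
fun UpgradeRequest.looksLikeWebSocketUpgrade(): Boolean =
    method == "GET" &&
        hasHeaderToken("Connection", "Upgrade") &&
        hasHeaderToken("Upgrade", "websocket")

fun main() {
    val handshake = UpgradeRequest(
        "GET",
        mapOf("Connection" to listOf("keep-alive, Upgrade"), "Upgrade" to listOf("websocket"))
    )
    val plainGet = UpgradeRequest("GET", mapOf("Connection" to listOf("keep-alive")))

    println(handshake.looksLikeWebSocketUpgrade()) // prints "true"
    println(plainGet.looksLikeWebSocketUpgrade())  // prints "false"
}
```

An ordinary GET to the same path fails the header conditions, so it never reaches the WebSocket handler.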
Let's pause here and turn to proceedWebSocket, which is implemented like this:
@OptIn(InternalAPI::class)
private suspend fun WebSocketServerSession.proceedWebSocket(handler: suspend DefaultWebSocketServerSession.() -> Unit) {
    val webSockets = application.plugin(WebSockets)
    val session = DefaultWebSocketSession(
        this,
        webSockets.pingIntervalMillis,
        webSockets.timeoutMillis
    ).apply {
        val extensions = call.attributes[WebSockets.EXTENSIONS_KEY]
        start(extensions)
    }
    session.handleServerSession(call, handler)
    session.joinSession()
}
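Here we can see a DefaultWebSocketSession being created and put to work: it wraps the raw session, takes pingIntervalMillis and timeoutMillis from the installed plugin, is started with the negotiated extensions, and then runs our handler.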
That is as far as today's rough overview goes. Tomorrow we will continue by digging into the implementation details of call.respondWebSocketRaw.