Today we'll look at the implementation details of call.respondWebSocketRaw.
The implementation of call.respondWebSocketRaw looks like this:
// these two functions could be potentially useful for users however it is not clear how to provide them better
// so for now they are still private
private suspend fun ApplicationCall.respondWebSocketRaw(
    protocol: String? = null,
    negotiateExtensions: Boolean = false,
    handler: suspend WebSocketSession.() -> Unit
) {
    respond(WebSocketUpgrade(this, protocol, negotiateExtensions, handler))
}
ApplicationCall.respond is something we've already seen several times.
What's special here is that it uses a WebSocketUpgrade object to define the response content:
/**
 * An [OutgoingContent] response object that could be used to `respond()`: it will cause application engine to
 * perform HTTP upgrade and start websocket RAW session.
 *
 * Please note that you generally shouldn't use this object directly but use [WebSockets] plugin with routing builders
 * [webSocket] instead.
 *
 * [handle] function is applied to a session and as far as it is a RAW session, you should handle all low-level
 * frames yourself and deal with ping/pongs, timeouts, close frames, frame fragmentation and so on.
 *
 * @param call that is starting web socket session
 * @param protocol web socket negotiated protocol name (optional)
 * @param installExtensions specifies if WebSocket extensions should be installed in current session.
 * @param handle function that is started once HTTP upgrade complete and the session will end once this function exit
 */
public class WebSocketUpgrade(
    public val call: ApplicationCall,
    @Suppress("MemberVisibilityCanBePrivate") public val protocol: String? = null,
    private val installExtensions: Boolean = false,
    public val handle: suspend WebSocketSession.() -> Unit
) : OutgoingContent.ProtocolUpgrade()
WebSocketUpgrade extends OutgoingContent.ProtocolUpgrade, a class nested inside the OutgoingContent we've already seen several times.
ProtocolUpgrade itself is defined as:
/**
 * Variant of a [OutgoingContent] for upgrading an HTTP connection
 */
public abstract class ProtocolUpgrade : OutgoingContent() {
    final override val status: HttpStatusCode
        get() = HttpStatusCode.SwitchingProtocols

    /**
     * Upgrades an HTTP connection
     * @param input is a [ByteReadChannel] for an upgraded connection
     * @param output is a [ByteWriteChannel] for an upgraded connection
     * @param engineContext is a [CoroutineContext] to execute non-blocking code, such as parsing or processing
     * @param userContext is a [CoroutineContext] to execute user-provided callbacks or code potentially blocking
     */
    public abstract suspend fun upgrade(
        input: ByteReadChannel,
        output: ByteWriteChannel,
        engineContext: CoroutineContext,
        userContext: CoroutineContext
    ): Job
}
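ProtocolUpgrade fixes the response status to SwitchingProtocols and declares the abstract upgrade function, which each subclass has to implement.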
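To make the shape of this class easier to see, here is a minimal sketch that uses only the Kotlin standard library. The names Content, Upgrade, and EchoUpgrade are hypothetical stand-ins, not Ktor's real classes, and the channels are replaced by a plain iterator and list. It only illustrates the pattern: the base class pins the status, and subclasses supply the upgrade logic.

```kotlin
// Hypothetical, simplified model; these are NOT Ktor's real classes.
abstract class Content {
    open val status: Int? = null
}

abstract class Upgrade : Content() {
    // Pinned for every upgrade response: 101 Switching Protocols.
    final override val status: Int get() = 101

    // Subclasses decide what to do with the upgraded connection.
    abstract fun upgrade(input: Iterator<String>, output: MutableList<String>)
}

class EchoUpgrade : Upgrade() {
    override fun upgrade(input: Iterator<String>, output: MutableList<String>) {
        for (line in input) output += "echo: $line"
    }
}

fun main() {
    val content: Content = EchoUpgrade()
    val sink = mutableListOf<String>()
    // An "engine" would check for the upgrade variant and hand over the connection.
    if (content is Upgrade) content.upgrade(listOf("a", "b").iterator(), sink)
    println("${content.status} $sink") // prints "101 [echo: a, echo: b]"
}
```

Because status is final in the sketch, a subclass cannot accidentally answer an upgrade with a different status code, which seems to be the point of the same modifier in the real class.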
WebSocketUpgrade's own version of upgrade is implemented as follows:
override suspend fun upgrade(
    input: ByteReadChannel,
    output: ByteWriteChannel,
    engineContext: CoroutineContext,
    userContext: CoroutineContext
): Job {
    val webSocket = RawWebSocket(
        input,
        output,
        plugin.maxFrameSize,
        plugin.masking,
        coroutineContext = engineContext + (coroutineContext[Job] ?: EmptyCoroutineContext)
    )
    webSocket.launch(WebSocketHandlerCoroutineName) {
        try {
            webSocket.handle()
            webSocket.flush()
        } catch (cause: Throwable) {
        } finally {
            webSocket.cancel()
        }
    }
    return webSocket.coroutineContext[Job]!!
}
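The control flow of upgrade (launch the handler inside the session's scope, always cancel the session afterwards, and hand the session's Job back to the caller) can be sketched without Ktor. This is only an illustration under assumptions: DemoSession and startSession are hypothetical names, and the snippet assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

// Hypothetical stand-in for a session: a scope that owns its own Job.
class DemoSession(parent: Job) : CoroutineScope {
    override val coroutineContext: CoroutineContext = Dispatchers.Default + Job(parent)
    val log = mutableListOf<String>()
}

// Mirrors the shape of upgrade(): start the handler, return the session's Job.
fun startSession(parent: Job, handle: suspend DemoSession.() -> Unit): DemoSession {
    val session = DemoSession(parent)
    session.launch {
        try {
            session.handle()
            session.log += "flushed"   // stands in for webSocket.flush()
        } catch (cause: Throwable) {
            session.log += "failed"    // the original swallows the error here
        } finally {
            session.cancel()           // the session always ends with the handler
        }
    }
    return session
}

fun main() = runBlocking {
    val session = startSession(coroutineContext[Job]!!) { delay(10) }
    val job = session.coroutineContext[Job]!!
    job.join() // the caller can wait on the returned Job, as an engine might
    println("log=${session.log} cancelled=${job.isCancelled}")
}
```

Returning the Job rather than blocking inside the function lets the caller decide when, and whether, to wait for the session to finish.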
upgrade uses RawWebSocket() to help set up the WebSocket connection:
/**
 * Creates a RAW web socket session from connection
 *
 * @param input is a [ByteReadChannel] of connection
 * @param output is a [ByteWriteChannel] of connection
 * @param maxFrameSize is an initial [maxFrameSize] value for [WebSocketSession]
 * @param masking is an initial [masking] value for [WebSocketSession]
 * @param coroutineContext is a [CoroutineContext] to execute reading/writing from/to connection
 */
@Suppress("FunctionName")
public actual fun RawWebSocket(
    input: ByteReadChannel,
    output: ByteWriteChannel,
    maxFrameSize: Long,
    masking: Boolean,
    coroutineContext: CoroutineContext
): WebSocketSession = RawWebSocketJvm(input, output, maxFrameSize, masking, coroutineContext)
RawWebSocketJvm defines much of the data and many of the components that the connection needs:
private val socketJob: CompletableJob = Job(coroutineContext[Job])
private val filtered = Channel<Frame>(Channel.RENDEZVOUS)

override val coroutineContext: CoroutineContext = coroutineContext + socketJob + CoroutineName("raw-ws")
override val incoming: ReceiveChannel<Frame> get() = filtered
override val outgoing: SendChannel<Frame> get() = writer.outgoing

override val extensions: List<WebSocketExtension<*>>
    get() = emptyList()

override var maxFrameSize: Long by Delegates.observable(maxFrameSize) { _, _, newValue ->
    reader.maxFrameSize = newValue
}

override var masking: Boolean by Delegates.observable(masking) { _, _, newValue ->
    writer.masking = newValue
}

internal val writer: WebSocketWriter = WebSocketWriter(output, this.coroutineContext, masking, pool)
internal val reader: WebSocketReader = WebSocketReader(input, this.coroutineContext, maxFrameSize, pool)
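What stands out here are writer and reader, which very likely deal with the lower-level reading and writing of data.
We'll stop here for now, though, and keep digging once we need to look at that logic.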
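One detail in the properties above that is easy to miss is Delegates.observable: assigning a new maxFrameSize or masking on the session pushes the value down to reader or writer. Here is a minimal standard-library sketch of that mechanism. DemoReader and ObservedSession are hypothetical names used only for illustration; they are not part of Ktor.

```kotlin
import kotlin.properties.Delegates

// Hypothetical stand-in for the component that actually enforces the limit.
class DemoReader(var maxFrameSize: Long)

class ObservedSession(maxFrameSize: Long) {
    val reader = DemoReader(maxFrameSize)

    // The callback runs after every assignment and forwards the new value.
    var maxFrameSize: Long by Delegates.observable(maxFrameSize) { _, _, newValue ->
        reader.maxFrameSize = newValue
    }
}

fun main() {
    val session = ObservedSession(1024)
    session.maxFrameSize = 4096
    println(session.reader.maxFrameSize) // prints 4096
}
```

The callback only fires on later assignments, not on the initial value, which is why the component has to be constructed with the initial value as well.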
Besides the data and components needed to transfer content, RawWebSocketJvm also defines what to do at initialization:
init {
    launch {
        try {
            for (frame in reader.incoming) {
                filtered.send(frame)
            }
        } catch (cause: FrameTooBigException) {
            outgoing.send(Frame.Close(CloseReason(CloseReason.Codes.TOO_BIG, cause.message)))
            filtered.close(cause)
        } catch (cause: ProtocolViolationException) {
            outgoing.send(Frame.Close(CloseReason(CloseReason.Codes.PROTOCOL_ERROR, cause.message)))
            filtered.close(cause)
        } catch (cause: CancellationException) {
            reader.incoming.cancel(cause)
        } catch (cause: Throwable) {
            filtered.close(cause)
        } finally {
            filtered.close()
        }
    }

    socketJob.complete()
}
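Here we can see that Kotlin coroutines' launch makes this block asynchronous: init starts the loop that forwards frames from reader.incoming to filtered and then moves on, without waiting for the loop to finish.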
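The forwarding loop can be tried in isolation. The sketch below is a simplified assumption-laden version of it: pumpDemo is a hypothetical function, plain strings stand in for frames, the error handling is reduced to a single finally, and kotlinx-coroutines-core is assumed to be available. It records the order of events to show that launch returns before any frame has been forwarded.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: forwards "frames" from source to filtered in the background.
suspend fun pumpDemo(frames: List<String>): List<String> = coroutineScope {
    val source = Channel<String>(Channel.RENDEZVOUS)
    val filtered = Channel<String>(Channel.RENDEZVOUS)
    val events = mutableListOf<String>()

    // Pump coroutine, shaped like the loop in init.
    launch {
        try {
            for (frame in source) filtered.send(frame)
        } finally {
            filtered.close() // lets the consumer's for-loop terminate
        }
    }
    events += "launch returned" // recorded before anything is forwarded

    // Producer: plays the role of the reader feeding incoming frames.
    launch {
        frames.forEach { source.send(it) }
        source.close()
    }

    // Consumer: what a handler iterating over `incoming` would see.
    for (frame in filtered) events += "received: $frame"
    events
}

fun main() = runBlocking {
    println(pumpDemo(listOf("hello", "bye")))
    // prints [launch returned, received: hello, received: bye]
}
```

With rendezvous channels each send suspends until the other side is ready to receive, so the pump never buffers frames ahead of the consumer.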
That's as far as we'll go today on the implementation details of call.respondWebSocketRaw.
Tomorrow we'll look at the behavior defined inside webSocket("/echo"):
send("Please enter your name")
for (frame in incoming) {
    frame as? Frame.Text ?: continue
    val receivedText = frame.readText()
    if (receivedText.equals("bye", ignoreCase = true)) {
        close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
    } else {
        send(Frame.Text("Hi, $receivedText!"))
    }
}
We'll go through what each part means and how Ktor implements it behind the scenes.