iT邦幫忙

2023 iThome 鐵人賽

DAY 22
0

今天我們來看 call.respondWebSocketRaw 的實作細節

call.respondWebSocketRaw 的實作程式如下

// these two functions could be potentially useful for users however it is not clear how to provide them better
// so for now they are still private

private suspend fun ApplicationCall.respondWebSocketRaw(
    protocol: String? = null,
    negotiateExtensions: Boolean = false,
    handler: suspend WebSocketSession.() -> Unit
) {
    respond(WebSocketUpgrade(this, protocol, negotiateExtensions, handler))
}

ApplicationCall.respond 是我們前面已經看過幾次的內容

這邊比較特別的是,裡面使用了 WebSocketUpgrade 物件來定義回傳內容

/**
 * An [OutgoingContent] response object that could be used to `respond()`: it will cause application engine to
 * perform HTTP upgrade and start websocket RAW session.
 *
 * Please note that you generally shouldn't use this object directly but use [WebSockets] plugin with routing builders
 * [webSocket] instead.
 *
 * [handle] function is applied to a session and as far as it is a RAW session, you should handle all low-level
 * frames yourself and deal with ping/pongs, timeouts, close frames, frame fragmentation and so on.
 *
 * @param call that is starting web socket session
 * @param protocol web socket negotiated protocol name (optional)
 * @param installExtensions specifies if WebSocket extensions should be installed in current session.
 * @param handle function that is started once HTTP upgrade complete and the session will end once this function exit
 */
public class WebSocketUpgrade(
    public val call: ApplicationCall,
    @Suppress("MemberVisibilityCanBePrivate") public val protocol: String? = null,
    private val installExtensions: Boolean = false,
    public val handle: suspend WebSocketSession.() -> Unit
) : OutgoingContent.ProtocolUpgrade() 

WebSocketUpgrade 繼承我們已經看過幾次的 OutgoingContentProtocolUpgrade

ProtocolUpgrade 的內容則是

/**
 * Variant of a [OutgoingContent] for upgrading an HTTP connection
 */
public abstract class ProtocolUpgrade : OutgoingContent() {
	final override val status: HttpStatusCode
		get() = HttpStatusCode.SwitchingProtocols

	/**
	 * Upgrades an HTTP connection
	 * @param input is a [ByteReadChannel] for an upgraded connection
	 * @param output is a [ByteWriteChannel] for an upgraded connection
	 * @param engineContext is a [CoroutineContext] to execute non-blocking code, such as parsing or processing
	 * @param userContext is a [CoroutineContext] to execute user-provided callbacks or code potentially blocking
	 */
	public abstract suspend fun upgrade(
		input: ByteReadChannel,
		output: ByteWriteChannel,
		engineContext: CoroutineContext,
		userContext: CoroutineContext
	): Job
}

這邊定義了 upgrade 這個函數

針對這個函數,WebSocketUpgrade 版本的實作是

override suspend fun upgrade(
	input: ByteReadChannel,
	output: ByteWriteChannel,
	engineContext: CoroutineContext,
	userContext: CoroutineContext
): Job {
	val webSocket = RawWebSocket(
		input,
		output,
		plugin.maxFrameSize,
		plugin.masking,
		coroutineContext = engineContext + (coroutineContext[Job] ?: EmptyCoroutineContext)
	)

	webSocket.launch(WebSocketHandlerCoroutineName) {
		try {
			webSocket.handle()
			webSocket.flush()
		} catch (cause: Throwable) {
		} finally {
			webSocket.cancel()
		}
	}

	return webSocket.coroutineContext[Job]!!
}

這邊會使用 RawWebSocket() 來協助建立 web socket 連線

/**
 * Creates a RAW web socket session from connection
 *
 * @param input is a [ByteReadChannel] of connection
 * @param output is a [ByteWriteChannel] of connection
 * @param maxFrameSize is an initial [maxFrameSize] value for [WebSocketSession]
 * @param masking is an initial [masking] value for [WebSocketSession]
 * @param coroutineContext is a [CoroutineContext] to execute reading/writing from/to connection
 */
@Suppress("FunctionName")
public actual fun RawWebSocket(
    input: ByteReadChannel,
    output: ByteWriteChannel,
    maxFrameSize: Long,
    masking: Boolean,
    coroutineContext: CoroutineContext
): WebSocketSession = RawWebSocketJvm(input, output, maxFrameSize, masking, coroutineContext)

RawWebSocketJvm 裡面定義了許多連線時會用到的資料和元件

private val socketJob: CompletableJob = Job(coroutineContext[Job])
private val filtered = Channel<Frame>(Channel.RENDEZVOUS)

override val coroutineContext: CoroutineContext = coroutineContext + socketJob + CoroutineName("raw-ws")
override val incoming: ReceiveChannel<Frame> get() = filtered
override val outgoing: SendChannel<Frame> get() = writer.outgoing

override val extensions: List<WebSocketExtension<*>>
	get() = emptyList()

override var maxFrameSize: Long by Delegates.observable(maxFrameSize) { _, _, newValue ->
	reader.maxFrameSize = newValue
}

override var masking: Boolean by Delegates.observable(masking) { _, _, newValue ->
	writer.masking = newValue
}

internal val writer: WebSocketWriter = WebSocketWriter(output, this.coroutineContext, masking, pool)
internal val reader: WebSocketReader = WebSocketReader(input, this.coroutineContext, maxFrameSize, pool)

這邊比較吸引注意的是 writerreader,看起來很可能跟更加底層的資料撰寫有關,

不過我們先打住,等之後需要看相關邏輯時再繼續往下追查

除了一些傳輸內容需要的資料和元件以外,這邊也定義了初始化時要做的事情

init {
	launch {
		try {
			for (frame in reader.incoming) {
				filtered.send(frame)
			}
		} catch (cause: FrameTooBigException) {
			outgoing.send(Frame.Close(CloseReason(CloseReason.Codes.TOO_BIG, cause.message)))
			filtered.close(cause)
		} catch (cause: ProtocolViolationException) {
			outgoing.send(Frame.Close(CloseReason(CloseReason.Codes.PROTOCOL_ERROR, cause.message)))
			filtered.close(cause)
		} catch (cause: CancellationException) {
			reader.incoming.cancel(cause)
		} catch (cause: Throwable) {
			filtered.close(cause)
		} finally {
			filtered.close()
		}
	}

	socketJob.complete()
}

這邊我們可以看到,利用 Kotlin 協程的 launch 我們可以讓這段程式切換成非同步的架構,不需要一次跑完所有的內容才往下繼續運作。

今天針對 call.respondWebSocketRaw 的實作細節我們就先看到這邊,

明天我們來看看 webSocket("/echo") 裡面所定義的行為

send("Please enter your name")
for (frame in incoming) {
	frame as? Frame.Text ?: continue
	val receivedText = frame.readText()
	if (receivedText.equals("bye", ignoreCase = true)) {
		close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
	} else {
		send(Frame.Text("Hi, $receivedText!"))
	}
}

每段分別是什麼意思,背後 Ktor 的實作又是什麼


上一篇
Day 21:Ktor 怎麼安裝 WebSockets 與建立一個 webSocket route
下一篇
Day 23:webSocket 的 Frame 以及對應的各個函數
系列文
深入解析 Kotlin 專案 Ktor 的程式碼,探索 Ktor 的強大功能30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言