看完了如何建立一個接收 webSocket 請求的 server 之後
今天,我們來看看如果將 Ktor 當作 Client 端
要透過 webSocket 持續發送資料時,該怎麼撰寫
以及框架是怎麼實作的
首先我們看看官網指導的寫法
前面我們要在 build.gradle.kts
內加上
tasks.named<JavaExec>("run") {
standardInput = System.`in`
}
接著在程式內的寫法下如下
val client = HttpClient {
install(WebSockets)
}
runBlocking {
client.webSocket(method = HttpMethod.Get, host = "127.0.0.1", port = 8080, path = "/chat") {
while (true) {
val othersMessage = incoming.receive() as? Frame.Text ?: continue
println(othersMessage.readText())
val myMessage = readlnOrNull()
if (myMessage != null) {
send(myMessage)
}
}
}
}
client.close()
println("Connection closed. Goodbye!")
由於 Ktor 的程式許多時候會定義成 suspend
來允許協程切換
所以這邊我們將整段運作的邏輯放在 runBlocking
內,變成同步執行
runBlocking
內 client.webSocket
在這邊的實作如下
/**
* Opens a [block] with [DefaultClientWebSocketSession].
*/
public suspend fun HttpClient.webSocket(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend DefaultClientWebSocketSession.() -> Unit
) {
webSocket(
{
this.method = method
url("ws", host, port, path)
request()
},
block
)
}
HttpClient.webSocket(request, block)
則是
/**
* Opens a [block] with [DefaultClientWebSocketSession].
*/
public suspend fun HttpClient.webSocket(
request: HttpRequestBuilder.() -> Unit,
block: suspend DefaultClientWebSocketSession.() -> Unit
) {
plugin(WebSockets)
val session = prepareRequest {
url {
protocol = URLProtocol.WS
}
request()
}
session.body<DefaultClientWebSocketSession, Unit> {
try {
block(it)
} finally {
it.close()
}
}
}
URLProtocol.WS
相關的定義如下
/**
* Represents URL protocol
* @property name of protocol (schema)
* @property defaultPort default port for protocol or `-1` if not known
*/
public data class URLProtocol(val name: String, val defaultPort: Int) {
init {
require(name.all { it.isLowerCase() }) { "All characters should be lower case" }
}
@Suppress("PublicApiImplicitType")
public companion object {
/**
* HTTP with port 80
*/
public val HTTP: URLProtocol = URLProtocol("http", 80)
/**
* secure HTTPS with port 443
*/
public val HTTPS: URLProtocol = URLProtocol("https", 443)
/**
* Web socket over HTTP on port 80
*/
public val WS: URLProtocol = URLProtocol("ws", 80)
/**
* Web socket over secure HTTPS on port 443
*/
public val WSS: URLProtocol = URLProtocol("wss", 443)
/**
* Socks proxy url protocol.
*/
public val SOCKS: URLProtocol = URLProtocol("socks", 1080)
/**
* Protocols by names map
*/
public val byName: Map<String, URLProtocol> = listOf(HTTP, HTTPS, WS, WSS, SOCKS).associateBy { it.name }
/**
* Create an instance by [name] or use already existing instance
*/
public fun createOrDefault(name: String): URLProtocol = name.toLowerCasePreservingASCIIRules().let {
byName[it] ?: URLProtocol(it, DEFAULT_PORT)
}
}
}
可以看到除了 WS
以外,還定義了許多的 Protocol
確定好 URLProtocol
之後,後續的動作就跟其他的 HttpRequest
差不多
下一段是一個無窮迴圈
while (true) {
val othersMessage = incoming.receive() as? Frame.Text ?: continue
}
這邊的 incoming
是一個 ReceiveChannel
/**
* An incoming frames channel.
* Note that if you use `webSocket` to handle a WebSockets session,
* the incoming channel doesn't contain control frames such as the ping/pong or close frames.
* If you need control over control frames, use the `webSocketRaw` function.
*/
public val incoming: ReceiveChannel<Frame>
ReceiveChannel.receive
定義如下
/**
* Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
* or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive].
* If the channel was closed because of an exception, it is called a _failed_ channel and this function
* will throw the original [close][SendChannel.close] cause exception.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel,
* but then throw [CancellationException], thus failing to deliver the element.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceive] clause.
* Use [tryReceive] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E
收到內容之後,會試圖將內容轉換成 Frame.Text
如果失敗的話就 continue
收到 Frame.Text
之後,我們就可以針對 othersMessage
進行操作
println(othersMessage.readText())
Frame.Text.readText()
之前看過了
/**
* Reads text content from the text frame.
* Shouldn't be used for fragmented frames: such frames need to be reassembled first.
*/
public fun Frame.Text.readText(): String {
require(fin) { "Text could be only extracted from non-fragmented frame" }
return Charsets.UTF_8.newDecoder().decode(buildPacket { writeFully(data) })
}
接著是一個有趣的段落
val myMessage = readlnOrNull()
if (myMessage != null) {
send(myMessage)
}
這邊是從 System.in
讀取內容的,不過 Ktor 怎麼知道我們會從這邊送入資料呢?
這就回到前面
tasks.named<JavaExec>("run") {
standardInput = System.`in`
}
我們在 build.gradle.kts
定義的 task
讓我們可以在 gradle run
的時候,持續接收 System.in
的內容
每次收到內容時,我們就執行 send(myMessage)
/**
* Enqueues a text frame for sending with the specified [content].
*
* May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
*/
public suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))
/**
* Enqueue a frame, may suspend if an outgoing queue is full. May throw an exception if the
* outgoing channel is already closed, so it is impossible to transfer any message.
* Frames that were sent after close frame could be silently ignored.
* Note that a close frame could be sent automatically in reply to a peer's close frame unless it is
* raw WebSocket session.
*/
public suspend fun send(frame: Frame) {
outgoing.send(frame)
}
到今天,我們就看過了將 Ktor 當作 Client 透過 webSocket 發送資料的邏輯