Today, let's look at how to make one WebSocket connection interact with the others.
Following the official tutorial, we first create our own Connection class:
package com.example

import io.ktor.websocket.*
import java.util.concurrent.atomic.*

class Connection(val session: DefaultWebSocketSession) {
    companion object {
        val lastId = AtomicInteger(0)
    }
    val name = "user${lastId.getAndIncrement()}"
}
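To see how the shared counter hands out names, here is a minimal sketch that leaves Ktor out. NamedConnection is a hypothetical stand-in for Connection with the session parameter removed, so it can run without a server.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Connection without the WebSocket session,
// so the naming logic can run on its own.
class NamedConnection {
    companion object {
        // One counter shared by every instance of the class.
        val lastId = AtomicInteger(0)
    }

    // getAndIncrement() returns the current value and then adds one atomically,
    // so two connections never receive the same number.
    val name = "user${lastId.getAndIncrement()}"
}

fun main() {
    val names = List(3) { NamedConnection().name }
    println(names) // [user0, user1, user2]
}
```

Because the counter lives in the companion object, it belongs to the class rather than to any single instance, which is why the numbers keep increasing across connections.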
With this class we can get hold of each connection's data, which lets the connections interact with each other:
routing {
    val connections = Collections.synchronizedSet<Connection?>(LinkedHashSet())
    webSocket("/chat") {
        println("Adding user!")
        val thisConnection = Connection(this)
        connections += thisConnection
        try {
            send("You are connected! There are ${connections.count()} users here.")
            for (frame in incoming) {
                frame as? Frame.Text ?: continue
                val receivedText = frame.readText()
                val textWithUsername = "[${thisConnection.name}]: $receivedText"
                connections.forEach {
                    it.session.send(textWithUsername)
                }
            }
        } catch (e: Exception) {
            println(e.localizedMessage)
        } finally {
            println("Removing $thisConnection!")
            connections -= thisConnection
        }
    }
}
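Let's walk through how this Ktor code works.
First, we create a synchronizedSet to hold all the Connection objects.
This is a thread-safe collection: each individual call, such as adding or removing an element, is synchronized, so coroutines running on different threads can safely update it. Note that, per the Javadoc, iterating over it is not synchronized automatically.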
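As a rough illustration of what the wrapper does and does not protect, the sketch below fills a synchronized set from several plain threads (threads rather than coroutines, only to keep it free of extra dependencies). fillConcurrently is a hypothetical helper, not part of the tutorial.

```kotlin
import java.util.Collections
import kotlin.concurrent.thread

// Adds `perThread` distinct integers from each of `threads` threads
// and returns how many elements ended up in the set.
fun fillConcurrently(threads: Int, perThread: Int): Int {
    val shared: MutableSet<Int> = Collections.synchronizedSet(LinkedHashSet<Int>())
    val workers = List(threads) { t ->
        thread {
            // Each add locks the wrapper, so concurrent adds are safe.
            repeat(perThread) { i -> shared += t * perThread + i }
        }
    }
    workers.forEach { it.join() }

    // Iteration is not locked automatically; the Javadoc asks callers
    // to hold the set's lock while iterating.
    var count = 0
    synchronized(shared) {
        for (element in shared) count++
    }
    return count
}

fun main() {
    println(fillConcurrently(threads = 4, perThread = 1000)) // 4000
}
```

With an unsynchronized LinkedHashSet the same test could lose elements or throw, which is the problem the wrapper is there to prevent.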
Every time someone connects to webSocket("/chat"), this runs:
val thisConnection = Connection(this)
connections += thisConnection
The this here is the handler's receiver, which we can check against the signature of webSocket():
public fun Route.webSocket(
    path: String,
    protocol: String? = null,
    handler: suspend DefaultWebSocketServerSession.() -> Unit
)
So it is a DefaultWebSocketServerSession:
public interface DefaultWebSocketServerSession : DefaultWebSocketSession, WebSocketServerSession
This interface extends DefaultWebSocketSession, so it can be stored as Connection.session.
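The typing argument above can be tried out in isolation. The sketch below uses hypothetical miniature interfaces (BaseSession, ServerSession, Holder, withServerSession) in place of the Ktor types, and a non-suspending handler for simplicity; it only demonstrates how a lambda with receiver and interface inheritance fit together.

```kotlin
// Hypothetical miniature versions of the Ktor types, only to show the typing.
interface BaseSession { val id: String }
interface ServerSession : BaseSession

// Like Connection: it only asks for the base type.
class Holder(val session: BaseSession)

// Like webSocket(): the handler is a lambda with a ServerSession receiver,
// so `this` inside the lambda is a ServerSession.
fun <R> withServerSession(sessionId: String, handler: ServerSession.() -> R): R {
    val session = object : ServerSession {
        override val id = sessionId
    }
    return session.handler()
}

fun main() {
    val holder = withServerSession("s1") {
        // `this` is a ServerSession, which is also a BaseSession,
        // so it can be passed where a BaseSession is expected.
        Holder(this)
    }
    println(holder.session.id) // s1
}
```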
Once the connection is stored, we can use connections.count() to find out how many connections there are right now:
send("You are connected! There are ${connections.count()} users here.")
Then, whenever a text frame arrives, we send the message to every Connection, including the sender:
for (frame in incoming) {
    frame as? Frame.Text ?: continue
    val receivedText = frame.readText()
    val textWithUsername = "[${thisConnection.name}]: $receivedText"
    connections.forEach {
        it.session.send(textWithUsername)
    }
}
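The broadcast step in the loop above can be sketched without a server. FakeConnection and broadcast are hypothetical names, and send here is an ordinary function, whereas the real session.send is a suspend function. The sketch also copies the set under its lock before sending, which is one possible way to keep a concurrent connect or disconnect from disturbing the iteration; the tutorial code does not do this.

```kotlin
import java.util.Collections

// Hypothetical stand-in for a connection; it records what it was sent.
class FakeConnection(val name: String) {
    val inbox = mutableListOf<String>()
    fun send(text: String) { inbox += text }
}

// Prefixes the message with the sender's name and delivers it to everyone,
// including the sender, like the loop above.
fun broadcast(connections: Set<FakeConnection>, sender: FakeConnection, text: String) {
    val message = "[${sender.name}]: $text"
    // Copy under the lock, then send from the copy outside the lock.
    val snapshot = synchronized(connections) { connections.toList() }
    snapshot.forEach { it.send(message) }
}

fun main() {
    val alice = FakeConnection("user0")
    val bob = FakeConnection("user1")
    val all = Collections.synchronizedSet(linkedSetOf(alice, bob))
    broadcast(all, alice, "hi")
    println(bob.inbox) // [[user0]: hi]
}
```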
The WebSocketSession.send used here is implemented as follows:
/**
 * Enqueues a text frame for sending with the specified [content].
 *
 * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
 */
public suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))

/**
 * Enqueue a frame, may suspend if an outgoing queue is full. May throw an exception if the
 * outgoing channel is already closed, so it is impossible to transfer any message.
 * Frames that were sent after close frame could be silently ignored.
 * Note that a close frame could be sent automatically in reply to a peer's close frame unless it is
 * raw WebSocket session.
 */
public suspend fun send(frame: Frame) {
    outgoing.send(frame)
}
The outgoing here, which we looked at earlier, is a SendChannel.
The KDoc for SendChannel.send reads as follows:
/**
 * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
 * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
 *
 * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation
 * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
 * All elements sent over the channel are delivered in first-in first-out order. The sent element
 * will be delivered to receivers before the close token.
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
 * function is suspended, this function immediately resumes with a [CancellationException].
 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
 * suspended, it will not resume successfully. The `send` call can send the element to the channel,
 * but then throw [CancellationException], thus an exception should not be treated as a failure to deliver the element.
 * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 *
 * This function can be used in [select] invocations with the [onSend] clause.
 * Use [trySend] to try sending to this channel without waiting.
 */
public suspend fun send(element: E)
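The difference between send and trySend described in this KDoc can be observed with a small buffered channel. This is a minimal sketch assuming kotlinx.coroutines is on the classpath; bufferedSendDemo is a hypothetical name and the capacity of 2 is arbitrary.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns a log of what happened, so the behaviour is easy to inspect.
fun bufferedSendDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(capacity = 2)

    // Two elements fit in the buffer, so these calls return without suspending.
    channel.send(1)
    channel.send(2)

    // The buffer is full: trySend reports failure instead of waiting.
    log += "trySend(3) success = ${channel.trySend(3).isSuccess}"

    // send(3) has to wait for free space, so run it in its own coroutine.
    val sender = launch {
        channel.send(3)
        log += "send(3) resumed"
    }
    yield() // let the sender start and suspend on the full buffer

    // Receiving frees a slot, which lets the suspended sender continue.
    log += "received ${channel.receive()}"
    sender.join()
    channel.close()
    log
}

fun main() {
    bufferedSendDemo().forEach(::println)
}
```

In the chat example this is why one slow client can make send suspend: once that client's outgoing queue is full, the broadcasting coroutine waits until there is room again.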
To see the actual implementation, we have to find the DefaultWebSocketSessionImpl class.
Inside it, outgoing is defined as follows:
private val outgoingToBeProcessed = Channel<Frame>(OUTGOING_CHANNEL_CAPACITY)
override val outgoing: SendChannel<Frame> get() = outgoingToBeProcessed
The Channel function called here is defined as follows:
/**
 * Creates a channel with the specified buffer capacity (or without a buffer by default).
 * See [Channel] interface documentation for details.
 *
 * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
 * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
 *   a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value,
 *   supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
 *   implicitly creates a channel with at least one buffered element).
 * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer.
 *   See "Undelivered elements" section in [Channel] documentation.
 * @throws IllegalArgumentException when [capacity] < -2
 */
public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
        }
        UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> { // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
            else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        else -> {
            if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
            else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }
The default capacity here is RENDEZVOUS. The name alone is hard to make sense of, so let's read the KDoc on the constant:
/**
* Requests a rendezvous channel in the `Channel(...)` factory function — a channel that does not have a buffer.
*/
public const val RENDEZVOUS: Int = 0
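What "does not have a buffer" means in practice can be seen in a short experiment. This is a minimal sketch assuming kotlinx.coroutines is available; rendezvousDemo is a hypothetical name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns a log of what happened on a channel created with the default capacity.
fun rendezvousDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>() // capacity defaults to RENDEZVOUS (0)

    // With no buffer and no receiver waiting, there is nowhere to put the element.
    log += "trySend without receiver: ${channel.trySend(1).isSuccess}"

    // Start a receiver; send(2) suspends until the two sides meet.
    val receiver = launch { log += "received ${channel.receive()}" }
    channel.send(2)
    receiver.join()
    channel.close()
    log
}

fun main() {
    rendezvousDemo().forEach(::println)
}
```

An element passes through a rendezvous channel only when a sender and a receiver are present at the same time, which is where the name comes from.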
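Next, onBufferOverflow is left at its default, BufferOverflow.SUSPEND.
Note that DefaultWebSocketSessionImpl does not use the default capacity: it passes OUTGOING_CHANNEL_CAPACITY. Assuming that constant is an ordinary positive number, the call falls into the else branch and creates BufferedChannel(capacity, onUndeliveredElement).
With the default capacity, the RENDEZVOUS branch would create the BufferedChannel like this instead: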
BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
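To make the branch selection easier to follow, the sketch below mirrors the factory's when expression as a pure function that returns a description instead of creating a channel. chosenImplementation and its suspendOnOverflow flag are hypothetical simplifications; the constant values match those defined in Channel.Factory, and the capacity of 8 in main is only an example of a positive capacity, not a claim about Ktor's setting.

```kotlin
// Values of the constants in Channel.Factory.
const val UNLIMITED = Int.MAX_VALUE
const val RENDEZVOUS = 0
const val CONFLATED = -1
const val BUFFERED = -2

// Hypothetical helper: names the class the factory's `when` would instantiate.
// `suspendOnOverflow = true` stands for onBufferOverflow == BufferOverflow.SUSPEND.
fun chosenImplementation(capacity: Int = RENDEZVOUS, suspendOnOverflow: Boolean = true): String =
    when (capacity) {
        RENDEZVOUS ->
            if (suspendOnOverflow) "BufferedChannel(RENDEZVOUS)" else "ConflatedBufferedChannel(1)"
        CONFLATED -> {
            require(suspendOnOverflow) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" }
            "ConflatedBufferedChannel(1, DROP_OLDEST)"
        }
        UNLIMITED -> "BufferedChannel(UNLIMITED)"
        BUFFERED ->
            if (suspendOnOverflow) "BufferedChannel(CHANNEL_DEFAULT_CAPACITY)" else "ConflatedBufferedChannel(1)"
        else ->
            if (suspendOnOverflow) "BufferedChannel($capacity)" else "ConflatedBufferedChannel($capacity)"
    }

fun main() {
    println(chosenImplementation())             // BufferedChannel(RENDEZVOUS)
    println(chosenImplementation(capacity = 8)) // BufferedChannel(8)
}
```

Whichever of these two branches is taken, the overflow policy is SUSPEND and the result is a BufferedChannel, so the send implementation examined next applies in both cases.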
The KDoc and signature of BufferedChannel are as follows:
/**
 * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
 * The high-level structure bases on a conceptually infinite array for storing elements and waiting requests,
 * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
 * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
 * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
 * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
 * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
 * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
 * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
 *
 * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
 * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
 */
internal open class BufferedChannel<E>(
    /**
     * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
     * and `Channel.UNLIMITED` for unlimited capacity.
     */
    private val capacity: Int,
    @JvmField
    internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
) : Channel<E>
Its send is implemented as follows:
override suspend fun send(element: E): Unit =
    sendImpl( // <-- this is an inline function
        element = element,
        // Do not create a continuation until it is required;
        // it is created later via [onNoWaiterSuspend], if needed.
        waiter = null,
        // Finish immediately if a rendezvous happens
        // or the element has been buffered.
        onRendezvousOrBuffered = {},
        // As no waiter is provided, suspension is impossible.
        onSuspend = { _, _ -> assert { false } },
        // According to the `send(e)` contract, we need to call
        // `onUndeliveredElement(..)` handler and throw an exception
        // if the channel is already closed.
        onClosed = { onClosedSend(element) },
        // When `send(e)` decides to suspend, the corresponding
        // `onNoWaiterSuspend` function that creates a continuation
        // is called. The tail-call optimization is applied here.
        onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
    )

/**
 * Abstract send implementation.
 */
protected inline fun <R> sendImpl(
    /* The element to be sent. */
    element: E,
    /* The waiter to be stored in case of suspension,
       or `null` if the waiter is not created yet.
       In the latter case, when the algorithm decides
       to suspend, [onNoWaiterSuspend] is called. */
    waiter: Any?,
    /* This lambda is invoked when the element has been
       buffered or a rendezvous with a receiver happens. */
    onRendezvousOrBuffered: () -> R,
    /* This lambda is called when the operation suspends in the
       cell specified by the segment and the index in it. */
    onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
    /* This lambda is called when the channel
       is observed in the closed state. */
    onClosed: () -> R,
    /* This lambda is called when the operation decides
       to suspend, but the waiter is not provided (equals `null`).
       It should create a waiter and delegate to `sendImplOnNoWaiter`. */
    onNoWaiterSuspend: (
        segm: ChannelSegment<E>,
        i: Int,
        element: E,
        s: Long
    ) -> R = { _, _, _, _ -> error("unexpected") }
): R {
    // Read the segment reference before the counter increment;
    // it is crucial to be able to find the required segment later.
    var segment = sendSegment.value
    while (true) {
        // Atomically increment the `senders` counter and obtain the
        // value right before the increment along with the close status.
        val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
        val s = sendersAndCloseStatusCur.sendersCounter
        // Is this channel already closed? Keep the information.
        val closed = sendersAndCloseStatusCur.isClosedForSend0
        // Count the required segment id and the cell index in it.
        val id = s / SEGMENT_SIZE
        val i = (s % SEGMENT_SIZE).toInt()
        // Try to find the required segment if the initially obtained
        // one (in the beginning of this function) has lower id.
        if (segment.id != id) {
            // Find the required segment.
            segment = findSegmentSend(id, segment) ?:
                // The required segment has not been found.
                // Finish immediately if this channel is closed,
                // restarting the operation otherwise.
                // In the latter case, the required segment was full
                // of interrupted waiters and, therefore, removed
                // physically to avoid memory leaks.
                if (closed) {
                    return onClosed()
                } else {
                    continue
                }
        }
        // Update the cell according to the algorithm. Importantly, when
        // the channel is already closed, storing a waiter is illegal, so
        // the algorithm stores the `INTERRUPTED_SEND` token in this case.
        when (updateCellSend(segment, i, element, s, waiter, closed)) {
            RESULT_RENDEZVOUS -> {
                // A rendezvous with a receiver has happened.
                // The previous segments are no longer needed
                // for the upcoming requests, so the algorithm
                // resets the link to the previous segment.
                segment.cleanPrev()
                return onRendezvousOrBuffered()
            }
            RESULT_BUFFERED -> {
                // The element has been buffered.
                return onRendezvousOrBuffered()
            }
            RESULT_SUSPEND -> {
                // The operation has decided to suspend and installed the
                // specified waiter. If the channel was already closed,
                // and the `INTERRUPTED_SEND` token has been installed as a waiter,
                // this request finishes with the `onClosed()` action.
                if (closed) {
                    segment.onSlotCleaned()
                    return onClosed()
                }
                (waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
                return onSuspend(segment, i)
            }
            RESULT_CLOSED -> {
                // This channel is closed.
                // In case this segment is already or going to be
                // processed by a receiver, ensure that all the
                // previous segments are unreachable.
                if (s < receiversCounter) segment.cleanPrev()
                return onClosed()
            }
            RESULT_FAILED -> {
                // Either the cell stores an interrupted receiver,
                // or it was poisoned by a concurrent receiver.
                // In both cases, all the previous segments are already processed,
                segment.cleanPrev()
                continue
            }
            RESULT_SUSPEND_NO_WAITER -> {
                // The operation has decided to suspend,
                // but no waiter has been provided.
                return onNoWaiterSuspend(segment, i, element, s)
            }
        }
    }
}
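The first few lines of the loop in sendImpl, where each send claims a counter value and turns it into a segment id and a cell index, can be tried out separately. The sketch below is a simplified model: CellAddress, cellFor and SendCounter are hypothetical names, the segment size of 32 is an assumed value for illustration, and the close status that the real counter carries alongside the number is ignored.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Assumed segment size for illustration; the real SEGMENT_SIZE is an
// internal constant of kotlinx.coroutines and may differ.
const val SEGMENT_SIZE = 32

data class CellAddress(val segmentId: Long, val index: Int)

// Same arithmetic as in sendImpl: the counter value picks the segment
// and the position inside it.
fun cellFor(s: Long): CellAddress =
    CellAddress(segmentId = s / SEGMENT_SIZE, index = (s % SEGMENT_SIZE).toInt())

class SendCounter {
    private val senders = AtomicLong(0)

    // Each caller atomically claims the next counter value, so no two
    // send operations are assigned the same cell.
    fun claimCell(): CellAddress = cellFor(senders.getAndIncrement())
}

fun main() {
    val counter = SendCounter()
    val cells = List(33) { counter.claimCell() }
    println(cells.first()) // CellAddress(segmentId=0, index=0)
    println(cells.last())  // CellAddress(segmentId=1, index=0)
}
```

Because every operation starts with a single atomic increment, concurrent senders are spread across distinct cells before any of them touches the cell contents.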
With that, we have more or less covered how to get hold of each connection's data and how to send to every connection in turn.