iT邦幫忙

2023 iThome Ironman Contest

DAY 25

Today we'll look at how to let one WebSocket connection interact with the other connections.

Following the official tutorial, we first create our own Connection class:

package com.example

import io.ktor.websocket.*
import java.util.concurrent.atomic.*

class Connection(val session: DefaultWebSocketSession) {
    companion object {
        val lastId = AtomicInteger(0)
    }
    val name = "user${lastId.getAndIncrement()}"
}
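
To see how the companion-object counter hands out names, here is a minimal sketch that drops the Ktor session so it can run on its own. `NamedConnection` is a hypothetical stand-in for `Connection`, not something from the tutorial.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Connection, without the WebSocket session.
class NamedConnection {
    companion object {
        // Shared by every instance, so each new connection gets the next id.
        val lastId = AtomicInteger(0)
    }

    // getAndIncrement() returns the current value, then adds one atomically.
    val name = "user${lastId.getAndIncrement()}"
}

fun main() {
    val first = NamedConnection()
    val second = NamedConnection()
    println("${first.name}, ${second.name}")
}
```

Because the counter is atomic, two connections created at the same moment on different threads still receive distinct names.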

With this class we can keep track of each connection's data, which lets the connections interact with one another:

routing {
	val connections = Collections.synchronizedSet<Connection?>(LinkedHashSet())
	webSocket("/chat") {
		println("Adding user!")
		val thisConnection = Connection(this)
		connections += thisConnection
		try {
			send("You are connected! There are ${connections.count()} users here.")
			for (frame in incoming) {
				frame as? Frame.Text ?: continue
				val receivedText = frame.readText()
				val textWithUsername = "[${thisConnection.name}]: $receivedText"
				connections.forEach {
					it.session.send(textWithUsername)
				}
			}
		} catch (e: Exception) {
			println(e.localizedMessage)
		} finally {
			println("Removing $thisConnection!")
			connections -= thisConnection
		}
	}
}

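Let's look at how this Ktor code works.

First, we create a synchronizedSet to hold all the Connection objects.

The set is thread-safe in the sense that each individual call, such as adding or removing an element, is synchronized, so handlers running in different coroutines can register and unregister connections safely.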
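
One caveat: the Javadoc for `Collections.synchronizedSet` asks callers to hold the set's lock themselves while iterating, because iteration is not a single synchronized call. A common way around this is to copy the set under the lock and iterate over the copy. The sketch below illustrates that idea; `snapshotOf` is a hypothetical helper, and plain strings stand in for `Connection` objects.

```kotlin
import java.util.Collections

// Hypothetical helper: copy the set while holding its lock, then work on the copy.
fun <T> snapshotOf(shared: Set<T>): List<T> =
    synchronized(shared) { shared.toList() }

fun main() {
    val connections: MutableSet<String> =
        Collections.synchronizedSet(LinkedHashSet<String>())
    connections += "user0"
    connections += "user1"

    for (name in snapshotOf(connections)) {
        // Mutating the shared set here is safe: the loop walks the copy.
        connections -= name
        println("visited $name")
    }
    println("remaining: ${connections.size}")
}
```

Copying first also fits coroutines well: Kotlin does not allow a suspending call such as send inside a synchronized block, so the sends would have to happen outside the lock anyway.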

Every time a client connects to webSocket("/chat"), the following runs:

val thisConnection = Connection(this)
connections += thisConnection

To see what this refers to here, compare it with the signature of webSocket():

public fun Route.webSocket(
    path: String,
    protocol: String? = null,
    handler: suspend DefaultWebSocketServerSession.() -> Unit
) 

The handler's receiver, and therefore this, is a DefaultWebSocketServerSession:

public interface DefaultWebSocketServerSession : DefaultWebSocketSession, WebSocketServerSession

It extends DefaultWebSocketSession, so it can be stored as Connection.session.
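
The mechanism is an ordinary lambda with a receiver plus interface subtyping. The sketch below reproduces the shape with hypothetical stand-in types (`Session`, `ServerSession`, `Holder`, `openSession`); it is a simplification, since the real handler is a suspend lambda and the real interfaces carry many more members.

```kotlin
// Hypothetical stand-ins for the Ktor interfaces, reduced to what the sketch needs.
interface Session { val id: String }
interface ServerSession : Session

// Plays the role of Connection: it only asks for the supertype.
class Holder(val session: Session)

// Same shape as webSocket(): the handler is a lambda with a receiver.
fun openSession(sessionId: String, handler: ServerSession.() -> Unit) {
    val session = object : ServerSession {
        override val id: String = sessionId
    }
    session.handler()
}

fun main() {
    openSession("s1") {
        // `this` is the ServerSession, accepted wherever a Session is expected.
        val holder = Holder(this)
        println("stored session ${holder.session.id}")
    }
}
```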

Once it is stored, connections.count() tells us the current number of connections:

send("You are connected! There are ${connections.count()} users here.")

Then, whenever a text frame arrives, we send the message to every Connection:

for (frame in incoming) {
	frame as? Frame.Text ?: continue
	val receivedText = frame.readText()
	val textWithUsername = "[${thisConnection.name}]: $receivedText"
	connections.forEach {
		it.session.send(textWithUsername)
	}
}
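
In the tutorial code the try block wraps the whole receive loop, so an exception thrown while sending to any one session ends the sender's own loop. If that is not the behaviour you want, one option is to handle failures per recipient. The following is a sketch of that idea only: `TextSink` and `broadcast` are hypothetical names, and the sink is a plain non-suspending interface so the example runs without Ktor.

```kotlin
// Hypothetical stand-in for a session: anything that can accept a text message.
fun interface TextSink {
    fun send(text: String)
}

// Sends to every sink and returns the ones that failed,
// instead of stopping at the first error.
fun broadcast(sinks: Collection<TextSink>, text: String): List<TextSink> {
    val failed = mutableListOf<TextSink>()
    for (sink in sinks) {
        try {
            sink.send(text)
        } catch (e: Exception) {
            failed += sink
        }
    }
    return failed
}

fun main() {
    val received = mutableListOf<String>()
    val healthy = TextSink { received += it }
    val closed = TextSink { throw IllegalStateException("channel closed") }

    val failed = broadcast(listOf(closed, healthy), "[user0]: hi")
    println("delivered=${received.size}, failed=${failed.size}")
}
```

In a real suspending version, a CancellationException should be rethrown rather than collected, so that cancelling the coroutine still works.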

WebSocketSession.send is implemented as follows:

/**
 * Enqueues a text frame for sending with the specified [content].
 *
 * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
 */
public suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))
/**
 * Enqueue a frame, may suspend if an outgoing queue is full. May throw an exception if the
 * outgoing channel is already closed, so it is impossible to transfer any message.
 * Frames that were sent after close frame could be silently ignored.
 * Note that a close frame could be sent automatically in reply to a peer's close frame unless it is
 * raw WebSocket session.
 */
public suspend fun send(frame: Frame) {
	outgoing.send(frame)
}

We looked at outgoing earlier; it is a SendChannel.

The KDoc for SendChannel.send reads:

/**
 * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
 * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
 *
 * [Closing][close] a channel _after_ this function has suspended does not cause this suspended [send] invocation
 * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
 * All elements sent over the channel are delivered in first-in first-out order. The sent element
 * will be delivered to receivers before the close token.
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
 * function is suspended, this function immediately resumes with a [CancellationException].
 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
 * suspended, it will not resume successfully. The `send` call can send the element to the channel,
 * but then throw [CancellationException], thus an exception should not be treated as a failure to deliver the element.
 * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 *
 * This function can be used in [select] invocations with the [onSend] clause.
 * Use [trySend] to try sending to this channel without waiting.
 */
public suspend fun send(element: E)
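
The "buffer is full" condition from this comment can be observed without suspending by using trySend, which the comment itself points to. The sketch below assumes kotlinx.coroutines is on the classpath; the capacity of 2 and the helper name `offerAll` are arbitrary choices for the demo.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: offer each item without suspending and record whether it fit.
fun offerAll(channel: Channel<String>, items: List<String>): List<Boolean> =
    items.map { channel.trySend(it).isSuccess }

fun main() {
    // Capacity 2 is an arbitrary value chosen for the demo.
    val channel = Channel<String>(2)

    // Nobody is receiving, so only as many elements as the buffer holds are accepted.
    println(offerAll(channel, listOf("a", "b", "c")))

    // Taking one element out frees a slot for the next offer.
    channel.tryReceive()
    println(offerAll(channel, listOf("d")))
}
```

Where trySend reports a failure, the suspending send would instead wait until a receiver makes room.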

To see the actual implementation, we have to find the DefaultWebSocketSessionImpl class.

Its outgoing is defined as follows:

private val outgoingToBeProcessed = Channel<Frame>(OUTGOING_CHANNEL_CAPACITY)
override val outgoing: SendChannel<Frame> get() = outgoingToBeProcessed

The Channel factory function used here is defined as follows:

/**
 * Creates a channel with the specified buffer capacity (or without a buffer by default).
 * See [Channel] interface documentation for details.
 *
 * @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
 * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
 *   a [suspending][BufferOverflow.SUSPEND] attempt to [send][Channel.send] a value,
 *   supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
 *   implicitly creates a channel with at least one buffered element).
 * @param onUndeliveredElement an optional function that is called when element was sent but was not delivered to the consumer.
 *   See "Undelivered elements" section in [Channel] documentation.
 * @throws IllegalArgumentException when [capacity] < -2
 */
public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
        }
        UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> { // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
            else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        else -> {
            if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
            else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }

The default capacity is RENDEZVOUS. The name alone does not say much, so let's read the comment on the constant:

/**
 * Requests a rendezvous channel in the `Channel(...)` factory function &mdash; a channel that does not have a buffer.
 */
public const val RENDEZVOUS: Int = 0
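
A channel without a buffer means a send can only complete when a receiver is there to take the element. The short sketch below shows this, assuming kotlinx.coroutines is available; `rendezvousDemo` is a hypothetical name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns whether a non-suspending send succeeded, and the value later received.
fun rendezvousDemo(): Pair<Boolean, Int> = runBlocking {
    val rendezvous = Channel<Int>() // capacity defaults to RENDEZVOUS

    // No receiver is waiting and there is no buffer, so trySend cannot hand over the element.
    val sentWithoutReceiver = rendezvous.trySend(1).isSuccess

    // A suspending send waits until a receiver arrives and then hands the element over.
    launch { rendezvous.send(2) }
    val received = rendezvous.receive()

    sentWithoutReceiver to received
}

fun main() {
    println(rendezvousDemo())
}
```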

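Next, onBufferOverflow keeps its default value, BufferOverflow.SUSPEND.

Note, however, that DefaultWebSocketSessionImpl passes OUTGOING_CHANNEL_CAPACITY explicitly, so the RENDEZVOUS default does not apply to outgoing. Assuming that constant is an ordinary positive capacity rather than one of the special values, the else branch is taken and the channel is created like this:

BufferedChannel(capacity, onUndeliveredElement)

Either way we end up in the same class: with the RENDEZVOUS default the call would have been BufferedChannel(RENDEZVOUS, onUndeliveredElement).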

The KDoc and signature of BufferedChannel are as follows:

/**
 * The buffered channel implementation, which also serves as a rendezvous channel when the capacity is zero.
 * The high-level structure bases on a conceptually infinite array for storing elements and waiting requests,
 * separate counters of [send] and [receive] invocations that were ever performed, and an additional counter
 * that indicates the end of the logical buffer by counting the number of array cells it ever contained.
 * The key idea is that both [send] and [receive] start by incrementing their counters, assigning the array cell
 * referenced by the counter. In case of rendezvous channels, the operation either suspends and stores its continuation
 * in the cell or makes a rendezvous with the opposite request. Each cell can be processed by exactly one [send] and
 * one [receive]. As for buffered channels, [send]-s can also add elements without suspension if the logical buffer
 * contains the cell, while the [receive] operation updates the end of the buffer when its synchronization finishes.
 *
 * Please see the ["Fast and Scalable Channels in Kotlin Coroutines"](https://arxiv.org/abs/2211.04986)
 * paper by Nikita Koval, Roman Elizarov, and Dan Alistarh for the detailed algorithm description.
 */
internal open class BufferedChannel<E>(
    /**
     * Channel capacity; `Channel.RENDEZVOUS` for rendezvous channel
     * and `Channel.UNLIMITED` for unlimited capacity.
     */
    private val capacity: Int,
    @JvmField
    internal val onUndeliveredElement: OnUndeliveredElement<E>? = null
) : Channel<E> 

Its send is implemented as follows:

override suspend fun send(element: E): Unit =
	sendImpl( // <-- this is an inline function
		element = element,
		// Do not create a continuation until it is required;
		// it is created later via [onNoWaiterSuspend], if needed.
		waiter = null,
		// Finish immediately if a rendezvous happens
		// or the element has been buffered.
		onRendezvousOrBuffered = {},
		// As no waiter is provided, suspension is impossible.
		onSuspend = { _, _ -> assert { false } },
		// According to the `send(e)` contract, we need to call
		// `onUndeliveredElement(..)` handler and throw an exception
		// if the channel is already closed.
		onClosed = { onClosedSend(element) },
		// When `send(e)` decides to suspend, the corresponding
		// `onNoWaiterSuspend` function that creates a continuation
		// is called. The tail-call optimization is applied here.
		onNoWaiterSuspend = { segm, i, elem, s -> sendOnNoWaiterSuspend(segm, i, elem, s) }
	)
/**
 * Abstract send implementation.
 */
protected inline fun <R> sendImpl(
	/* The element to be sent. */
	element: E,
	/* The waiter to be stored in case of suspension,
	or `null` if the waiter is not created yet.
	In the latter case, when the algorithm decides
	to suspend, [onNoWaiterSuspend] is called. */
	waiter: Any?,
	/* This lambda is invoked when the element has been
	buffered or a rendezvous with a receiver happens. */
	onRendezvousOrBuffered: () -> R,
	/* This lambda is called when the operation suspends in the
	cell specified by the segment and the index in it. */
	onSuspend: (segm: ChannelSegment<E>, i: Int) -> R,
	/* This lambda is called when the channel
	is observed in the closed state. */
	onClosed: () -> R,
	/* This lambda is called when the operation decides
	to suspend, but the waiter is not provided (equals `null`).
	It should create a waiter and delegate to `sendImplOnNoWaiter`. */
	onNoWaiterSuspend: (
		segm: ChannelSegment<E>,
		i: Int,
		element: E,
		s: Long
	) -> R = { _, _, _, _ -> error("unexpected") }
): R {
	// Read the segment reference before the counter increment;
	// it is crucial to be able to find the required segment later.
	var segment = sendSegment.value
	while (true) {
		// Atomically increment the `senders` counter and obtain the
		// value right before the increment along with the close status.
		val sendersAndCloseStatusCur = sendersAndCloseStatus.getAndIncrement()
		val s = sendersAndCloseStatusCur.sendersCounter
		// Is this channel already closed? Keep the information.
		val closed = sendersAndCloseStatusCur.isClosedForSend0
		// Count the required segment id and the cell index in it.
		val id = s / SEGMENT_SIZE
		val i = (s % SEGMENT_SIZE).toInt()
		// Try to find the required segment if the initially obtained
		// one (in the beginning of this function) has lower id.
		if (segment.id != id) {
			// Find the required segment.
			segment = findSegmentSend(id, segment) ?:
				// The required segment has not been found.
				// Finish immediately if this channel is closed,
				// restarting the operation otherwise.
				// In the latter case, the required segment was full
				// of interrupted waiters and, therefore, removed
				// physically to avoid memory leaks.
				if (closed) {
					return onClosed()
				} else {
					continue
				}
		}
		// Update the cell according to the algorithm. Importantly, when
		// the channel is already closed, storing a waiter is illegal, so
		// the algorithm stores the `INTERRUPTED_SEND` token in this case.
		when (updateCellSend(segment, i, element, s, waiter, closed)) {
			RESULT_RENDEZVOUS -> {
				// A rendezvous with a receiver has happened.
				// The previous segments are no longer needed
				// for the upcoming requests, so the algorithm
				// resets the link to the previous segment.
				segment.cleanPrev()
				return onRendezvousOrBuffered()
			}
			RESULT_BUFFERED -> {
				// The element has been buffered.
				return onRendezvousOrBuffered()
			}
			RESULT_SUSPEND -> {
				// The operation has decided to suspend and installed the
				// specified waiter. If the channel was already closed,
				// and the `INTERRUPTED_SEND` token has been installed as a waiter,
				// this request finishes with the `onClosed()` action.
				if (closed) {
					segment.onSlotCleaned()
					return onClosed()
				}
				(waiter as? Waiter)?.prepareSenderForSuspension(segment, i)
				return onSuspend(segment, i)
			}
			RESULT_CLOSED -> {
				// This channel is closed.
				// In case this segment is already or going to be
				// processed by a receiver, ensure that all the
				// previous segments are unreachable.
				if (s < receiversCounter) segment.cleanPrev()
				return onClosed()
			}
			RESULT_FAILED -> {
				// Either the cell stores an interrupted receiver,
				// or it was poisoned by a concurrent receiver.
				// In both cases, all the previous segments are already processed,
				segment.cleanPrev()
				continue
			}
			RESULT_SUSPEND_NO_WAITER -> {
				// The operation has decided to suspend,
				// but no waiter has been provided.
				return onNoWaiterSuspend(segment, i, element, s)
			}
		}
	}
}
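
The first step of sendImpl, turning the senders counter into a segment id and a cell index, is plain integer arithmetic. The sketch below isolates just that step. The segment size of 32 and the names `Cell` and `cellFor` are assumptions made for illustration; the library defines its own constant and does not expose such a helper.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Assumed segment size for illustration; the library defines its own constant.
const val SEGMENT_SIZE = 32

data class Cell(val segmentId: Long, val index: Int)

// Same arithmetic as in sendImpl: the counter value selects a segment and a slot in it.
fun cellFor(s: Long): Cell =
    Cell(segmentId = s / SEGMENT_SIZE, index = (s % SEGMENT_SIZE).toInt())

fun main() {
    val senders = AtomicLong(0)

    // Each send claims the counter value that was current before the increment.
    repeat(3) { println(cellFor(senders.getAndIncrement())) }

    // A later counter value lands in a later segment.
    println(cellFor(70))
}
```

Because every send obtains a unique counter value from the atomic increment, no two senders are ever assigned the same cell, which is what lets each cell be processed by exactly one send and one receive.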

At this point we have seen how to get hold of each connection's data and how to write code that reaches every connection.


Series
A Deep Dive into the Code of the Kotlin Project Ktor: Exploring Ktor's Powerful Features