在上一篇文章我們建立 Channel 時,使用 Channel<E>()
來建立一個 Channel。這個方法是由 Coroutine 所提供的一個用來建立 Channel 的函式。我們來看一下這個函式的簽名:
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
在這邊有三個參數,capacity、onBufferOverflow、onUndeliveredElement,當我們沒有帶任何參數進來的時候,預設是會使用
capacity: Int = RENDEZVOUS
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
onUndeliveredElement: ((E) -> Unit)? = null
依據帶入的參數不同,Channel 的特性也不一樣,下面將列出所有的參數:
這邊的 Capacity 指的是緩衝區的容量,Channel 提供了五種 capacity:
預設的 Capacity,這是一種沒有 Buffer 的 channel,經由 channel 傳送的元素會在 send()
以及 receive()
同時呼叫時,才會傳送過去。當 receive()
調用時, send()
就會 suspend 。反之,當 send()
調用時, receive()
就會 suspend 。換句話說, send() 與 receive() 是一組一組成雙成對的。
另外,假設調用 send()
但是 receive()
卻沒有調用,這時候這個 suspend 函式就不會被調用,反之,如果只有調用 receive()
沒有調用 send()
,這時候因為 channel 是空的, 調用 receive() 沒有內容,所以 receive()
就會進入 suspend 的狀態,直到有一個 send()
被調用。
RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
}
從 Channel()
這個 Builder 裏面的內容來看,我們發現,使用 RENDEZVOUS
時,必須要將 onBufferOverflow
設定為 BufferOverflow.SUSPEND
,此時才是會建立 RendezvousChannel()
,否則就會建立一個 ArrayChannel()
且 capacity 為 1。(Channel() 預設的 onBufferOverflow 的值就是設定為 BufferOverflow.SUSPEND。)
Conflated channel (合併的通道),它在使用上並不是將所有的元素都合併起來,而是只保留最後一個元素,換句話說,在這個 channel 中,它的 Buffer 只有一個,而這個 Buffer 只會把最新的元素保留下來,在下一個 send()
調用之前如果沒有調用 receive()
,那麼在 Buffer 裏面的元素將會被丟掉。
看下面的範例:
class Day16 {
val scope = CoroutineScope(Job())
suspend fun conflatedBroadcastChannel(): Unit {
val channel = Channel<Int>(capacity = Channel.CONFLATED)
scope.launch {
for(i in 0..10){
channel.send(i)
}
delay(1000)
channel.send(9)
}
for(i in 0 .. 100){
delay(100)
println(channel.receive())
}
println("done")
}
}
→ 我們在 scope.launch{ ... }
中發送 Int 至 channel 中,其中發送的部分有兩段,一段是在 channel 中直接塞入0~10,接著過了一秒鐘,另外調用 channel.send(9)
將整數 9 傳進 channel 中。
→ 在 scope.launch{} 外側則是使用一個 for-loop 來取用 channel 裏面的值,我們猜猜看會是怎麼樣的結果呢?
10
9
如我們所說的, Conflated channel 只會保留最後的元素,直到有另外的 receive()
來把值取走。所以上面範例的第一段 channel.send() 因為發送的時間比取值的時間還要快,所以所有的值都以會新的蓋掉舊的行為進行,應為第一個 channel.receive() 呼叫時,已經過了 100 毫秒。
我在這邊刻意將含有 channel.receive() 的 for-loop 時間拉長,為了就是要包含到下一個 send()
的時間,以上面這個範例來說, 第一段的 channel.send() 做完之後,延遲了 1 秒鐘,接著會調用 channel.send(9)
把整數 9 送出,這個行為剛好會在 channel.receive() 這個 for-loop 的範圍內,所以 9 也會被列印出來。
之後呢?因為我們使用 for-loop 嘗試調用 channel.receive() 取出 100 個元素,但是我們只有成功取出兩個元素,所以這個 receive() 就會持續等待,直到所有的 receive() 都取出值。
CONFLATED -> {
require(onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
ConflatedChannel(onUndeliveredElement)
}
使用 Channel() 時,會要求你的 onBufferOverflow 要為 BufferOverflow.SUSPEND
,但是進入 CONFLATED
的定義去看,又會發現
Requests a conflated channel in the Channel(...) factory function. This is a shortcut to creating a channel with onBufferOverflow = DROP_OLDEST.
其實它是建立一個使用 onBufferOverflow = DROP_OLDEST 的 channel。
Unlimited channel 應該比較好懂一點,從名字看起來就知道它的 Buffer 是無限制的。
UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
從 Channel() 得知,UNLIMITED 會建立 LinkedListChannel
,LinkedList 我想應該大家都知道意思,它就是一個有順序性的列表。那麼 LinkedListChannel 就是一個有順序性的列表含有一個無限制的 Buffer。它的實作如下:
internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get() = true
protected final override val isBufferEmpty: Boolean get() = true
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = false
...
可以發現 isBufferAlwaysFull
以及 isBufferFull
都是回傳 false,也就是都不會滿,那麼也就不會走到 onBufferOverflow 的情況了。(不過這邊的無限制的 unlimited 的 buffer 最終還是需要看記憶體夠不夠放)
class Day16 {
val scope = CoroutineScope(Job())
suspend fun conflatedChannel(): Unit {
val channel = Channel<Int>(capacity = Channel.UNLIMITED)
scope.launch {
for (i in 0..10) {
channel.send(threeTimesInt(i))
}
}
for (i in 0..10) {
println(channel.receive())
}
println("done")
}
private suspend fun threeTimesInt(x: Int): Int {
delay(100L * x)
return x * 3
}
}
0
3
6
9
12
15
18
21
24
27
30
done
我在使用 UNLIMITED 以及預設的 RENDEZVOUS 時感覺很相像,但是差異在於 RENDEZVOUS 所產生出來的 Channel 是由 RendezvousChannel(onUndeliveredElement)
所建立的。而RendezvousChannel(onUndeliveredElement) 的實作如下:
internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?): AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get()= true
protected final override val isBufferEmpty: Boolean get()= true
protected final override val isBufferAlwaysFull: Boolean get()= true
protected final override val isBufferFull: Boolean get()= true
}
我們可以發現,isBufferAlwaysFull
以及 isBufferFull
都是回傳 true,所以預設的 RENDEZVOUS channel 是會走到 onBufferOverflow 的情況,與 UNLIMITED channel 不同。
在 UNLIMITED channel 下調用 send()
是永遠不會被 suspend 的,而 RENDEZVOUS channel 是會 suspend。我們假設將上面這段程式稍作修改
class Day16 {
val scope = CoroutineScope(Job())
suspend fun conflatedChannel(): Unit {
val channel = Channel<Int>(capacity = Channel.UNLIMITED)
scope.launch {
for (i in 0..10) {
println("first: $i")
channel.send(threeTimesInt(i))
}
}
scope.launch {
for (i in 0..10) {
println("second: $i")
channel.send(fiveTimesInt(i))
}
}
delay(100)
for (i in 0..20) {
println(channel.receive())
}
println("done")
}
private suspend fun threeTimesInt(x: Int): Int {
return x * 3
}
private suspend fun fiveTimesInt(x: Int): Int {
return x * 5
}
}
// capacity = Channel.RENDEZVOUS
first: 0
second: 0
0
first: 1
0
second: 1
first: 2
3
5
second: 2
first: 3
6
10
second: 3
first: 4
9
15
12
second: 4
first: 5
second: 5
20
15
first: 6
second: 6
25
18
30
first: 7
second: 7
first: 8
21
35
second: 8
first: 9
24
40
second: 9
first: 10
27
45
second: 10
30
done
→ 因為 send()
是會 suspend 的,所以會在不同的 coroutine scope 中切換。
如果使用的是 UNLIMITED channel ,其結果會是:
first: 0
first: 1
first: 2
first: 3
first: 4
first: 5
first: 6
first: 7
first: 8
first: 9
first: 10
second: 0
second: 1
second: 2
second: 3
second: 4
second: 5
second: 6
second: 7
second: 8
second: 9
second: 10
0
3
6
9
12
15
18
21
24
27
30
0
5
10
15
20
25
30
35
40
45
done
→ 因為 Buffer 有無限的空間,所以可以存完所有資料之後才列印出來。
在前面我們知道,當 Buffer 的容量為 0 時(RENDEZVOUS),每一次的調用 send()
,send() 都會 suspend ,也就是說會把執行緒的使用權切換走,直到調用 receive()
。
在 Buffer 不為 0 的 channel 中, send() 只會在 channel 滿的時候才會 suspend,也就是說會根據我們所設置的大小來決定什麼時機點會 suspend send()。同樣地,receive() 也是會在 buffer 為空的時候會 suspend 。
在 Channel() 中,我們注意到當 capacity ==1 且 onBufferOverflow == BufferOverflow.DROP_OLDEST 時,我們就會建立 ConflatedChannel()
,否則就會建立 ArrayChannel()
。
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
在 ArrayChannel 的裏面,會根據帶入的 capacity 與現在的狀態來比對 buffer 是否為滿的狀態。
internal open class ArrayChannel<E>(
/**
* Buffer capacity.
*/
private val capacity: Int,
private val onBufferOverflow: BufferOverflow,
onUndeliveredElement: OnUndeliveredElement<E>?
){
...
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = state.size == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = state.size == capacity
...
有四種 Channel ,我們可以依照需求來選擇使用,特別要注意的是, receive() 的行為是固定的,receive() 如果在 channel 為空的時候調用,那麼 receive() 就會 suspend 直到 channel 不為空。
而 send() 則是會根據不同 channel 的 buffer 容量大小來決定何時會 suspend。UNLIMITED 因為具有無限大的 buffer 大小,所以 send() 不會 suspend()。
預設是使用 RENDEZVOUS Channel ,也就是 buffer 的 容量為 0,所以 send() 與 receive() 必須要成雙成對的呼叫,否則就會 suspend。
[Kotlin Coroutine Channel] (https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html)
Kotlin Taiwan User Group
Kotlin 讀書會
有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局