iT邦幫忙

2021 iThome 鐵人賽

DAY 13
0

儘管大家講coroutine都會提到channel和flow,然後開始比較,但我個人覺得channel和flow兩個你都沒有概念的時候,聽我比較很難知道我在說什麼啦!!
這邊會分段介紹channel的特性,首先,channel就像是一個queue,可以把它想像成羽毛球桶,先進先出(fifo),應該很好懂吧
channel

所以打算先介紹兩者,而後再來比較,首先我們先用一下channel吧

val firstChannel = Channel<Int> {  }
val secondChannel = Channel<Int> {  }

lifecycleScope.launch {
    Timber.d("start")

    val waiting = firstChannel.receive()
    Timber.d("check waiting, $waiting")
    val waiting2 = secondChannel.receive()
    Timber.d("check waiting2, $waiting2")

    Timber.d("after clair receive")

    val result = async {
        Timber.d("in result")
        sampleDemoSuspendFunction("$waiting result")
    }
    val result2 = async {
        Timber.d("in result 2 ")
        sampleDemoSuspendFunction("$waiting2 result2")
    }
    Timber.d("${result.await()} , ${result2.await()}")
}

lifecycleScope.launch(Dispatchers.IO){
    delay(1000)
    firstChannel.send(1001)
    Timber.d("sent first")

    delay(1000)
    secondChannel.send(1111 )
    Timber.d("sent second")
}


suspend fun sampleDemoSuspendFunction(content: String): String {
    delay(1000L)
    return content
}

/**
 *
 hh:mm:ss 
 14:44:35.792 17905-17967/: sent first
 14:44:35.792 17905-17905/: check waiting, 1001
 14:44:36.793 17905-17967/: sent second
 14:44:36.793 17905-17905/: check waiting, 1111
 14:44:36.794 17905-17905/: after clair receive
 
 14:44:36.794 17905-17905/$1$1$result: in result
 14:44:36.795 17905-17905/d$1$1$result2: in result 2 
 
 14:44:37.798 17905-17905/: 1001 result , 1111 result2
 * */

可以看到channel的關鍵字有send和receive兩個,在Timber裡面,我們要印出waiting和waiting2兩個變數,這就表示我們需要兩個變數都已經有值了,才去呼叫Timber

我已經把log都印出來了,從log可以看到先送出東西到channel,receive()才會被觸發,現在我們來講講send和receive的特性,receive很好懂,沒東西的時候就suspend起來,等到有東西為止,但send就不一樣了,send的時候如果沒有被receive,他也suspend在那邊等,等到有人收他東西,那如果沒人收呢,就沒有然後了!!

比如上面把兩個receive交換,secondChannel再等一個等不到的人,因為firstChannel沒有被接收,他suspend在那裏了

這個特性非常重要,如果把send和receive放在同一個thread,就會suspend起來,然後又沒有然後了QQ,用的位子百百種,使用時記得思考suspend的特性,send/receive有沒有對起來,規則告訴你了,剩下就讓各位發揮

lifecycleScope.launch { 
    launch {
        for (x in 1..5) firstChannel.send(x)
    }

    repeat(5) { Timber.d(firstChannel.receive().toString()) }
    Timber.d("Done!")
}

channel cancel

和corotuine的兩段式取消有點類似,當呼叫cancel()時,並不會立刻關閉,而是會等到先前發出的值都被接收後才會關閉,值得注意的一點,他有分send和receive,cancel()呼叫後不能send

操作符

channel的建構器可以用produce,而 for 在接收端可以用consumeEach有限制的替換,先看下面範例

lifecycleScope.launch {
    val produceChannel = produce {
        for (x in 1..5) send( x )
    }
    produceChannel.consumeEach { Timber.d(it.toString()) }
}

上面的例子應該表現出channel的基本特性了,但這並沒有解決coroutine之間溝通的問題,channel的特性在於可以再多個coroutine之間 send和receive

這裡再借文檔例子

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all

/**
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
 * */

給看不懂的人在解釋一下,多核心的時候,可以同時進行多個工作,用channel可以保證,變數的順序和更新
channel

那剛剛說的consumeEach的限制,就是for在其中一個coroutine失敗時,可以安全的持續接收值直到結束,但comsumeEach在遇到,會無法接收到完整的資料

val firstChannel = Channel<Int>(10)//buffer 有10個


lifecycleScope.launch {
    (1..10).forEach { firstChannel.send(it) }

    val one = async { launchProcessor(1,firstChannel) }
    val two = async { launchProcessor(2,firstChannel) }
    val three = async { launchProcessor(3,firstChannel) }

    one.await()
    two.await()
    three.await()

    firstChannel.cancel() // cancel producer coroutine and thus kill them all
}

with for loop,這裡會少3是因為在for loop裡面,已經把3拿出來了,才 throw Exception,所以在channel裡面才會沒有3

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {

    for ( msg in channel) {
        if (msg == 3) throw CancellationException()
        Timber.d("Processor #$id received $msg")
        delay(500)
    }
}


/**
 * 
 Processor #1 received 1
 Processor #2 received 2
 Processor #1 received 4
 Processor #2 received 5
 Processor #1 received 6
 Processor #2 received 7
 Processor #1 received 8
 Processor #2 received 9
 Processor #1 received 10
 * */

with consume

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {

    channel.consumeEach { msg ->
        if (msg == 3) throw CancellationException()
        Timber.d("Processor #$id received $msg")
        delay(500)
    }
}
/**
 * Processor #1 received 1
 * Processor #2 received 2
 * */

最後,channel會盡可能的公平操作,ex.有兩個先receive之後再send的coroutine,他們會交替的被呼叫,而不會在一個裡面重複,文檔有範例,這邊有例外討論

實際運作比較

在看官方影片的時候,剛好看到他們有做比較,真是太貼心了
只用list坐回傳的話順序是

用channel回傳的順序是交替的,你會依順序一個一個收到,而不是一次收到一串

特性

如果沒被receive,在scope終止前,會一直等下去,不會有"30歲還單身我們結婚吧?" 這種事情

連結統整

必看

KotlinConf 2019 前面有講到channel
englidh doc
chinese doc

圖片來源

kotlinlang


上一篇
day12 輕鬆一下,用 coroutine 接個 restful api
下一篇
day14 channel實戰使用 with webSocket,後面離題講android接localhost
系列文
解鎖kotlin coroutine的各種姿勢-新手篇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言