iT邦幫忙

2021 iThome 鐵人賽

DAY 20
0
Software Development

Coroutine 停看聽系列 第 20

Day20:Flow 想在其他的執行緒執行,可以嗎?

  • 分享至 

  • twitterImage
  •  

Flow 是屬於 coroutine 範圍項目,coroutine 中一個重要的特點可以輕易的切換執行緒,在 Flow 也有這項功能嗎?

我們知道要在 Coroutine 中使用不同的執行緒,我們需要切換不同的 Dispatchers ,使用不同的 Dispatchers Coroutine 會在其內部幫我們切換到適當的執行緒/執行緒池。而我們在使用 Dispatchers 的時機點有兩個,一個是使用 Coroutine builder (ex: launch, async),另一種方法則是使用 withContext

假設我們想與 launch、async 一樣在建立的時候就把 Dispatchers 帶入,那麼 flow 的 builder 應該就要包含 CoroutineContext 的參數。

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

不過很可惜的,並沒有這個參數在其中,所以我們沒有辦法在建立 flow 的時候同時帶入 Dispatchers。

如果第一條路不通,我們試試看第二條路如何?

我們嘗試在 flow 裏面使用 withContext 將 emit 包起來。

class Day20 {
    fun flow(): Flow<Int> = flow {
        withContext(Dispatchers.Default) {
            repeat(10) {
                emit(it)
            }
        }
    }
}
fun main() = runBlocking {
    val day20 = Day20()
    day20.flow().collect { println(it) }
    println("done")
}

嘗試執行之後,噴出錯誤了

withContext exception

錯誤的內容:Flow invariant is violated。

訊息還告訴我們請使用 flowOn 替代。

flowOn

我們將上面的範例改成

class Day20 {
    fun flow(): Flow<Int> = flow {
        repeat(10) {
            emit(it)
        }
    }.flowOn(Dispatchers.Default)
}

再次執行,

0
1
2
3
4
5
6
7
8
9
done

OH~YEAH 通過了。

我們來研究一下 flowOn ,首先我們先看的是它的簽名

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

沒錯,這邊的確就如我們在前面所說的,必須要帶入 CoroutineContext。所以如果我們必須要在 flow 中切換執行緒,那麼就必須要使用 flowOn

flowOn 除了在建立 flow 的時候可以使用外,我們觀察 flowOn 的定義,嗯,它應該也能在 Terminal operators 之前呼叫,因為它的回傳值是 Flow

將上方的範例改回,並把 flowOn 加到 collect 之前

class Day20 {
    fun flow(): Flow<Int> = flow {
        repeat(10) {
            emit(it)
        }
    }
}
fun main() = runBlocking {
    val day20 = Day20()
    val flow = day20.flow()
    flow.map {
        println("map: ${Thread.currentThread().name}")
        it * 3
    }
    .flowOn(Dispatchers.Default)
    .filter {
        println("filter: ${Thread.currentThread().name}")
        it % 2 == 0
    }
    .flowOn(Dispatchers.IO)
    .collect {
        println("collect: ${Thread.currentThread().name}")
        println(it)
    }
    println("done")
}
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
collect: main
0
collect: main
6
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
collect: main
12
filter: DefaultDispatcher-worker-1
collect: main
18
collect: main
24
done

跟我們所想的一樣,我們可以在 Terminal operator 之前使用 flowOn,而 flowOn 只會對它之前的操作有作用,如上方的 map 以及 filter 都是屬於不同的 DefaultDispatchers-worker,然後在 collect 又回到 main。


如果我們沒有在 flow 裏面呼叫 flowOn 來更改 Dispatchers ,那麼它預設是會使用外層 coroutine 的 Dispatchers。如下:

class Day20 {
    val scope = CoroutineScope(Job() + Dispatchers.Default)

		...

    fun flow2() = scope.launch {
        flow {
            println("emit: ${Thread.currentThread().name}")
            emit(10)
        }.collect {
            println("collect:  ${Thread.currentThread().name} $it")
        }
    }
}
fun main() = runBlocking {
    val day20 = Day20()
    day20.flow2()
    delay(100)
    println("done")
}
emit: DefaultDispatcher-worker-1
collect:  DefaultDispatcher-worker-1 10
done

小結

flowOn 的 執行緒是從呼叫它的點帶入,預設的執行緒是跟外層相同,如果我們希望能夠在使用 flow 的時候更改執行緒,那麼我們就必須要使用 flowOn。

flowOn 它帶入的參數一樣是 CoroutineContext,所以我們可以帶入 Dispatchers,根據不同的 Dispatchers, Flow 會切換至不同的執行緒/執行緒池來執行。 flowOn 除了可以在建立 flow 的時候使用,我們也可以在 Terminal operators 之前呼叫,它會修改在它之前的所有操作的執行緒。使用 flowOn 我們就可以輕鬆切換執行緒,而不會發生錯誤。

特別感謝

Kotlin Taiwan User Group
Kotlin 讀書會


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day19:Flow 準備好輸出了嗎?使用 Terminal operators 產生結果吧。
下一篇
Day21:Hot Flow - SharedFlow
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言