Flow 是屬於 coroutine 範圍項目,coroutine 中一個重要的特點可以輕易的切換執行緒,在 Flow 也有這項功能嗎?
我們知道要在 Coroutine 中使用不同的執行緒,我們需要切換不同的 Dispatchers ,使用不同的 Dispatchers Coroutine 會在其內部幫我們切換到適當的執行緒/執行緒池。而我們在使用 Dispatchers 的時機點有兩個,一個是使用 Coroutine builder (ex: launch, async),另一種方法則是使用 withContext
。
假設我們想與 launch、async 一樣在建立的時候就把 Dispatchers 帶入,那麼 flow 的 builder 應該就要包含 CoroutineContext 的參數。
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
不過很可惜的,並沒有這個參數在其中,所以我們沒有辦法在建立 flow 的時候同時帶入 Dispatchers。
如果第一條路不通,我們試試看第二條路如何?
我們嘗試在 flow 裏面使用 withContext
將 emit 包起來。
class Day20 {
fun flow(): Flow<Int> = flow {
withContext(Dispatchers.Default) {
repeat(10) {
emit(it)
}
}
}
}
fun main() = runBlocking {
val day20 = Day20()
day20.flow().collect { println(it) }
println("done")
}
嘗試執行之後,噴出錯誤了
錯誤的內容:Flow invariant is violated。
訊息還告訴我們請使用
flowOn
替代。
我們將上面的範例改成
class Day20 {
fun flow(): Flow<Int> = flow {
repeat(10) {
emit(it)
}
}.flowOn(Dispatchers.Default)
}
再次執行,
0
1
2
3
4
5
6
7
8
9
done
OH~YEAH 通過了。
我們來研究一下 flowOn ,首先我們先看的是它的簽名
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
沒錯,這邊的確就如我們在前面所說的,必須要帶入 CoroutineContext。所以如果我們必須要在 flow 中切換執行緒,那麼就必須要使用 flowOn
。
flowOn 除了在建立 flow 的時候可以使用外,我們觀察 flowOn 的定義,嗯,它應該也能在 Terminal operators 之前呼叫,因為它的回傳值是 Flow
將上方的範例改回,並把 flowOn 加到 collect 之前
class Day20 {
fun flow(): Flow<Int> = flow {
repeat(10) {
emit(it)
}
}
}
fun main() = runBlocking {
val day20 = Day20()
val flow = day20.flow()
flow.map {
println("map: ${Thread.currentThread().name}")
it * 3
}
.flowOn(Dispatchers.Default)
.filter {
println("filter: ${Thread.currentThread().name}")
it % 2 == 0
}
.flowOn(Dispatchers.IO)
.collect {
println("collect: ${Thread.currentThread().name}")
println(it)
}
println("done")
}
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
collect: main
0
collect: main
6
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
collect: main
12
filter: DefaultDispatcher-worker-1
collect: main
18
collect: main
24
done
跟我們所想的一樣,我們可以在 Terminal operator 之前使用 flowOn,而 flowOn 只會對它之前的操作有作用,如上方的 map 以及 filter 都是屬於不同的 DefaultDispatchers-worker,然後在 collect 又回到 main。
如果我們沒有在 flow 裏面呼叫 flowOn 來更改 Dispatchers ,那麼它預設是會使用外層 coroutine 的 Dispatchers。如下:
class Day20 {
val scope = CoroutineScope(Job() + Dispatchers.Default)
...
fun flow2() = scope.launch {
flow {
println("emit: ${Thread.currentThread().name}")
emit(10)
}.collect {
println("collect: ${Thread.currentThread().name} $it")
}
}
}
fun main() = runBlocking {
val day20 = Day20()
day20.flow2()
delay(100)
println("done")
}
emit: DefaultDispatcher-worker-1
collect: DefaultDispatcher-worker-1 10
done
flowOn 的 執行緒是從呼叫它的點帶入,預設的執行緒是跟外層相同,如果我們希望能夠在使用 flow 的時候更改執行緒,那麼我們就必須要使用 flowOn。
flowOn 它帶入的參數一樣是 CoroutineContext,所以我們可以帶入 Dispatchers,根據不同的 Dispatchers, Flow 會切換至不同的執行緒/執行緒池來執行。 flowOn 除了可以在建立 flow 的時候使用,我們也可以在 Terminal operators 之前呼叫,它會修改在它之前的所有操作的執行緒。使用 flowOn 我們就可以輕鬆切換執行緒,而不會發生錯誤。
Kotlin Taiwan User Group
Kotlin 讀書會
有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局