說完了 Coroutine
,有個概念不得不一起說一下,那就是 Flow
,如果大家仔細看 Flow
的 package 的話就會發現它其實也是 Coroutine
裡的一個小功能:
import kotlinx.coroutines.flow.Flow
而 Flow
是什麼呢,它其實只是個 interface:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
而 FlowCollector
是個 functional interface(SAM):
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
終於看到 suspend function 了,因為有 SAM 的支援所以實際使用的時候通常都寫成 lambda 的形式,待會我們會介紹,而這邊的 emit
function,其實是給 Flow
call 的,當上游有新資料的時候,就會透過 emit
來通知下游。
那通常我們怎麼建立一個 Flow
呢?可以透過 flow
這個 FlowBuilder 來建立:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
它需要一個 FlowCollector
的 receiver 作為參數,而 FlowCollector
其實就是我們剛剛 collect
所帶的參數。
一個非常簡單的完整範例如下:
val flow = flow {
emit(1)
emit(2)
emit(3)
}
flow.collect {
println(it)
}
執行的結果如下:
1
2
3
為什麼要講原始碼呢,其實透過這個過程我們就可以了解它設計的美妙,以及 Flow
預設上只有 collect
之後才有 FlowCollector
可以使用,也才能夠由 flow
呼叫 emit
來拋出資料,所以 Flow
基本上是 Cold 的一種 Stream,沒有人 collect
它就不會運作,而多個 collect
呼叫就會有多條 flow
分別跑。
一開始就看程式碼或許走得太快了,讓我們稍微緩緩,回頭來看一下這個結構帶給我們什麼好處,如果只是 iterate 一個 list,好像用 forEach
就好了為什麼要這麼麻煩分成這麼多元件呢?
其實 Flow
的設計很大程度的參考了 ReactiveX 的概念,甚至 Flow
還可以跟 Rx 直接作轉換(https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-rx2/),所以很多差別以及運用場景都是類似的,比如原生 Coroutine 支援更好的非同步使用、不需要知道整個 stream 的總長度,甚至可以丟出無限多的資料等,都是蠻棒的功能,尤其使用 Kotlin 來寫 operator 更是非常的精簡,有機會的話也推薦大家可以自己試試看,或是敬請期待明天的進一步的 Flow
介紹囉~
筆者多年前學習 Rx 的時候,真心覺得 Rx 官網的說明寫的很棒,主被動的差別帶來的改變幾乎刷新了我的世界觀,雖然我們現在談的是
Flow
,但很多概念也是通用的,推薦大家沒看過的話也可以花點時間看看:https://reactivex.io/intro.html