Flow 是依序執行的,如果使用 collect 作為終端運算子,那麼在最後就會按照每一個元素所要花費的時間一一的執行,並把結果吐出來。
如下面的範例:
class Day25 {
    fun flow1() = flow {
        repeat(10) {
            delay(100)
            emit(it)
        }
    }
}
這個 flow 中會產生十個元素,每一個元素都需要花費 100 毫秒建立。
如果這些元素用 collect 執行的時候,也會在每一個元素上花費 200 毫秒來執行,如此一來一個元素所需要執行的時間就是 100 毫秒 + 200 毫秒,總共需要 300 毫秒。
fun main() = runBlocking {
    val day25 = Day25()
    val times = measureTimeMillis {
        day25.flow1()
            .collect {
                delay(200)
                println(it)
            }
    }
    println("$times")
}
0
1
2
3
4
5
6
7
8
9
3115
measureTimeMillis 來計算所需執行的時間。
今天要介紹的運算子 buffer 就是要在這個地方使用的,怎麼使用呢?
我們只需要在需要加上緩衝的地方使用即可,如下:
fun main() = runBlocking {
    val day25 = Day25()
    val times = measureTimeMillis {
        day25.flow1()
            .buffer() // Add here
            .collect {
                delay(200)
                println(it)
            }
    }
    println("$times")
}
我把 buffer 加在 flow 與 collect 之間,原因是每一個 flow 的元素需要 100 毫秒來產生,而執行每一個元素卻需要 200 毫秒,也就是說,如果我們在這兩個操作之間加上緩衝,那麼執行的時間應該可以減少。(因為當 collect 花費 200 毫秒處理第一筆資料時,flow 已經在建立第二筆資料了,這樣子就可以減少操作的時間。我們來實際執行一下,看看結果會是如何?
0
1
2
3
4
5
6
7
8
9
2246
經過緩衝之後,我們所需花費的時間由 3115 毫秒,減少至 2246 毫秒。提升了 27% 的速度。

其實使用 buffer 會變成使用兩個 coroutine 在執行,flow 與 collect 分別使用不同的 coroutine ,並且透過 Channel 來傳輸。
在使用 Flow 時,因為是按照順序來執行,總時間就會是全部的執行時間加總起來。利用 buffer 我們就可以讓這些操作使用不同的 Coroutine,如此一來就可以同時執行多個任務,時間比沒有用 buffer 的會少很多。
有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局