iT邦幫忙

2021 iThome 鐵人賽

DAY 18
1
Software Development

Coroutine 停看聽系列 第 18

Day18:Flow 的中間運算子,資料輸出前還可以做很多事喔

我們在上一篇的文章中,介紹了 Flow 的基本概念,包括如何建立一個 Flow,以及 Flow 是一個 Cold stream,所謂的 cold stream 就是在當我們呼叫 collect{} 的時候,這串資料才會被執行。

Flow 厲害的一個點就在於,我們可以在呼叫 collect{} 之前利用一些操作,讓資料轉換成別的樣子,這個操作稱之為 Intermediate operators (中間運算子),這邊的操作與 Functional Programming 一樣,都是可以在輸出之前做一些操作,讓原本的資料流轉換成新的樣子。而 Flow 在設計這些函式的時候,也按照 Functional Programming 定義的名稱,所以熟悉 FP 的朋友應該對於 Flow 提供的中間操作不會太陌生。

Intermediate operators (中間運算子)

這是我們前一篇的範例,

fun flow(): Flow<Int> = flow {
      println("Flow started")
      repeat(10){
        delay(100)
        emit(it)
    }
}

fun main() = runBlocking {
    val flow = flow()
    flow.collect { value -> println(value)}
}

根據我們昨天所介紹的,當我們呼叫 collect{ value -> println(value) } 的時候,我們將會列印出 0~10。(因為我們在 flow{} 裏面,按照順序由 0 ~ 9 每間隔 100 毫秒發送當下的整數值進入 flow 中。)

map

map 其實是 mapping 的意思,也就是說我們可以將輸入的值按照我們設定的方式對應到某一個 Domain 中。

如下圖,我們利用 map 來把 X映射到 X² 的 Domain。

Flow map

inline fun <T, R> Flow<T>.map(crossinline transform: suspend (T) -> R): Flow<R>

map 的定義我們也可以發現,輸入的型別為 T 經過轉換之後,會變成 R,也就是轉成另外一個 Domain 上。

使用範例:

fun main() = runBlocking {
    val flow = flow()
    flow.map{ it*it }
        .collect { println(it)}
}
Flow started
0
1
4
9
16
25
36
49
64
81

filter

filter 如同它的名稱,就是用來過濾的,在這邊我們可以自定義過濾的條件。我們先看一下它的簽名:

inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>

在 filter 中,我們帶入的是一個回傳 Boolean 的 lambda,當傳入的值滿足條件時,就會把這個值往下傳,否則就會被過濾下來。

範例如下:

我們嘗試把偶數過濾出來,我們可以這麼做

fun main() = runBlocking {
    val flow = flow()
    flow.filter{ it % 2 == 0 }
        .collect { println(it)}
}
Flow started
0
2
4
6
8

take

將資料流只保留相對應的數量,當指定的數量超過資料流的數量時,就會以資料流的數量為主,但是如果帶入的是負值的話,就會拋出 java.lang.IllegalArgumentException

同樣的,我們看一下 take 的簽名:

fun <T> Flow<T>.take(count: Int): Flow<T>

在 take 中,參數 count: Int 就是用來指定要取得的數量。

範例:

取得資料流中,前三個數值

fun main() = runBlocking {
    val flow = flow()
    flow.take(3)
        .collect { println(it)}
}
Flow started
0
1
2

那麼,不知道你們有沒有注意到, take 是由資料流的最前面開始取,所以取得前三個值就會是 0 1 2

zip

zip 是用來把兩個 Flow 組合起來,我們看一下它的簽名:

fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

不囉唆,我們直接看範例:

fun main() = runBlocking{
    val stringFlow = listOf("a", "b", "c", "d").asFlow()
    val intFlow = (1..3).asFlow()
    intFlow.zip(stringFlow){int, string -> "$int-$string"}
            .collect { println(it) }
}
1-a
2-b
3-c

在這個範例中,我們有兩個 flow,一個是含有四個元素("a", "b", "c", "d")的 stringFlow,另一個則是含有三個元素(1, 2, 3) 的 flow。我們想要將兩組資料流組合在一起,我們使用 intFlow 的 zip 來組合。

從上方的範例我們可以清楚的得知,zip 組合而成的數量會與這兩個 flow 最少的相同,因為 zip 需要的是一對一的組合,所以沒有辦法組合的部分就會被捨棄。

combine

zip 類似, combine 也是用來組合 Flow 的函式,將上方的範例改成使用 combine 看看結果如何。

fun main() = runBlocking{
    val stringFlow = listOf("a", "b", "c", "d").asFlow()
    val intFlow = (1..3).asFlow()
    intFlow.combine(stringFlow){int, string -> "$int-$string"}
        .collect { println(it) }
}
1-a
2-b
3-c
3-d

執行之後發現,雖然與 zip 一樣都是結合 flow 的函式,但是兩者的結果不同, combine 所產生的數量會與這兩個 flow 中最多的一樣,當組合元素不夠的時候,就會拿前一個值來使用。

另外,與 zip 不同的是 combine 不只可以組合兩個 flow,它支援組合多個 flow。簽名如下:

fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
fun <T1, T2, T3, R> combine(flow: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, transform: suspend (T1, T2, T3) -> R): Flow<R>
fun <T1, T2, T3, T4, R> combine(flow: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, flow4: Flow<T4>, transform: suspend (T1, T2, T3, T4) -> R): Flow<R>
fun <T1, T2, T3, T4, T5, R> combine(flow: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, flow4: Flow<T4>, flow5: Flow<T5>, transform: suspend (T1, T2, T3, T4, T5) -> R): Flow<R>
inline fun <T, R> combine(vararg flows: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R>
inline fun <T, R> combine(flows: Iterable<Flow<T>>, crossinline transform: suspend (Array<T>) -> R): Flow<R>

我們在前面介紹了幾個中間運算子,這些運算子是可以組合使用的,如下:

fun main() = runBlocking{
    val flow = flow()
    flow.map { it * 3 }
    .filter { it % 2 == 0 }
    .take(2)
    .collect { println(it) }
}
Flow started
0
6

從結果只列印一個 Flow started 得知,原本的 Flow 只會被執行一次,每經過一個中間運算子都會產生一組新的 Flow,這樣子一層一層的把 Flow 往下送,直到最後的 collect{}

小結

Flow 可以讓你使用串串大法,把資料在輸出前經過一層一層的處理產生出我們想要的樣子,這樣子的好處是我們可以用更簡潔的方式來完成我們所需要的操作。
另外, Flow 所提供的這些中間運算子的名稱與 Functional Programming 裏面的相同,原因是因為它們屬於相同的操作,如果熟悉 FP 對這樣的操作就不會太陌生。

本篇文章先介紹到這邊,下篇文章將繼續介紹 Flow ,除了 collect 以外,我們還有哪些運算子可以使用呢?

特別感謝

Kotlin Taiwan User Group
Kotlin 讀書會


上一篇
Day17:Flow,一個非同步的資料流。 First Look
下一篇
Day19:Flow 準備好輸出了嗎?使用 Terminal operators 產生結果吧。
系列文
Coroutine 停看聽30

尚未有邦友留言

立即登入留言