iT邦幫忙

2021 iThome 鐵人賽

DAY 19
0
Software Development

Coroutine 停看聽系列 第 19

Day19:Flow 準備好輸出了嗎?使用 Terminal operators 產生結果吧。

Flow 經過 Intermediate operators 將資料經過處理之後,最後一步則是要把資料輸出,而將資料輸出則是要透過 Terminal operators。

而除了我們在之前所介紹的 collect 外,Terminal operators 還有其他的成員。那麼,本篇文章將一一介紹這些 Terminal operators。

Terminal operators (終端運算子)

首先,Terminal operators 我這邊先暫時翻譯成「終端運算子」,如果有更好的建議可以跟我說喔。

這邊一樣使用這個簡單的範例:

fun flow(): Flow<Int> = flow {
      println("Flow started")
      repeat(10){
        delay(100)
        emit(it)
    }
}

collect

我們在前面的範例中, collect 使用蠻多次的,我們看一下它的簽名:

inline suspend fun <T> Flow<T>.collect(crossinline action: suspend (T) -> Unit)

在 collect 的參數中,只有一個 actioin: suspend(T) -> Unit ,在調用 collect 的時候可以同時執行這裡面的內容。這邊的 action 是一個 suspend 函式。

使用範例如下:

我們可以直接使用 collect 來把 Flow 裏面所有的資料拿出來做最後的處理。

fun main() = runBlocking {
    val flow = flow()
    flow.collect { value -> println(value)}
}

另外,有另外一種 collect 的方式,定義如下:

suspend fun Flow<*>.collect()

從上面的定義得知,這種的 collect 函式不需要帶任何的參數。當我們調用 collect 的時候,一樣會把 Flow 上的所有資料按照我們的設定來處理。

但是,這個函式是沒有回傳值的,那麼我們該如何使用這個函式呢?

Flow 提供了四個函式 onStartonEachonCompletecatch 。這幾個函式能夠在執行 collect 之前執行,其實它們也是 Intermediate operators ,將上面的範例改成:

fun main() = runBlocking {
    val flow = flow()
		flow.onEach { println(it) }
        .onCompletion { println("done") }
        .collect()
}
Flow started
0
1
2
3
4
5
6
7
8
9
10
done

我們看一下這幾個函式:

onStart

在 Flow 的最前面執行,也就是說當執行 collect 的時候,那麼就會執行 onStart

這邊的重點是,不管 onStart 的順序,它都是會在第一個執行。

public fun <T> Flow<T>.onStart(
  action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
fun main() = runBlocking {
    val flow = flow()
		flow.onStart{ println("start") }
        .onEach { println(it) }
        .onCompletion { println("done") }
        .collect()
}
fun main() = runBlocking {
    val flow = flow()
		flow.onEach { println(it) }
        .onCompletion { println("done") }
				.onStart{ println("start") }
        .collect()
}

這兩段都會輸出:

start
Flow started
0
1
2
3
4
5
6
7
8
9
10
done

onCompletion

與 onStart 相反,onCompletion 是會在所有的動作執行完畢之後才會呼叫的。

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

在 onCompletion 中,action 的型別為 suspend FlowCollector<T>.(cause: Throwable?) -> Unit

所以我們也可以在 onCompletion 中 使用 emit

fun main() = runBlocking {
    val flow = flow()
		flow.onStart{ println("start") }
        .onEach { println(it) }
        .onCompletion { emit("done") }
        .collect{ println(it) }
}
start
Flow started
0
1
2
3
4
5
6
7
8
9
10
done

onEach:它是用來走訪每一個 Flow 的元素的,如同我們上方的範例

這邊要特別注意的, onEach 會根據擺放的位置會接收到不同的元素,所以擺放的位置可能會影響其結果。

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

範例1: onEach 在 map 前方

fun main() = runBlocking {
    val flow = flow()
		flow.onEach { println(it) }
        .map { it * 3 }
        .collect()
}
Flow started
0
1
2
3
4
5
6
7
8
9

範例2: onEach 在 map 後方


fun main() = runBlocking {
    val flow = flow()
		flow.map { it * 3 }
				.onEach { println(it) }
        .collect()
}
Flow started
0
3
6
9
12
15
18
21
24
27

從這兩個結果我們可以得知,onEach 會根據擺放的位置而有不同的結果,在每個 Intermediate operator 執行過後, Flow 裡面的內容就會更改,所以使用 onEach 也會有不同的值。

catch:用來捕捉異常狀態的函式。

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> kotlin.Unit): Flow<T>

直接看一下這個範例:

fun main() = runBlocking {
    val flow = flow()
		flow.map { it * 3 }
				.onEach { 
						println(it)
						if(it>10) throw RuntimeException("large than 10")
				 }
        .collect()
}

在這個範例中,我們在 onEach 的地方加上了 if(it>10) throw RuntimeException("large than 10") 也就是說,當 Flow 裏面的元素大於 10 的時候,就會噴 RuntimeException。如果我們直接執行這段程式碼,會發現的確在 12 的時候拋出了 RuntimeException,而程式也就中斷了。

Flow started
0
3
6
9
12
Exception in thread "main" java.lang.RuntimeException: large than 10
...

我們可以使用 catch 來捕捉異常,如下:

fun main() = runBlocking {
    val flow = flow()
		flow.map { it * 3 }
				.onEach { 
						println(it)
						if(it>10) throw RuntimeException("large than 10")
				 }
				.catch{ println(it) }
        .collect()
}
**Flow started
0
3
6
9
12
java.lang.RuntimeException: large than 10**

single

collect 會把所有在 Flow 裏面的元素都列出來, single 則是相反,它只能列出一個元素。

如果 Flow 裏面沒有元素,會拋出 NoSuchElementException 。如果元素超過 1 個,則是會拋出 IllegalStateException

suspend fun <T> Flow<T>.single(): T

所以我們可以搭配 take(1) 來確保 Flow 裏面的元素只有一個。

fun main() = runBlocking {
		val flow = flow
    val value = flow
        .map { it * 3 }
        .take(1)
        .single()
    println(value)
}
Flow started
0

reduce

在這邊的 reduce 的意思是漸少元素,這是什麼意思呢?在 reduce 中我們會有兩個值,一個是前一個運算得來的值,另一個是現在的值,會一個一個元素走訪過。隨著 index 往後移動,剩餘的元素也就越來越少了,直到全部的元素都走過了。

suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S

範例:

fun main() = runBlocking {
		val flow = flow
    val reduce = flow
        .reduce { it, it2 ->
            println("$it + $it2")
            it + it2
        }
    println(reduce)
}

Flow started
0 + 1
1 + 2
3 + 3
6 + 4
10 + 5
15 + 6
21 + 7
28 + 8
36 + 9
45

reduce


fold

與 reduce 類似,不過它有一個初始值。

inline suspend fun <T, R> Flow<T>.fold(initial: R, crossinline operation: suspend (R, T) -> R): R

範例如下:

fun main() = runBlocking{
	val fold = (1..4).asFlow()
	        .fold(1) { it1, it2 -> it1 * it2 }
	    println(fold)
}
24

toList

最後介紹的是 toList ,我們可以在最後將 Flow 轉成 Collection

suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

範例如下:

fun main() = runBlocking{
		val reversed = flow().map { it * 3 }
        .catch { println(it) }
        .onCompletion { println("Done") }
        .toList(ArrayList())
				.reversed()
    println(reversed)
}
Flow started
Done
[27, 24, 21, 18, 15, 12, 9, 6, 3, 0]

小結

Flow 在 Terminal operators 之前都不會執行,我們可以在 Terminal operators 之前使用不同的 Intermediate operators ,而 Intermediate operators 可以使用多個,但是最後的 Terminal operators 只能呼叫一次。 Terminal operators 針對不同的用途有提供不同的函式,有直接傳回所有 Flow 內容的 collect ,有只能傳回一個值的 single ,如果希望在最後可以針對所有的 Flow 元素進行處理則可以使用 reduce 以及 fold ,最後我們當然可以將 Flow 轉成 list,只要呼叫 toList 即可。

特別感謝

Kotlin Taiwan User Group
Kotlin 讀書會


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day18:Flow 的中間運算子,資料輸出前還可以做很多事喔
下一篇
Day20:Flow 想在其他的執行緒執行,可以嗎?
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言