iT邦幫忙

2021 iThome 鐵人賽

DAY 17
1
Software Development

Coroutine 停看聽系列 第 17

Day17:Flow,一個非同步的資料流。 First Look

What is Flow?

Flow 是用來處理非同步的資料流的一種方式,它會按照發射 (emit) 的順序來執行。

An asynchronous data stream that sequentially emits values and completes normally or with an exception.

將資料透過 Flow 的方式發送,在資料接收之前,這筆資料都不會被執行、運算,我們甚至可以在執行之前透過一些函式來將這些資料轉化成我們所希望的樣子。

跟 Channel 不太一樣的地方是,Channel 一次取出一個值,而 Flow 取出的是一個流(Stream)。

換句話說,使用 Channel 時,我們必須要呼叫多次的 receive() 來接收 send() 傳送出來的值,而 Flow 只需要使用 collection{} 就可以處理所有在這個資料流。

Simple Flow

Kotlin 提供多種方式建立 Flow ,在這邊我們使用 flow{ ... } 來建立。

fun flow1(): Flow<Int> = flow {
      println("Flow started")
      repeat(10){
        delay(100)
        emit(it)
    }
}

我們可以發現 flow{ ... } 建立出來的函式不是一個 suspend 函式,原因就是 Flow 是等到我們接收的時候才會去執行,所以接收的函式才會是一個 suspend 函式。

另外,不知道你有沒有注意到,在 flow{} 裏面包含著一個 suspend 函式 (delay()),也就是說這個 lambda 函式也是一個 suspend 函式。flow{} 的簽名如下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

可以發現在 flow{} 包含了一個 suspend 函式 FlowCollector<T>.() -> Unit

在這個 flow {} 中,我們最後使用了 emit() 將整數傳進 stream 中。

這邊的 emit() 其實就是 FlowCollector 的函式

public interface FlowCollector<in T> {

    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}

如何取得 Flow 的資料呢?

從上方的範例我們知道,回傳的是一個 Flow<T> 的值,那麼我們來看一下這個 Flow 的定義:

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

在 Flow 中,裏面只有一個函式 ( collect() ),其中有一個參數為 FlowCollector<T> ,這個型別就是我們在 flow{} 中建立的 lambda 的型別。

所以到這邊我們就大致瞭解了, Flow 是利用 FlowCollector<T> 來傳送資料,用 emit() 把資料塞進去,然後用 collect 把 資料取出。

取出資料的範例如下:

fun main() = runBlocking {
    val flow = flow1()
    flow.collect { value -> println(value)}
}
Flow started
0
1
2
3
4
5
6
7
8
9

Flow is cold stream

我們在前面有提到, Flow 的資料是只有在接收的時候才會執行,也就是說,同樣的資料如果存放在 stream 中,我們可以多次接收,在每次呼叫接收的時候,都會重新執行一次 flow{} 裏面的程式碼。

我們上例稍作修改:

fun main() = runBlocking {
    val flow = flow1()
    flow.collect { value -> println(value)}
		flow.collect { value -> println(value)}
}
Flow started
0
1
2
3
4
5
6
7
8
9
Flow started
0
1
2
3
4
5
6
7
8
9

則結果的確是會得到兩次一樣的資料。

Flow Builder

Flow 有提供多種建立的方法,如下:

  • flow{}

這就是我們在上面範例所使用的 Builder,我們在這個 builder 中,使用 emit() 把資料傳進 stream 中。

  • flowOf(...)

flowOf 有兩種不同的實作,簡單來看就是一個是一個值,另外一個是處理多個值。重點是,flowOf 其實在裡面的實作也是使用 flow{} 來建立 Flow。

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
public fun <T> flowOf(value: T): Flow<T> = flow {
    /*
     * Implementation note: this is just an "optimized" overload of flowOf(vararg)
     * which significantly reduces the footprint of widespread single-value flows.
     */
    emit(value)
}

使用方式:

flowOf(1,2,3,4)
  • asFlow()

如果你的資料是一個 Collection,那麼我們就可以使用 asFlow() 來將 Collection 轉成 Flow

底下為 asFlow() 的實作,可以看到有各式各樣的實作

public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun IntArray.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun LongArray.asFlow(): Flow<Long> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun LongRange.asFlow(): Flow<Long> = flow {
    forEach { value ->
        emit(value)
    }
}

@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

小結

Flow 是一種非同步處理資料的方式,使用 Flow 其實就是把資料丟進一個流(Stream)中,而 Flow 裏面的程式碼只有在呼叫接收的時候才會調用,所以建立 Flow 的函式不需要為 suspend 函式。

本篇文章我們簡單介紹了Flow ,下一篇文章我們會多講一些深入的內容。

參考資料

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html

特別感謝

Kotlin Taiwan User Group
Kotlin 讀書會


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day16:四種不同的 Channel
下一篇
Day18:Flow 的中間運算子,資料輸出前還可以做很多事喔
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言