iT邦幫忙

2021 iThome 鐵人賽

DAY 22
1

昨天我們使用了 shareIn 將 Flow 轉成 SharedFlow, 我們來研究一下這個函式。

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

首先,我們可以知道這是一個 extension function,並且帶有三個參數, scopestsrtedreplay ,我們看一下昨天的範例:

class Day22 {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    fun sharedFlow() = flow {
        repeat(10) {
            delay(100)
            emit(it)
        }
    }.shareIn(
        scope,
        replay = 5,
        started = SharingStarted.WhileSubscribed()
    )
}

scope 這邊是帶入我們在這個類別中所建立的 CoroutineScope,目的是用來定義共享開始的 Coroutine Scope。

replay 是帶入一個整數值,目的是用來當新的訂閱者(subscriber)加入時,會根據這個數值把最後的幾個值發給它們。

started 則是用來決定 SharedFlow 什麼時候會開始啟動。而在這邊有三個選項可供選擇:

  1. Eagerly:當 SharedFlow 建立起來之後,就會立刻起動,而且永不停止。
  2. Lazily:等到第一個監聽者加入的時候,才會啟動,同樣也是永不停止。
  3. WhileSubscribed:預設是當第一個訂閱者加入的時候就會啟動,當最後一個監聽者離開的時候就會停止。所以與前面兩個的第一個不同點就是它會停止。
    • 在 WhileSubscribed 中,有另外兩個參數可以使用:
      1. stopTimeoutMillis:當最後一個訂閱者離開的時候,需要多久才會停止 Shared Flow。
      2. replayExpirationMillis:當有新的訂閱者加入時,會從快取中取得最後的內容,預設是會永遠保存這些內容,也就是不管多久有新訂閱者加入,都可以取得內容。

注意到,這邊所使用的 shareIn 是用來建立一個 SharedFlow,裡面的內容是不可變的。會在每次加入新的訂閱者之後把內容傳給它們。

SharedFlow 是什麼?

SharedFlow 是一個介面,它裡面只有定義一個函式: val replayCache: List<T> ,可以猜出 SharedFlow 的作用就是重播它的快取給新加入的訂閱者。

interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}

如果我們需要在已有訂閱者的時候要在發射 (emit) 內容呢?

這時候我們可以使用 MutableSharedFlow ,MutableSharedFlow 顧名思義就是可以變動的 SharedFlow,也就是說 MutableSharedFlow 裏面的資料,是可以更新的。

我們先看一下 MutableSharedFlow 的介面是什麼樣子?

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    fun tryEmit(value: T): Boolean
    val subscriptionCount: StateFlow<Int>
    fun resetReplayCache()
}

在這個介面中,它實作了兩個介面,一個是我們前面所提到的 SharedFlow 另一個則是 FlowCollector

SharedFlow 的功能如剛剛所說,是用來發送快取的內容給新加入的訂閱者。那 FlowCollector 呢?

我們直接看他的定義

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

嗯,它裡面包含了 public suspend fun emit(value: T) ,所以 MutableSharedFlow 就可以任意的發射資料給訂閱者。

介紹完這兩個介面之後,是時候介紹一下另外三個函式。

  • tryEmit :跟 emit 最大的不同就是,它不是一個 suspend 函式,它會嘗試發射資料,如果成功發射不需暫停,就會直接回傳 true ,如果需要暫停,則回傳 false 並且呼叫原本的 emit。
  • subscriptionCount :回傳有多少個訂閱者。
  • resetReplayCache :將所有快取內的資料清空。

使用範例

假設我們有一個類別名為 Company,儲存資料類型 Employee,如果想要動態的發送 Employee 的更新給所有的訂閱者,我們可以這麼設計:(參考)

class Company{
		private val _employees = MutableSharedFlow<Employee>()
		val employees = _employees.asSharedFlow()

		suspend fun addEmployee(employee: Employee){
			_employees.emit(employee)
		}
}

對外暴露的只有 employees 以及 fun addEmployee 這兩個項目,在類別 Company 中,只有開放這兩個方式來取用/更新 Employee 。對於使用者來說,只需要把 Employee 透過 addEmployee 來更新即可,而外面的使用者不能直接取得 _employees (MutbaleSharedFlow),而是只能取得它不可變動的版本。這樣子我們就把資料的變動留在這個類別中。

而這樣子的設計,在 Android 的 Best Practice 中到處可見。例如 ViewModel 裏面的 LiveData 就是這麼的設計。話說回來,SharedFlow 可以用來取代 LiveData,接下來我先會先介紹 StateFlow,介紹完之後,如果還有時間,我將介紹如何使用 SharedFlow 以及 StateFlow 來替代 LiveData。

小結

SharedFlow 與 Flow 不同,因為它是屬於 hot stream,也就是說 SharedFlow 的執行不是像 Flow 一樣呼叫 collect 之後就會執行,而在執行完成之後就會立刻結束。 SharedFlow 可以同時有多個 collect,SharedFlow 可以同時將快取內的資料同時傳送給所有 collect,在 SharedFlow 中 collect 稱為 subscriber(訂閱者)。

我們可以根據需求設定快取的大小,當有新的訂閱者加入時,就會把最後幾筆資料傳送給訂閱者,而當更新的資料傳送完畢之後,就會跟其他的訂閱者一起接收新的資料。

使用 shareIn 建立的是一個 SharedFlow,也就是裡面的資料是不可更新的,如果要建立可以更新的 SharedFlow,必需要使用 MutableSharedFlow。

參考資料

Reactive Streams on Kotlin: SharedFlow and StateFlow

SharedFlow

特別感謝

Kotlin Taiwan User Group
Kotlin 讀書會


由本系列文改編的《Kotlin 小宇宙:使用 Coroutine 優雅的執行非同步任務》已經上市囉。

有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局


上一篇
Day21:Hot Flow - SharedFlow
下一篇
Day23:Hot Flow - StateFlow
系列文
Coroutine 停看聽30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言