昨天我們使用了 shareIn
將 Flow 轉成 SharedFlow, 我們來研究一下這個函式。
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
首先,我們可以知道這是一個 extension function,並且帶有三個參數, scope
、 stsrted
、 replay
,我們看一下昨天的範例:
class Day22 {
val scope = CoroutineScope(Job() + Dispatchers.Default)
fun sharedFlow() = flow {
repeat(10) {
delay(100)
emit(it)
}
}.shareIn(
scope,
replay = 5,
started = SharingStarted.WhileSubscribed()
)
}
scope
這邊是帶入我們在這個類別中所建立的 CoroutineScope,目的是用來定義共享開始的 Coroutine Scope。
replay
是帶入一個整數值,目的是用來當新的訂閱者(subscriber)加入時,會根據這個數值把最後的幾個值發給它們。
started
則是用來決定 SharedFlow 什麼時候會開始啟動。而在這邊有三個選項可供選擇:
注意到,這邊所使用的 shareIn
是用來建立一個 SharedFlow,裡面的內容是不可變的。會在每次加入新的訂閱者之後把內容傳給它們。
SharedFlow
是一個介面,它裡面只有定義一個函式: val replayCache: List<T>
,可以猜出 SharedFlow 的作用就是重播它的快取給新加入的訂閱者。
interface SharedFlow<out T> : Flow<T> {
val replayCache: List<T>
}
這時候我們可以使用 MutableSharedFlow
,MutableSharedFlow 顧名思義就是可以變動的 SharedFlow,也就是說 MutableSharedFlow 裏面的資料,是可以更新的。
我們先看一下 MutableSharedFlow
的介面是什麼樣子?
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
fun tryEmit(value: T): Boolean
val subscriptionCount: StateFlow<Int>
fun resetReplayCache()
}
在這個介面中,它實作了兩個介面,一個是我們前面所提到的 SharedFlow
另一個則是 FlowCollector
。
SharedFlow 的功能如剛剛所說,是用來發送快取的內容給新加入的訂閱者。那 FlowCollector 呢?
我們直接看他的定義
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
嗯,它裡面包含了 public suspend fun emit(value: T)
,所以 MutableSharedFlow 就可以任意的發射資料給訂閱者。
介紹完這兩個介面之後,是時候介紹一下另外三個函式。
tryEmit
:跟 emit
最大的不同就是,它不是一個 suspend 函式,它會嘗試發射資料,如果成功發射不需暫停,就會直接回傳 true ,如果需要暫停,則回傳 false 並且呼叫原本的 emit。subscriptionCount
:回傳有多少個訂閱者。resetReplayCache
:將所有快取內的資料清空。假設我們有一個類別名為 Company,儲存資料類型 Employee,如果想要動態的發送 Employee 的更新給所有的訂閱者,我們可以這麼設計:(參考)
class Company{
private val _employees = MutableSharedFlow<Employee>()
val employees = _employees.asSharedFlow()
suspend fun addEmployee(employee: Employee){
_employees.emit(employee)
}
}
對外暴露的只有 employees
以及 fun addEmployee
這兩個項目,在類別 Company 中,只有開放這兩個方式來取用/更新 Employee 。對於使用者來說,只需要把 Employee 透過 addEmployee 來更新即可,而外面的使用者不能直接取得 _employees (MutbaleSharedFlow),而是只能取得它不可變動的版本。這樣子我們就把資料的變動留在這個類別中。
而這樣子的設計,在 Android 的 Best Practice 中到處可見。例如 ViewModel 裏面的 LiveData 就是這麼的設計。話說回來,SharedFlow 可以用來取代 LiveData,接下來我先會先介紹 StateFlow,介紹完之後,如果還有時間,我將介紹如何使用 SharedFlow 以及 StateFlow 來替代 LiveData。
SharedFlow 與 Flow 不同,因為它是屬於 hot stream,也就是說 SharedFlow 的執行不是像 Flow 一樣呼叫 collect 之後就會執行,而在執行完成之後就會立刻結束。 SharedFlow 可以同時有多個 collect,SharedFlow 可以同時將快取內的資料同時傳送給所有 collect,在 SharedFlow 中 collect 稱為 subscriber(訂閱者)。
我們可以根據需求設定快取的大小,當有新的訂閱者加入時,就會把最後幾筆資料傳送給訂閱者,而當更新的資料傳送完畢之後,就會跟其他的訂閱者一起接收新的資料。
使用 shareIn 建立的是一個 SharedFlow,也就是裡面的資料是不可更新的,如果要建立可以更新的 SharedFlow,必需要使用 MutableSharedFlow。
Reactive Streams on Kotlin: SharedFlow and StateFlow
Kotlin Taiwan User Group
Kotlin 讀書會
有興趣的讀者歡迎參考:https://coroutine.kotlin.tips/
天瓏書局