iT邦幫忙

2021 iThome 鐵人賽

DAY 13
0
Mobile Development

Jetpack Compose X Android Architecture X Functional Reactive Programming系列 第 13

Multicasting for RxJava

在進入正題之前先讓大家看看在 Reactive Programming 中的一種使用案例:

val studentObservable: Observable<Student> = getStudents()

studentObservable
    .filter { it.gender == "male" }
    .subscribe {
        // handle male logic
    }

studentObservable
    .filter { it.gender == "female" }
    .subscribe {
        // handle female logic
    }

// Don't do this
studentObservable
    .subscribe { student ->
        when(student.gender) {
            "male" -> { TODO() }
            "female" -> { TODO() }
        }
    }

在使用 reactive stream 的時候,有時候並不需要“一條通到底”,在必要的時候,可以採取這種“分流”的方式,分別處理各自的邏輯,也比較符合 Single Responsibility Principle。但是這樣使用卻會導致其他問題的發生,請看看下面這段範例:

val a = Observable.create<Int> { emitter ->
    for (i in (1..3)) {
        println("emit $i")
        emitter.onNext(i)
    }
    emitter.onComplete()
}

a.subscribe { number ->
    println("received 1 $number")
}
a.subscribe { number ->
    println("received 2 $number")
}

猜猜上面這段程式碼的輸出是什麼?

emit 1
received 1 1
emit 2
received 1 2
emit 3
received 1 3
emit 1
received 2 1
emit 2
received 2 2
emit 3
received 2 3

這個 Observable 由於被 subscribe 了兩次,所以 emit 1emit 2emit 3 也都各執行了兩次,這乍看之下沒有什麼大問題,但萬一這個 Observable 其實是一個很耗時的任務呢?同樣的任務,同樣的輸出結果我卻需要執行兩次以上,不是很浪費機器效能嗎?那有沒有辦法只執行一次呢?答案是有的:

val a = Observable.create<Int> { emitter ->
    for (i in (1..3)) {
        println("emit $i")
        emitter.onNext(i)
    }
    emitter.onComplete()
}.cache()

a.subscribe { number ->
    println("received 1 $number")
}
a.subscribe { number ->
    println("received 2 $number")
}

使用 cache() 這個 operator 可以讓我們把所有跑過的結果都記起來,當有第二個或第三個 Observer subscribe 的時候,就可以直接拿到這結果,請看輸出:

emit 1
received 1 1
emit 2
received 1 2
emit 3
received 1 3
received 2 1
received 2 2
received 2 3
// emit 1, emit 2, emit 3 都只跑一次了 

以上這種機制叫做 Multicasting :一個源頭事件流,同時分發到不同的接收者而不需要重複執行已經運算過的內容。

強大而危險的工具

在寫 Reactive Application 的時候,效能一直都會是一個很大的考驗,因為我們偏好使用 Immutable object,在每一個 operator 運行的時候可能會產生一個新的物件出來,而要產生一個新物件就要將記憶體中的一個位置讓出來給他,當事件越來越多時,記憶體消耗的速度就會越來越快,量大到一定程度的話,系統就會開啟 Garbage Collection 的機制自動回收記憶體,而回收的次數太多就會有可能使得使用者感覺到使用者介面卡頓,這是一個很需要被意識到的問題。

因此,在使用 Reactive Programming 做“分流”時,要隨時注意到是不是有不小心在什麼地方浪費了太多系統資源,如果發現了,就要適時的使用 multicasting 的方式來做優化。然而 multicasting 卻也是一個雙面刃,以下是官方在 cache() 這個 operator 中寫的文件:

Note: You sacrifice the ability to dispose the origin when you use the cache operator, so be careful not to use this operator on Observables that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().

大意是說,一但使用了 cache() ,就失去了對上游資料流的控制權,你將無法靠下游的 Disposable 來斷掉整個資料流,就算下游的 Observable 已經被某人給結束,他的上游還是會永遠存在,而且還會繼續、默默的產生資料,你的記憶體將會被消耗殆盡!為了驗證這樣的說法,我們來試著做出永遠不會死掉的事件源:

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}.publish() // [1]

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number") // [3]
    }

Completable.complete()
    .observeOn(Schedulers.single())
    .subscribe {
        observable.connect()  // [2]
    }

Thread.sleep(5000)  // [4]

disposable.dispose()  // [5]

Thread.sleep(5000) 

我們建立了一個永遠都不會結束的 Observable ,每發出一個 item ,就會 sleep 一秒鐘的時間,一直到這個 observable 被 dispose 為止。然後在 [1] 這個地方有一個 publish() ,這個 operator 的目的是為了要將他變成一個 Hot Observable,接著,為了讓這個 hot Observable 能夠開始發出第一個資料,我們必須要在 [2] 這個地方呼叫 connect() 這個 function 。接著在 [3] 這個地方 subscribe 了這個 observable ,而且使用不同的執行緒來接收結果來確保這些執行的順序都是獨立的。[4] 強制目前這條執行緒停止五秒鐘,觀察一下結果,並且在 [5] 這個地方取消了在 [3] 的訂閱,照理說,一但取消了訂閱,observable 也要立即停止送出新的資料,不然在這邊會有記憶體浪費的問題存在,我們來看看執行的結果:

observable: 1
emit number: 1
emit number: 2
observable: 2
emit number: 3
observable: 3
emit number: 4
observable: 4
emit number: 5
observable: 5
emit number: 6
emit number: 7
emit number: 8
emit number: 9
emit number: 10

Hot Observable: 跟我們之前熟悉的 Observable 不一樣,即使沒有 Observer,也會持續不斷的送出新資料出來,當碰到地理位置更新或是藍芽訊號時,會需要用到這種類型的 Observable

我們可以觀察到,即使 dispose 已經觸發了,源頭的 observable 卻還是沒有停下來,持續的在丟出新的資料。事實上, cache() 在實作上就是 publishconnect 的結合。

以上是一個比較複雜的範例,主要是用來示範 Observable 在使用 Multicasting 的情況下是有可能無法操控源頭的 Observable 的,為了這個示範我就用了比較多進階的概念。如果沒有完全懂的也沒關係,下面我再舉一個比較淺顯易懂的例子:

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .doOnDispose { 
        println("This observable is disposed") //[1]
    }

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number")
    }

Thread.sleep(5000)

disposable.dispose()

Thread.sleep(5000)

我把剛剛的 publish()connect() 拿掉了,並且在 [1] 這個地方加了一個 side effect operator - doOnDispose ,在這個 observable 被 dispose 掉的時候就會去呼叫它,來看看執行的結果:

observable: 1
emit number: 1
emit number: 2
observable: 2
emit number: 3
observable: 3
emit number: 4
observable: 4
emit number: 5
observable: 5
This observable is disposed

沒有了 publish()connect() 之後,這個 observable 的確沒有再繼續跑了,看到 “This observable is disposed” 就是最直接的證據,那如果我再加上 cache() 會發生什麼事呢?

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .doOnDispose {
        println("This observable is disposed")
    }.cache() // [1]

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number")
    }

Thread.sleep(5000)

disposable.dispose()

Thread.sleep(5000)

如上方程式碼所示,我在 [1] 的這個地方加了 cache() ,執行的結果會是什麼呢?

emit number: 1
observable: 1
emit number: 2
observable: 2
emit number: 3
observable: 3
emit number: 4
observable: 4
emit number: 5
observable: 5

結果果然跟文件上說的一樣,原來的 observable 並沒有結束,而是一直靜靜的待在那邊,永遠沒有人可以結束的了他的生命...那如果我們要結束這個使用過 cache 的 observable 要怎麼做呢?可以參考官方建議的作法:

val endSignal = PublishSubject.create<Unit>()
val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .doOnDispose {
        println("This observable is disposed")
    }
    .takeUntil(endSignal) // [1]
    .onTerminateDetach()
    .cache()

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number")
    }

Thread.sleep(5000)

endSignal.onNext(Unit) // [2]
disposable.dispose()

Thread.sleep(5000)

在 [1] 這個地方插入一個 takeUntil ,並且在預計要結束的地方觸發結束的條件,也就是在 [2] 這邊。

Share

除了 cache() 之外,還有一個功能類似,但是沒有那麼危險的 multicasting operator - share,跟 cache 最主要的差別是,cache 在有多個 observer 的情況下,每一個 observer 都能收到最完整的資訊,但是 share() 新的 observer 卻只能收到最新的結果,無法收到之前的所有資料,請看下面的範例:

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .cache()

observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observer1 : $number")
    }

Thread.sleep(3000)

observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observer2 : $number")
    }

Thread.sleep(1000)

以上這段程式碼使用了 cache() ,而且第一個 observer 跟第二個 observer subscribe 的時機點中間差了 3 秒鐘,以下是執行的結果:

emit number: 1
observer1 : 1
emit number: 2
observer1 : 2
emit number: 3
observer1 : 3
observer2 : 1
observer2 : 2
observer2 : 3
observer1 : 4
emit number: 4
observer2 : 4

observer2 一執行 subscribe 就馬上收到 1, 2, 3 三個資料,那如果將 cache() 換成 share() 呢?

observer1 : 1
emit number: 1
emit number: 2
observer1 : 2
emit number: 3
observer1 : 3
observer1 : 4
emit number: 4
observer2 : 4

observer2 沒收到 1, 2, 3 這三個資料,但是在之後發送的資料卻還是收得到,如果是不需要拿到所有資料的情況下,使用這個 operator 是比較好的選擇。

Subject

其實 Subject 也可以用來做 multicasting 喔!只要把計算產生的結果再送給 Subject ,就可以避免掉重複的計算以及浪費的記憶體:

val subject = BehaviorSubject.create<Int>()
  
val observable: Observable<Int> = getExpensiveObservable()
observable.subscribe { number -> 
    subject.onNext(number)
}

// multicast 1
subject.subscribe {  }

// multicast 2
subject.subscribe {  }

小結

今天看到了 multicasting 的一些 operator ,以及使用上要注意的地方,還有使用 subject 來當作 multicasting 的一種實作方式,實際上在專案上使用時,可能會看情況來使用其中一種。

在大型專案開發上,如果沒有特別注意是會產生很嚴重的後果的,接下來的篇幅中,將會使用實際的案例來讓大家更深刻體會到 multicasting 使用的必要性,大家明天見!


上一篇
RxJava - Backpressure
下一篇
完成便利貼程式第一版
系列文
Jetpack Compose X Android Architecture X Functional Reactive Programming30

尚未有邦友留言

立即登入留言