iT邦幫忙

2021 iThome 鐵人賽

DAY 12
1
Mobile Development

Jetpack Compose X Android Architecture X Functional Reactive Programming系列 第 12

RxJava - Backpressure

不知道大家有沒有手沖咖啡的經驗?如果沒有的話,應該也看過或用過濾掛式咖啡,在沖咖啡時,水不能一次倒太多,因為濾網的消化速度沒這麼快,要是你不管它的消化速度一直倒水進去的話,最後就會滿出來!

上面這個手沖咖啡的例子也會發生在 Reactive programming 上面,之前的章節中有講過,Reactive programming 的其中一個大特點是,事件流可以在不同的階段依據需求來去切換執行緒,所以萬一上游的事件發出太多,切換執行緒之後,下游無法即時處理時會發生什麼事呢?依咖啡的例子來說,熱水可以到的最大容量就是濾紙的大小,那以電腦來說就當然是記憶體的容量了,所以當下游來不及處理的話,是會發生 OutOfMemoryException 的!

這個現象在 RxJava 中有一個名詞來表示它: Backpressure。既然 Backpressure 有這個問題,RxJava 當然就提供了相對應的解法,接下來就來介紹它吧!

Flowable

跟 Observable 其實是幾乎一模一樣的,有各式各樣的 operator: mapflatMapfilterreducecombineLatestzip等等,也可以使用 observeOnsubscribeOn 來切換執行緒。Flowable 提供更多的是,在建立實體時提供了各種不同處理 Backpressure 的策略,像是丟掉最新的一筆、或是丟掉最後的一筆等等,下面舉個例子:

val f = Flowable.fromIterable((1..10000).toList())
f.subscribe { number ->
    println("number : $number")
}

運行結果:

number : 1
number : 2
number : 3
....
number : 10000

奇怪!全部都有跑出來啊,怎麼沒看到有任何資料被丟掉?但其實用法錯了,應該要用下面這種方式來建立 Flowable :

val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    for (i in 1..10000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.DROP)
f.subscribe { number ->
    println("number : $number")
}

使用 Flowable.create 才能成功建立出有 BackpressureStrategy 的 Flowable 喔!現在的 BackpressureStrategy 為 DROP ,也就是丟掉多餘的資料,那我們來看看他是怎麼丟掉的吧!(根據下圖,數字應該只會跑到 128,因為預設的 buffer size 就是 128)

Screen Shot 2021-09-05 at 9.31.36 PM.png

number : 1
number : 2
number : 3
....
number : 10000

果不其然,結果是.....?不對啊,怎麼還是跑完 10000 了呢?那我們再做一個實驗,在 onNext 上面再留個 log:

val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    for (i in 1..10000) {
        **println("onNext : $i") // 多加這一行**
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.DROP)
f.subscribe { number ->
    println("number : $number")
}

運行結果如下:

onNext : 1
number : 1
onNext : 2
number : 2
onNext : 3
number : 3
...
onNext : 10000
number : 10000

原來從頭到尾都沒有在累積資料!每一筆資料送出之後馬上就被處理了,因為他們都是在同一條執行緒上運行啊!所以為了模擬 Backpressure ,就需要去切換執行緒:

val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    for (i in 1..10000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.DROP)

f
    **.observeOn(Schedulers.computation()) // 多了這一行**
    .subscribe { number ->
        println("number : $number")
    }

Thread.sleep(1000L)

再跑最後一次,終於成功了!

number : 1
number : 2
number : 3
...
number : 128

BackpressureStrategy 還有其他四種,想了解的就交給各位讀者自己去玩了!

關於上回的 Bug

相信聰明的你已經知道上一篇的最後發生了什麼事了,沒錯!就是發生了 Backpressure!請看下圖說明:

Screen Shot 2021-09-05 at 9.47.17 PM.png

當最右邊的 Repository 要更新資料到 Firebase 時,是有一些網路延遲的,網路更新事件這邊是以橘色的小圓點來表示,這邊假設每一個小圓點都要花差不多 300 毫秒好了,過了 300 毫秒之後才能再處理下一個事件。接著我們再看到上圖的左邊,有一個手勢的事件(紅色小圓點)傳送到 ViewModel 了,而且下一個手勢事件可能會在 30 毫秒內又送出另一個,而這每一個手勢事件最後又會轉為右邊的橘色小圓點,30 毫秒對上 300 毫秒...那這不就造成塞車了嗎?這邊的 Backpressure 就是這麼發生的!

那麼只要我將中間的事件都丟掉就沒問題了嗎?讓 Firebase 處理完再給他下一個更新事件,但...好像也不能丟掉,丟掉的話手勢事件的連貫性就沒了,使用者就因此無法再手機上體驗到很流暢的便利貼拖曳體驗(畢竟至少 300 毫秒之後才會看到拖曳之後的結果)。所以這要怎麼辦呢?接著我們就要靠下一個 Reactive Programming 的機制來幫我們解決這問題了 - Multicasting。


上一篇
整合 Firestore SDK 到便利貼應用程式
下一篇
Multicasting for RxJava
系列文
Jetpack Compose X Android Architecture X Functional Reactive Programming30

尚未有邦友留言

立即登入留言