不知道大家有沒有手沖咖啡的經驗?如果沒有的話,應該也看過或用過濾掛式咖啡,在沖咖啡時,水不能一次倒太多,因為濾網的消化速度沒這麼快,要是你不管它的消化速度一直倒水進去的話,最後就會滿出來!
上面這個手沖咖啡的例子也會發生在 Reactive programming 上面,之前的章節中有講過,Reactive programming 的其中一個大特點是,事件流可以在不同的階段依據需求來去切換執行緒,所以萬一上游的事件發出太多,切換執行緒之後,下游無法即時處理時會發生什麼事呢?依咖啡的例子來說,熱水可以到的最大容量就是濾紙的大小,那以電腦來說就當然是記憶體的容量了,所以當下游來不及處理的話,是會發生 OutOfMemoryException
的!
這個現象在 RxJava 中有一個名詞來表示它: Backpressure。既然 Backpressure 有這個問題,RxJava 當然就提供了相對應的解法,接下來就來介紹它吧!
跟 Observable 其實是幾乎一模一樣的,有各式各樣的 operator: map
、flatMap
、filter
、reduce
、combineLatest
、zip
等等,也可以使用 observeOn
跟 subscribeOn
來切換執行緒。Flowable 提供更多的是,在建立實體時提供了各種不同處理 Backpressure 的策略,像是丟掉最新的一筆、或是丟掉最後的一筆等等,下面舉個例子:
val f = Flowable.fromIterable((1..10000).toList())
f.subscribe { number ->
println("number : $number")
}
運行結果:
number : 1
number : 2
number : 3
....
number : 10000
奇怪!全部都有跑出來啊,怎麼沒看到有任何資料被丟掉?但其實用法錯了,應該要用下面這種方式來建立 Flowable :
val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
for (i in 1..10000) {
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.DROP)
f.subscribe { number ->
println("number : $number")
}
使用 Flowable.create
才能成功建立出有 BackpressureStrategy
的 Flowable 喔!現在的 BackpressureStrategy
為 DROP ,也就是丟掉多餘的資料,那我們來看看他是怎麼丟掉的吧!(根據下圖,數字應該只會跑到 128,因為預設的 buffer size 就是 128)
number : 1
number : 2
number : 3
....
number : 10000
果不其然,結果是.....?不對啊,怎麼還是跑完 10000 了呢?那我們再做一個實驗,在 onNext 上面再留個 log:
val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
for (i in 1..10000) {
**println("onNext : $i") // 多加這一行**
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.DROP)
f.subscribe { number ->
println("number : $number")
}
運行結果如下:
onNext : 1
number : 1
onNext : 2
number : 2
onNext : 3
number : 3
...
onNext : 10000
number : 10000
原來從頭到尾都沒有在累積資料!每一筆資料送出之後馬上就被處理了,因為他們都是在同一條執行緒上運行啊!所以為了模擬 Backpressure ,就需要去切換執行緒:
val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
for (i in 1..10000) {
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.DROP)
f
**.observeOn(Schedulers.computation()) // 多了這一行**
.subscribe { number ->
println("number : $number")
}
Thread.sleep(1000L)
再跑最後一次,終於成功了!
number : 1
number : 2
number : 3
...
number : 128
BackpressureStrategy
還有其他四種,想了解的就交給各位讀者自己去玩了!
相信聰明的你已經知道上一篇的最後發生了什麼事了,沒錯!就是發生了 Backpressure!請看下圖說明:
當最右邊的 Repository 要更新資料到 Firebase 時,是有一些網路延遲的,網路更新事件這邊是以橘色的小圓點來表示,這邊假設每一個小圓點都要花差不多 300 毫秒好了,過了 300 毫秒之後才能再處理下一個事件。接著我們再看到上圖的左邊,有一個手勢的事件(紅色小圓點)傳送到 ViewModel 了,而且下一個手勢事件可能會在 30 毫秒內又送出另一個,而這每一個手勢事件最後又會轉為右邊的橘色小圓點,30 毫秒對上 300 毫秒...那這不就造成塞車了嗎?這邊的 Backpressure 就是這麼發生的!
那麼只要我將中間的事件都丟掉就沒問題了嗎?讓 Firebase 處理完再給他下一個更新事件,但...好像也不能丟掉,丟掉的話手勢事件的連貫性就沒了,使用者就因此無法再手機上體驗到很流暢的便利貼拖曳體驗(畢竟至少 300 毫秒之後才會看到拖曳之後的結果)。所以這要怎麼辦呢?接著我們就要靠下一個 Reactive Programming 的機制來幫我們解決這問題了 - Multicasting。