除了方便好用的 operator 之外,RxJava 還有一個非常重要的機制:非同步處理。 RxJava 的非同步處理機制可以讓我們很輕鬆的切換不同的執行緒,但是在使用上常常會有一些困難,像是某項任務沒有執行在我預期的執行緒上,或是煩惱說切換執行緒的程式碼應該要放在哪,今天會來跟大家分享我的用法以及看法。
執行緒切換的語法只有兩個:observeOn
跟 subscribeOn
,以一般的準則來說,想要指定上游的執行緒,就使用 subscribeOn
,相反的,想指定下游的執行緒,就使用 observeOn
,如果都沒有指定,就是跑在現在正在運行的這條執行緒上。這些原則看起來很簡單,但是有時候我們會想做一些比較複雜的操作,寫完了之後才發現,這些任務並沒有運行在我預想中的執行緒上面,下面來舉一些我平常會遇到的使用案例:
註:以下這些程式碼都是在單元測試的環境下執行的。
@Test
fun test() {
printCurrentThread("Start")
Observable.just(1)
.map {
printCurrentThread("first map")
it + 1
}
.subscribeOn(Schedulers.computation())
.map {
printCurrentThread("second map")
it + 1
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
private fun printCurrentThread(message: String) {
println("Thread: ${Thread.currentThread().name}, $message")
}
根據上面的推論,如果我要指定上游的執行緒的話,就使用 subscribeOn,因此,first map 會執行在 Computation,second map 會執行在 IO ,但是結果如下:
Thread: Test worker, Start
Thread: RxComputationThreadPool-1, first map
Thread: RxComputationThreadPool-1, second map
Thread: RxSingleScheduler-1, subscribe
second map 還是執行在 Computation!這表示第二個 subscribe 是沒用的,所以如果真的要讓 second map 執行在 IO 的話,應該要這樣寫:
@Test
fun test() {
printCurrentThread("Start")
Observable.just(1)
.map {
printCurrentThread("first map")
it + 1
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io()) // 在前面安插一個 observeOn
.map {
printCurrentThread("second map")
it + 1
}
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
把原來的 subscribeOn 拿掉,並在更前面加上 observeOn 才能達到原來想做的事。
使用 subscribeOn 的話,照理來說 Observable 的源頭就是會在我指定的執行緒上執行了對吧?但很遺憾的這個假設是錯的:
@Test
fun test() {
printCurrentThread("Start")
Observable.just(createItem())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
private fun createItem(): Int {
printCurrentThread("create Item")
return 1
}
createItem 是執行在哪條執行緒上呢?應該是 Computation 吧?但結果卻是 Test worker !:
Thread: Test worker, Start
Thread: Test worker, create Item
Thread: RxSingleScheduler-1, subscribe
原因是因為在 Observable.just 是當下就會馬上執行的函示,所以 createItem
會是現在正在運行的執行緒上執行,如果想要在 computation 上執行,請用 Observable.create。
寫到這邊突然想到還沒介紹到 flatMap,但是沒關係,不會 flatMap 的話網路上有很多資源可以學習,或是之後沒寫滿30天的話可以硬塞一天給它。
@Test
fun test() {
printCurrentThread("Start")
Observable.just(1)
.flatMap { number ->
createItemObservable()
.map { number2 ->
printCurrentThread("inner map") // 這是在哪一個執行緒上執行呢?
number2 + number
}
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
private fun createItemObservable(): Observable<Int> {
printCurrentThread("create Item")
return Observable.just(2)
.subscribeOn(Schedulers.io())
}
你覺得 Inner map 會執行在哪條 Thread 上呢? computation 嗎?因為 subscribeOn 是影響所有的上游,所以照這樣推論的話 flatMap 會因此跑在 computation 上,flatMap 裡面的 map 也是同理,但是執行結果是:
Thread: Test worker, Start
Thread: RxComputationThreadPool-1, flatMap
Thread: RxComputationThreadPool-1, create Item
Thread: RxCachedThreadScheduler-1, inner map
Thread: RxSingleScheduler-1, subscribe
inner map 是跑在 “RxCachedThreadScheduler”上,也就是 io,為什麼呢?因為 createItemObservable()
在建立 Observable 的時候已經搶先執行了 subscribeOn(Scheduler.io())
,所以後來的 subscribeOn(Scheduler.computation())
就無法影響這個 Observable 了。
為什麼要特別示範這一段呢?我們其實很常會在不同的類別之間傳遞 Observable ,然而,在對該 Observable 做 flatMap 時要小心,在後面接 subscribeOn 的話有時並不會影響該 Observable 的執行緒,執行在哪條執行緒上是要看情況而定的。
建議大家可以試試看拿掉 subscribeOn(Scheduler.io())
,會有不同的結果喔!
“RxJava” 是一個為 Java 這語言設計的 “Reactive extension library”,所以在使用上會比較貼近 Java 這個程式語言。但是我們現在寫的語言是 Kotlin!Kotlin 有著很多 Java 沒有的語言特性,因此可以突破原本的限制,進而讓 RxJava 更好使用。其中我最喜歡的就是 Extension function 搭配上 RxJava 了,他可以讓我們的語法更加簡潔,增加我們的生產力。
@Test
fun test() {
// These two are the same
Observable.just(1)
.subscribeOn(Schedulers.io())
Observable.just(1)
.fromIO()
}
fun <T> Observable<T>.fromIO(): Observable<T> {
return this.subscribeOn(Schedulers.io())
}
藉由 extension function 的幫助,我們可以讓冗長的 subscribeOn(Schedulers.io()),變成只有六個字母的 fromIO()。因此 observeOn 也是同理:
@Test
fun test() {
// These two are the same
Observable.just(1)
.observeOn(Schedulers.io())
Observable.just(1)
.toIO()
}
fun <T> Observable<T>.toIO(): Observable<T> {
return this.observeOn(Schedulers.io())
}
覺得 List 的 filterInstance 很好用嗎?怎麼不為 Observable 也做一個呢?
inline fun <reified T> Observable<in T>.filterInstance(): Observable<T> {
return this.filter { it is T }
.map { it as T }
}
註:這邊經過了兩個 Operator ,所以是有優化空間的,如果非常注重效能的話,需要自己去實作 Custom Observable 或是 Costum ObservableTransformer ,然後再使用 extension function 來串接,但是通常來說,這樣的解法是夠用的。
有時候會需要跟前一個資料組合起來,兩個資料一起送出去做其他運算,但是 RxJava 沒有這樣的 operator ,所以我們也自己做了一個:
fun <T> Observable<T>.pairwise(): Observable<Pair<T, T>> {
return Observable.zip(this, this.skip(1),
BiFunction { a, b -> Pair(a, b) })
}
這邊舉個例子來讓大家比較好理解,假如有一個 Observable 會送出 1, 2, 3, 4, 5,這五個資料,那經過 pairwise 之後,就會送出 (1, 2), (2, 3), (3, 4), (4, 5)
在 Android 開發中,主要分成兩種類型的執行緒:Main thread, background thread
一般來說,Model 層處理的事情主要是關於網路連線以及資料庫操作。所以在 Model 層中是不能使用 Main thread 的,再加上在 Android app 中,沒有特別指定的話,程式碼都會跑在 Main thread 上。所以,我們必須在某個地方做執行緒切換,來確保說 Model 層的所有任務都不會跑在 Main thread 上。
那麼問題來了,如果要在 MVVM 中使用 RxJava 來實現的話,要在哪裡做執行緒切換呢(也就是 subscribeOn 跟 observeOn)?有一種可能性是,在 Model 層中使用 subscribeOn(Schedulers.io())
,那 ViewModel 層就可以放心的繼續使用從 Model 層來的 Observable,而不用擔心這個 Observable 被 subscribe 之後會不會卡到 UI thread 。然後呢,在 View 層要 Subscribe 的時候再使用 observeOn(AndroidSchedulers.mainThread())
切換回 Main thread,就可以確保更新 UI 的動作是在 Main thread 完成的。
在同一個專案或是團隊裡,遵守同樣的程式碼風格是一件好事,所以上面的規則我們可以套用到整個專案,對吧? 先來看看這個調查:下面是我在 “Android Developer開發讀書會”社團發起的投票,用意是調查大家偏好在哪裡使用 subscribeOn ,哪裡使用 observeOn ,結果分為兩大陣營,第三個選項只有四個人投票:
連結:https://www.facebook.com/groups/523386591081376/permalink/4285325838220747/
這統計結果可能代表了大家都有一些預設的偏好,在他們的專案開發中有一樣的程式碼風格。但其中很有趣的是,有人一次選了兩個選項,這是為什麼呢?我沒有實際問本人的意見,但我猜他們想表達的是“It depends”,事實上,第一個留言的人就是說看情況而定了。
為什麼是看情況呢?舉例來說,要是我們真的想要在不同的情況下決定 Model 層的執行緒,使用我在這篇文章中的作法做得到嗎?好像做不太到對吧?因為第一個 subscribeOn 就已經定好了執行緒,之後不管怎麼做都無法切換了。所以如果要在後來做切換的話,可能就要在 ViewModel 層根據條件來決定 subscribeOn 中的 scheduler 要放哪一個。那更之後的任務呢?都只能執行在該執行緒上嗎?好像也不應該這麼做,我們很有可能還要視情況使用 observeOn 再切換到 computation 或是其他的 scheduler。
在這個問題的討論上又回到了一個結論,就是架構設計是沒有絕對的,一個完整的架構,其實是很多小小、不同的設計決策所累積出來的,在 Clean architecture 中有一段話是這麼說的:
I see all the little details that support all the high-level decisions. I also see that those low-level details and high-level decisions are part of the whole design of the house.
low-level detail 跟 high-level decision 是相輔相成的,世界上不存在一個完美的架構能夠解決所有的問題。