iT邦幫忙

2021 iThome 鐵人賽

DAY 9
1

除了方便好用的 operator 之外,RxJava 還有一個非常重要的機制:非同步處理。 RxJava 的非同步處理機制可以讓我們很輕鬆的切換不同的執行緒,但是在使用上常常會有一些困難,像是某項任務沒有執行在我預期的執行緒上,或是煩惱說切換執行緒的程式碼應該要放在哪,今天會來跟大家分享我的用法以及看法。

執行緒切換

執行緒切換的語法只有兩個:observeOnsubscribeOn,以一般的準則來說,想要指定上游的執行緒,就使用 subscribeOn ,相反的,想指定下游的執行緒,就使用 observeOn ,如果都沒有指定,就是跑在現在正在運行的這條執行緒上。這些原則看起來很簡單,但是有時候我們會想做一些比較複雜的操作,寫完了之後才發現,這些任務並沒有運行在我預想中的執行緒上面,下面來舉一些我平常會遇到的使用案例:

註:以下這些程式碼都是在單元測試的環境下執行的。

1. 切換多次執行緒

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(1)
        .map {
            printCurrentThread("first map")
            it + 1
        }
        .subscribeOn(Schedulers.computation())
        .map {
            printCurrentThread("second map")
            it + 1
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

private fun printCurrentThread(message: String) {
    println("Thread: ${Thread.currentThread().name}, $message")
}

根據上面的推論,如果我要指定上游的執行緒的話,就使用 subscribeOn,因此,first map 會執行在 Computation,second map 會執行在 IO ,但是結果如下:

Thread: Test worker, Start
Thread: RxComputationThreadPool-1, first map
Thread: RxComputationThreadPool-1, second map
Thread: RxSingleScheduler-1, subscribe

second map 還是執行在 Computation!這表示第二個 subscribe 是沒用的,所以如果真的要讓 second map 執行在 IO 的話,應該要這樣寫:

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(1)
        .map {
            printCurrentThread("first map")
            it + 1
        }
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io()) // 在前面安插一個 observeOn
        .map {
            printCurrentThread("second map")
            it + 1
        }
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

把原來的 subscribeOn 拿掉,並在更前面加上 observeOn 才能達到原來想做的事。

2. Observable 源頭的執行緒

使用 subscribeOn 的話,照理來說 Observable 的源頭就是會在我指定的執行緒上執行了對吧?但很遺憾的這個假設是錯的:

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(createItem())
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

private fun createItem(): Int {
    printCurrentThread("create Item")
    return 1
}

createItem 是執行在哪條執行緒上呢?應該是 Computation 吧?但結果卻是 Test worker !:

Thread: Test worker, Start
Thread: Test worker, create Item
Thread: RxSingleScheduler-1, subscribe

原因是因為在 Observable.just 是當下就會馬上執行的函示,所以 createItem 會是現在正在運行的執行緒上執行,如果想要在 computation 上執行,請用 Observable.create。

3. flatMap 的執行緒切換

寫到這邊突然想到還沒介紹到 flatMap,但是沒關係,不會 flatMap 的話網路上有很多資源可以學習,或是之後沒寫滿30天的話可以硬塞一天給它。

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(1)
        .flatMap { number ->
            createItemObservable()
                .map { number2 ->
                    printCurrentThread("inner map") // 這是在哪一個執行緒上執行呢?
                    number2 + number
                }
        }
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

private fun createItemObservable(): Observable<Int> {
    printCurrentThread("create Item")
    return Observable.just(2)
        .subscribeOn(Schedulers.io())
}

你覺得 Inner map 會執行在哪條 Thread 上呢? computation 嗎?因為 subscribeOn 是影響所有的上游,所以照這樣推論的話 flatMap 會因此跑在 computation 上,flatMap 裡面的 map 也是同理,但是執行結果是:

Thread: Test worker, Start
Thread: RxComputationThreadPool-1, flatMap
Thread: RxComputationThreadPool-1, create Item
Thread: RxCachedThreadScheduler-1, inner map
Thread: RxSingleScheduler-1, subscribe

inner map 是跑在 “RxCachedThreadScheduler”上,也就是 io,為什麼呢?因為 createItemObservable() 在建立 Observable 的時候已經搶先執行了 subscribeOn(Scheduler.io()) ,所以後來的 subscribeOn(Scheduler.computation()) 就無法影響這個 Observable 了。

為什麼要特別示範這一段呢?我們其實很常會在不同的類別之間傳遞 Observable ,然而,在對該 Observable 做 flatMap 時要小心,在後面接 subscribeOn 的話有時並不會影響該 Observable 的執行緒,執行在哪條執行緒上是要看情況而定的。

建議大家可以試試看拿掉 subscribeOn(Scheduler.io()) ,會有不同的結果喔!

Custom Extension functions for RxJava

“RxJava” 是一個為 Java 這語言設計的 “Reactive extension library”,所以在使用上會比較貼近 Java 這個程式語言。但是我們現在寫的語言是 Kotlin!Kotlin 有著很多 Java 沒有的語言特性,因此可以突破原本的限制,進而讓 RxJava 更好使用。其中我最喜歡的就是 Extension function 搭配上 RxJava 了,他可以讓我們的語法更加簡潔,增加我們的生產力。

執行緒切換

@Test
fun test() {
    // These two are the same
    Observable.just(1)
        .subscribeOn(Schedulers.io())

    Observable.just(1)
        .fromIO()
}

fun <T> Observable<T>.fromIO(): Observable<T> {
    return this.subscribeOn(Schedulers.io())
}

藉由 extension function 的幫助,我們可以讓冗長的 subscribeOn(Schedulers.io()),變成只有六個字母的 fromIO()。因此 observeOn 也是同理:

@Test
fun test() {
    // These two are the same
    Observable.just(1)
        .observeOn(Schedulers.io())

    Observable.just(1)
        .toIO()
}

fun <T> Observable<T>.toIO(): Observable<T> {
    return this.observeOn(Schedulers.io())
}

filterInstance

覺得 List 的 filterInstance 很好用嗎?怎麼不為 Observable 也做一個呢?

inline fun <reified T> Observable<in T>.filterInstance(): Observable<T> {
    return this.filter { it is T }
        .map { it as T }
}

註:這邊經過了兩個 Operator ,所以是有優化空間的,如果非常注重效能的話,需要自己去實作 Custom Observable 或是 Costum ObservableTransformer ,然後再使用 extension function 來串接,但是通常來說,這樣的解法是夠用的。

pairwise

有時候會需要跟前一個資料組合起來,兩個資料一起送出去做其他運算,但是 RxJava 沒有這樣的 operator ,所以我們也自己做了一個:

fun <T> Observable<T>.pairwise(): Observable<Pair<T, T>> {
    return Observable.zip(this, this.skip(1),
                          BiFunction { a, b -> Pair(a, b) })
}

這邊舉個例子來讓大家比較好理解,假如有一個 Observable 會送出 1, 2, 3, 4, 5,這五個資料,那經過 pairwise 之後,就會送出 (1, 2), (2, 3), (3, 4), (4, 5)

在 MVVM pattern 中的執行緒切換

在 Android 開發中,主要分成兩種類型的執行緒:Main thread, background thread

  • Main thread: 主要用來做 UI 顯示,接受按鈕事件,這條執行緒非常重要!要是處理太多任務的話,App 會開始卡頓,情況更嚴重的話還會有 ANR。
  • Background thread: 網路連線、資料庫操作、檔案讀寫都是屬於這個類型,請注意這邊的 background thread 並不是只能有一個執行緒,而是可以依不同目的而去建立並使用各自的執行緒。

一般來說,Model 層處理的事情主要是關於網路連線以及資料庫操作。所以在 Model 層中是不能使用 Main thread 的,再加上在 Android app 中,沒有特別指定的話,程式碼都會跑在 Main thread 上。所以,我們必須在某個地方做執行緒切換,來確保說 Model 層的所有任務都不會跑在 Main thread 上。

那麼問題來了,如果要在 MVVM 中使用 RxJava 來實現的話,要在哪裡做執行緒切換呢(也就是 subscribeOn 跟 observeOn)?有一種可能性是,在 Model 層中使用 subscribeOn(Schedulers.io()) ,那 ViewModel 層就可以放心的繼續使用從 Model 層來的 Observable,而不用擔心這個 Observable 被 subscribe 之後會不會卡到 UI thread 。然後呢,在 View 層要 Subscribe 的時候再使用 observeOn(AndroidSchedulers.mainThread()) 切換回 Main thread,就可以確保更新 UI 的動作是在 Main thread 完成的。

在同一個專案或是團隊裡,遵守同樣的程式碼風格是一件好事,所以上面的規則我們可以套用到整個專案,對吧? 先來看看這個調查:下面是我在 “Android Developer開發讀書會”社團發起的投票,用意是調查大家偏好在哪裡使用 subscribeOn ,哪裡使用 observeOn ,結果分為兩大陣營,第三個選項只有四個人投票:

Screen Shot 2021-09-01 at 10.49.00 PM.png

連結:https://www.facebook.com/groups/523386591081376/permalink/4285325838220747/

這統計結果可能代表了大家都有一些預設的偏好,在他們的專案開發中有一樣的程式碼風格。但其中很有趣的是,有人一次選了兩個選項,這是為什麼呢?我沒有實際問本人的意見,但我猜他們想表達的是“It depends”,事實上,第一個留言的人就是說看情況而定了。

為什麼是看情況呢?舉例來說,要是我們真的想要在不同的情況下決定 Model 層的執行緒,使用我在這篇文章中的作法做得到嗎?好像做不太到對吧?因為第一個 subscribeOn 就已經定好了執行緒,之後不管怎麼做都無法切換了。所以如果要在後來做切換的話,可能就要在 ViewModel 層根據條件來決定 subscribeOn 中的 scheduler 要放哪一個。那更之後的任務呢?都只能執行在該執行緒上嗎?好像也不應該這麼做,我們很有可能還要視情況使用 observeOn 再切換到 computation 或是其他的 scheduler。

在這個問題的討論上又回到了一個結論,就是架構設計是沒有絕對的,一個完整的架構,其實是很多小小、不同的設計決策所累積出來的,在 Clean architecture 中有一段話是這麼說的:

I see all the little details that support all the high-level decisions. I also see that those low-level details and high-level decisions are part of the whole design of the house.

low-level detail 跟 high-level decision 是相輔相成的,世界上不存在一個完美的架構能夠解決所有的問題。


上一篇
RxJava operators && Java.Optional as a type class
下一篇
Firebase Firestore
系列文
Jetpack Compose X Android Architecture X Functional Reactive Programming30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言