iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 17
1
Mobile Development

Android 十全大補系列 第 17

[Android 十全大補] RxJava Scheduler

我們前一篇提到 Observable 是如何發射資料給 Observer,以及過程中可以透過一連串的 operator 來不斷的修改 Observable 本身來達成資料的操作,但還少了一個很重要的環節就是 multi-threading。

如果你還沒看過前一篇,強烈建議你花幾分鐘先看看喔!
傳送門

Threading 對 RxJava 來說是非常重要的一件事情,可以說沒有 multi-threading 的話,Rx 就沒有存在的價值,這重要性大到值得獨立一篇來介紹。
我們就一起來看看吧~

Threading

由於整個 Observable 開始發射資料的時間點是在 Observer 開始 subscribe 的時候,如果沒有人 subscribe,整個資料流是處在等待的階段,所以 RxJava 預設 thread 就是我們呼叫 subscribe 時所在的 thread。

建議大家可以試試看把 Observable operator subscribe 都放在不同的 Thread 裡驗證一下。

那我們要怎麼切換 thread 呢?有以下二種方法:

subscribeOn

subscribeOn 就是改變了 subscribe 執行的 thread,不管你在哪裡使用都是一樣的效果,但只有第一次的呼叫會起作用,一般情形這個 function 只會被呼叫一次。

observeOn

observeOn 相反的只會影響到在這個 function 之後的所以操作,因為他不是全域的設定所以在哪裡呼叫很重要,也可以被呼叫多次,最後的呼叫會覆蓋掉之前的設定,比如說一個 operator 之前有二個 observeOn 的呼叫,那就會以最後面的為主,同理也會覆蓋掉 subscribeOn 的設定。

Scheduler

subscribeOnobserveOn 都需要一個 scheduler 的參數來指定說我們所需要的 thread 要從哪裡提供,常用的有以下這幾種 scheduler:

  • Schedulers.io():
    如果有網路、檔案存取需求推薦使用,Rx 會幫我們管理 ThreadPool reuse 的部分。
  • Schedulers.computation():
    跟 io() 一樣也是會由 ThreadPool 管理,但是 ThreadPool 的大小跟裝置 CPU core 相關。
  • Schedulers.newThread():
    顧名思義就是每次都會建立新的 Thread,所以稍微耗效能一點。

但我們常用的 Android main thread 要怎麼指定呢?
這時候就要引入另外一個 library 了,起手式加入 dependency:

dependencies {
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
}

這時候我們就會得到一個 AndroidSchedulers.mainThread() 的 reference 可以使用來指名說我們需要 operator 跑在 main thread 下。

Sample

我們先回顧之前的範例,加上 Thread.currentThread().name 來檢查目前的 thread:

Observable
    .just(1, 2, 3)
    .filter {
        println("Filter:" + Thread.currentThread().name)
        it % 2 == 1
    }
    .map {
        println("Map:" + Thread.currentThread().name)
        it * 2
    }
    .subscribe(object : Observer<Int> {
        override fun onNext(i: Int) {
            println("onNext" + Thread.currentThread().name)
        }

        override fun onComplete() {
            println("Completed Observable.")
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
            println("Whoops: " + e.message)
        }
    })

如果是在 Android 的 Activity 下,會得到類似以下的 log:

com.example.myapplication I/System.out: Filter:main
com.example.myapplication I/System.out: Map:main
com.example.myapplication I/System.out: onNext2main
com.example.myapplication I/System.out: Filter:main
com.example.myapplication I/System.out: Filter:main
com.example.myapplication I/System.out: Map:main
com.example.myapplication I/System.out: onNext6main
com.example.myapplication I/System.out: Completed Observable.

如果我們希望所有 filtermap 都可以跑在 io thread 上只要我們 onNext 回到 ui thread 就好的話,可以改成以下程式碼(特別注意 observeOn 的位置喔):

Observable
    .just(1, 2, 3)
    .filter {
        println("Filter:" + Thread.currentThread().name)
        it % 2 == 1
    }
    .map {
        println("Map:" + Thread.currentThread().name)
        it * 2
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Observer<Int> {
        override fun onNext(i: Int) {
            println("onNext" + i + Thread.currentThread().name)
        }

        override fun onComplete() {
            println("Completed Observable.")
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
            println("Whoops: " + e.message)
        }
    })

再重跑一次應該會得到以下的 log:

com.example.myapplication I/System.out: Filter:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: Map:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: onNext2main
com.example.myapplication I/System.out: Filter:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: Filter:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: Map:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: onNext6main
com.example.myapplication I/System.out: Completed Observable.

大家可以試驗一下各種排列組合來更了解 subscribeOnobserveOn 的差異喔!

是不是很神奇有趣呢?有了這種切換 thread 的功能後我們就可以更順暢的做資料的操作了,我們回頭看我們之前的的 Retrofit 範例試試看要怎麼跟 RxJava 搭配。

RxJava x Retrofit

大家應該可以猜到起手式又是加一個新的 dependency 了吧!

dependencies {
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.2'
}

再回到 Retrofit 建立的地方加上我們新的 RxJava2CallAdapterFactory 如下:

Retrofit.Builder().client(okHttpClient)
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .baseUrl("https://www.example.com")
    .build()

打開原本的 GitHubService 把回傳改成 Single<Response<List<Repo>>>

Single 也可以視為一種特殊的 Observable,差別是它只會回傳一個 data 或者是錯誤。

interface GitHubService {
    @GET("users/{user}/repos")
    fun listRepos(
        @Path("user") user: String,
        @Query("type") type: String? = null,
        @Query("sort") sort: String? = null
    ): Single<Response<List<Repo>>>
}

回頭到我們使用的地方加上 Single 所需搭配的 Observer 如下:

service.listRepos("Jintin")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : SingleObserver<Response<List<Repo>>> {
        override fun onSuccess(t: Response<List<Repo>>) {
          //use data here
        }

        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
        }

    })

因為現在所有 thread 都是由我們自己控制,別忘了要加上 subscribeOnobserveOn,不然你可能會遇到 android.os.NetworkOnMainThreadException 這個錯誤喔。

這樣就完成 RxJava 搭配 Retrofit 的任務了,是不是很好玩呢,但這一切只是開始,RxJava 還有很多我們沒有介紹到的地方值得大家去探索,你也可以研究其他 operator 或自己做自己的 operator,相信強大的 RxJava 可以帶給大家寫程式更多的可能性。

挑戰:請把我們之前 Room 的例子改成 RxJava 的 function。

Android 十全大補已經正式出書上架囉!
有興趣的讀者歡迎參考:
https://www.tenlong.com.tw/products/9789864345786


上一篇
[Android 十全大補] RxJava
下一篇
[Android 十全大補] SOLID Principle
系列文
Android 十全大補30

尚未有邦友留言

立即登入留言