我們前一篇提到 Observable
是如何發射資料給 Observer
,以及過程中可以透過一連串的 operator 來不斷的修改 Observable
本身來達成資料的操作,但還少了一個很重要的環節就是 multi-threading。
如果你還沒看過前一篇,強烈建議你花幾分鐘先看看喔!
傳送門
Threading 對 RxJava 來說是非常重要的一件事情,可以說沒有 multi-threading 的話,Rx 就沒有存在的價值,這重要性大到值得獨立一篇來介紹。
我們就一起來看看吧~
由於整個 Observable
開始發射資料的時間點是在 Observer
開始 subscribe 的時候,如果沒有人 subscribe,整個資料流是處在等待的階段,所以 RxJava 預設 thread 就是我們呼叫 subscribe
時所在的 thread。
建議大家可以試試看把
Observable
operatorsubscribe
都放在不同的Thread
裡驗證一下。
那我們要怎麼切換 thread 呢?有以下二種方法:
subscribeOn
就是改變了 subscribe 執行的 thread,不管你在哪裡使用都是一樣的效果,但只有第一次的呼叫會起作用,一般情形這個 function 只會被呼叫一次。
observeOn
相反的只會影響到在這個 function 之後的所以操作,因為他不是全域的設定所以在哪裡呼叫很重要,也可以被呼叫多次,最後的呼叫會覆蓋掉之前的設定,比如說一個 operator 之前有二個 observeOn
的呼叫,那就會以最後面的為主,同理也會覆蓋掉 subscribeOn
的設定。
subscribeOn
跟 observeOn
都需要一個 scheduler 的參數來指定說我們所需要的 thread 要從哪裡提供,常用的有以下這幾種 scheduler:
但我們常用的 Android main thread 要怎麼指定呢?
這時候就要引入另外一個 library 了,起手式加入 dependency:
dependencies {
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
}
這時候我們就會得到一個 AndroidSchedulers.mainThread()
的 reference 可以使用來指名說我們需要 operator 跑在 main thread 下。
我們先回顧之前的範例,加上 Thread.currentThread().name
來檢查目前的 thread:
Observable
.just(1, 2, 3)
.filter {
println("Filter:" + Thread.currentThread().name)
it % 2 == 1
}
.map {
println("Map:" + Thread.currentThread().name)
it * 2
}
.subscribe(object : Observer<Int> {
override fun onNext(i: Int) {
println("onNext" + Thread.currentThread().name)
}
override fun onComplete() {
println("Completed Observable.")
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
println("Whoops: " + e.message)
}
})
如果是在 Android 的 Activity 下,會得到類似以下的 log:
com.example.myapplication I/System.out: Filter:main
com.example.myapplication I/System.out: Map:main
com.example.myapplication I/System.out: onNext2main
com.example.myapplication I/System.out: Filter:main
com.example.myapplication I/System.out: Filter:main
com.example.myapplication I/System.out: Map:main
com.example.myapplication I/System.out: onNext6main
com.example.myapplication I/System.out: Completed Observable.
如果我們希望所有 filter
、 map
都可以跑在 io thread 上只要我們 onNext
回到 ui thread 就好的話,可以改成以下程式碼(特別注意 observeOn
的位置喔):
Observable
.just(1, 2, 3)
.filter {
println("Filter:" + Thread.currentThread().name)
it % 2 == 1
}
.map {
println("Map:" + Thread.currentThread().name)
it * 2
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<Int> {
override fun onNext(i: Int) {
println("onNext" + i + Thread.currentThread().name)
}
override fun onComplete() {
println("Completed Observable.")
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
println("Whoops: " + e.message)
}
})
再重跑一次應該會得到以下的 log:
com.example.myapplication I/System.out: Filter:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: Map:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: onNext2main
com.example.myapplication I/System.out: Filter:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: Filter:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: Map:RxCachedThreadScheduler-1
com.example.myapplication I/System.out: onNext6main
com.example.myapplication I/System.out: Completed Observable.
大家可以試驗一下各種排列組合來更了解
subscribeOn
跟observeOn
的差異喔!
是不是很神奇有趣呢?有了這種切換 thread 的功能後我們就可以更順暢的做資料的操作了,我們回頭看我們之前的的 Retrofit 範例試試看要怎麼跟 RxJava 搭配。
大家應該可以猜到起手式又是加一個新的 dependency 了吧!
dependencies {
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.2'
}
再回到 Retrofit 建立的地方加上我們新的 RxJava2CallAdapterFactory
如下:
Retrofit.Builder().client(okHttpClient)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.baseUrl("https://www.example.com")
.build()
打開原本的 GitHubService
把回傳改成 Single<Response<List<Repo>>>
。
Single
也可以視為一種特殊的Observable
,差別是它只會回傳一個 data 或者是錯誤。
interface GitHubService {
@GET("users/{user}/repos")
fun listRepos(
@Path("user") user: String,
@Query("type") type: String? = null,
@Query("sort") sort: String? = null
): Single<Response<List<Repo>>>
}
回頭到我們使用的地方加上 Single
所需搭配的 Observer
如下:
service.listRepos("Jintin")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : SingleObserver<Response<List<Repo>>> {
override fun onSuccess(t: Response<List<Repo>>) {
//use data here
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
})
因為現在所有 thread 都是由我們自己控制,別忘了要加上 subscribeOn
跟 observeOn
,不然你可能會遇到 android.os.NetworkOnMainThreadException
這個錯誤喔。
這樣就完成 RxJava 搭配 Retrofit 的任務了,是不是很好玩呢,但這一切只是開始,RxJava 還有很多我們沒有介紹到的地方值得大家去探索,你也可以研究其他 operator 或自己做自己的 operator,相信強大的 RxJava 可以帶給大家寫程式更多的可能性。
挑戰:請把我們之前 Room 的例子改成 RxJava 的 function。
Android 十全大補已經正式出書上架囉!
有興趣的讀者歡迎參考:
https://www.tenlong.com.tw/products/9789864345786