iT邦幫忙

2018 iT 邦幫忙鐵人賽
DAY 25
0

RxJava2是在JVM中實現Reactive programming的library,其特色是可以輕鬆的切換thread,並和LiveData一樣以observer pattern處理事件。

Android開發經常會用到非同步(asynchronous)機制,像是網路連線、資料庫存取和其他耗時的工作,此時必須用AsyncTask或Hadnler等方式實現thread切換並建立callback以得知事件何時完成,而RxJava可以大量簡化這些繁瑣的過程,只要一行程式就能在background和UI thread之間切換。

雖然RxJava的學習曲線稍高,但用順手之後可以對程式帶來顯著的改善,且RxJava不用事前準備就可以開始練習,所以個人覺得起手還比Dagger2簡單。幾個主流library包括Retrofit和Architecture Components都有額外支援RxJava,足見其在Android開發有一定的影響力。

Why Reactive?

直接看程式,引用Jake Wharton大神在演講中舉的例子:

interface UserManager {
	User getUser();
	void setName(String name);
	void setAge(int age);
}

UserManager um = new UserManager();
System.out.println(um.getUser());

um.setName("Jane Doe");
System.out.println(um.getUser());

建立一簡單的UserManager讓我們取得User和更新Name、Age,底下印出User之後更新Name並再次印出。

那麼,如果這是一個耗時的工作,例如更新個人資料時需透過網路與伺服器同步,那就要建立callback以得知setName何時完成,並在setName完成時才顯示結果,讓程式變得reactive

interface UserManager {
	User getUser();
	void setName(String name, Runnable callback);
	void setAge(int age, Runnable callback);
}
UserManager um = new UserManager();
System.out.println(um.getUser());

um.setName("Jane Doe", new Runnable() {
	@Override 
    public void run() {
		System.out.println(um.getUser());
	}
});

此外網路連線並不一定會成功,所以我們須知道連線是成功還是失敗。

UserManager um = new UserManager();
System.out.println(um.getUser());

um.setName("Jane Doe", new UserManager.Listener() {
	@Override 
    public void success() {
		System.out.println(um.getUser());
	}

	@Override 
    public void failure(IOException e) {
		// TODO show the error...
	}
});

最後,若連線成功時我們要更新UI上的使用者名稱,Android的網路連線只能發生在background thread,而更新UI則須回到UI thread,且在連線完成要更新UI時我們應檢查使用者是否還在Activity中,以免已經離開Activity而導致找不到UI元件的crash,於是就產生了這樣一隻小怪物:

public final class UserActivity extends Activity {
	private final UserManager um = new UserManager();

	@Override 
    protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		
		setContentView(R.layout.user);
		TextView tv = (TextView) findViewById(R.id.user_name);
		tv.setText(um.getUser().toString());

		um.setName("Jane Doe", new UserManager.Listener() {
			@Override 
            public void success() {
				runOnUiThread(new Runnable() {
					@Override 
                    public void run() {
						if (!isDestroyed()) {
							tv.setText(um.getUser().toString());
						}
					}
				});
			}
            
			@Override 
            public void failure(IOException e) {
				// TODO show the error...
			}
		});
	}
}

只是發出網路連線並在完成時更新TextView而已,setName的callback變得巢狀縮排好幾層,將來做更多功能時會變得更難以閱讀和維護。

那使用RxJava2會是怎樣呢?

一樣是讓setName在background thread執行,並在UI thread進行callback的內容:

Completable.fromAction(() -> {um.setName("Jane Doe")})
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                tv.setText(um.getUser().toString());
            }

            @Override
            public void onError(Throwable e) {
                // TODO show the error...
            }
        });

fromAction建立我們要執行的內容。
subscribeOn表示這些內容要在哪個thread執行,Schedulers.io()是RxJava提供的background thread之一,稍後解釋。
observeOn表示執行後的callback要在哪個thread執行。
subscribe時才會真的開始執行,就像LiveData的observe。
callback有onComplete()onError(Throwable e)分別代表執行成功或失敗的處理。

其中最重要的是,藉由subscribeOnobserveOn我們就完成了thread切換:於io thread執行並在main thread更新結果,是不是很簡單?真的很簡單。

再看一次程式,由上到下每一行都清楚說明用意:

  1. 我們要執行setName
  2. 這要在io thread執行
  3. callback要在main thread執行
  4. 執行成功或失敗的處理。

雖然相對字數比較多,但程式解釋力比較高,尤其在將來情況越複雜時RxJava還是能保持一行一行沒有太多縮排的寫法,到時優勢會更明顯。

好,剛剛怕第一印象嚇到人其實做了一點手腳,上面的fromAction那邊是用Android Studio把它縮起來或是用lambda語法的樣子,真正的樣子是這樣:

Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                um.setName("Jane Doe");
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                tv.setText(um.getUser().toString());
            }

            @Override
            public void onError(Throwable e) {
                // TODO show the error...
            }
        });

雖然把fromAction縮起來比較好看但本系列會盡量少用lambda寫法,此例是剛好setName沒有回傳值所以影響不大,後續的內容我覺得看懂參數輸入輸出會更好上手,當初自己學習時看別人的lambda語法都不知道為什麼能變那樣,所以,雖然醜了點但我們會把參數全寫出來。

上面還少了一部份,檢查Activity是否還存在的isDestroyed(),除了在onComplete()中檢查之外,RxJava還有更好的作法:Disposable

public final class UserActivity extends Activity {
	
    private CompositeDisposable disposables = new CompositeDisposable();

	@Override 
    protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		
		...

		disposables.add(
            Completable.fromAction(new Action() {
                @Override
                public void run() throws Exception {
                    um.setName("Jane Doe");
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableCompletableObserver() {
                @Override
                public void onComplete() {
                    tv.setText(um.getUser().toString());
                }

                @Override
                public void onError(Throwable e) {
                    // TODO show the error...
                }
            }));
	}
    
    @Override
    public void onDestroy() {
        super.onDestroy();
        disposables.clear();
    }
}

建立一個CompositeDisposable並用add()將Completable新增至其中,在onDestroy()時呼叫clear()就會清除disposables內的所有作業。這樣當多個連線運作時不用每個都檢查isDestroyed(),只要都新增到disposables就可以在Destory時全部取消。

Introducing RxJava2

正式的介紹一下RxJava2,在這邊先說明幾個重要的東西,現在還不會全部用到但方便之後幾天查閱。

dependencie有兩個,一個是RxJava2本身,另一個是專為Android設計的RxAndroid,提供Android開發的輔助功能例如切換thread時可以選擇Android main thread。

implementation "io.reactivex.rxjava2:rxjava:2.1.7"
implementation "io.reactivex.rxjava2:rxandroid:2.0.1"

接著是class,概略的說,RxJava分成幾個步驟進行:

  1. 建立Observable,這是RxJava的運作單位,像是上面範例的Completable。
  2. 設置要在哪個thread執行Observable。
  3. 設置Observable執行完畢的callback內容要在哪個thread執行。
  4. 使用subscribe開始執行Observable。

Observables

RxJava的起始需建立Observable,例如上面我們用fromActionsetName變成了Completable,拆開寫是長這樣:

Completable completable = Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                ...
            }
        });
        
completable.subscribeOn(...)

建立completable之後就可以用RxJava的各種功能了。

RxJava提供多種Observable,主要差異在於callback的類型不同,有以下這些:

Completable
適合用在執行的內容沒有回傳值,只要知道成功或失敗就好的時候,例如更新個人資料。在callback有onComplete()onError(Throwable e)兩個方法,其中onComplete()是沒有參數的表示執行完不會有回傳值。

Completable.fromAction(...)
        .subscribe(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                
            }

            @Override
            public void onError(Throwable e) {

            }
        });

Maybe
適合用在不一定有回傳值時,例如資料庫查詢,如果有撈到資料會進入onSuccess(T),否則onComplete(),兩者互斥只會發生其中一種。<String>表示回傳值的型態,依照需求可以是String、Integer或自訂的物件User、Repo等等。至於onError(Throwable e)這是每個Observable都有的,下面就省略不提了。

Maybe.fromCallable(...)
        .subscribe(new DisposableMaybeObserver<String>() {
            @Override
            public void onSuccess(String s) {
                
            }

            @Override
            public void onError(Throwable e) {
            
            }

            @Override
            public void onComplete() {

            }
        });

Single
用在執行完一定要有回傳值時,例如API抓資料時一定要有Json內容回傳,成功即進入onSuccess(T)

Single.fromCallable(...)
        .subscribe(new DisposableSingleObserver<String>() {
        @Override
        public void onSuccess(String s) {
        
        }
        
        @Override
        public void onError(Throwable e) {
        
        }
    });

Observable
嗯它就叫做Observable,是RxJava一代最早出現的單位,所以在教學文章的能見度很高。適合用在多次執行的內容例如迴圈,每一次執行迴圈會進入onNext(T),全部完成時進入onComplete()

Observable.fromCallable(...)
        .subscribe(new DisposableObserver<String>() {
        @Override
        public void onNext(String s) {
        
        }

        @Override
        public void onError(Throwable e) {
        
        }

        @Override
        public void onComplete() {
        
        }
    });

Flowable
跟上面的Observable一樣適合用在多次執行,callback也一模一樣,不同的是Flowable有支援backpressure,當資料發送的速度快過於處理速度時,可以讓發送速度減緩。

介紹完這些Observable的特色後要看看它們的建立方式,也許你已經注意到上面Completable是用fromAction而其它是用fromCallable,差異是前者沒有回傳值而後者有,完整寫法:

// While setName return nothing, use fromAction.
Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                um.setName("Jane Doe");
            }
        })
        .subscribe(...);
        
// If setName has return value which type is string, use fromCallable.
Single.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return um.setName("Jane Doe");
            }
        })
        .subscribe(...);

各Observable可用的建立方式:

Completable.fromCallable(() -> "Ignored!");
Completable.fromAction(() -> System.out.println("Hello"));
Completable.fromRunnable(() -> System.out.println("Hello"));

Maybe.fromCallable(() -> "Hello");
Maybe.fromAction(() -> System.out.println("Hello"));
Maybe.fromRunnable(() -> System.out.println("Hello"))

Single.fromCallable(() -> "Hello");

Flowable.fromCallable(() -> "Hello");

Observable.fromCallable(() -> "Hello");

Completable也可以用fromCallable但回傳結果會被忽略,其它的如果寫"Hello"就是有回傳值,寫println的沒有。

這樣就完成第1步建立Observable了,接著看thread,會簡單很多。

Schedulers

Schedulers可以視為RxJava中thread的代稱,有以下這幾種:

  • Schedulers.io()
    最常用到的thread,適合用來處理網路連線、資料庫存取這些不使用CPU運算的作業,沒有thread pool數量限制。
  • Schedulers.computation()
    處理複雜資料或影像等需要CPU運算的作業,有數量限制,應只在需要CPU運算時才使用以免影響效能。
  • Schedulers.newThread()
    建立全新的thread,一般不太會用到,建立thread很耗資源所以須謹慎使用。
  • Schedulers.single()
    單一的thread並依序進行作業,可用來避免作業同時執行。
  • Schedulers.from(Executor executor)
    用自訂的Executor來操作thread,例如想要限制網路連線同時最多只能用3個thread。
  • AndroidSchedulers.mainThread()
    RxAndroid提供的thread,即為Android的UI thread,用於更新UI。

指定thread的方式為subscribeOnobserveOn,前者設置Observable要在哪個thread執行,後者是callback要發生在哪個thread。

如果一開始記不住的話,個人提供一種無腦的記法...

Completable.fromAction(...)
        .subscribeOn(Schedulers.io()) // 這行以上的內容要在io thread執行
        .observeOn(AndroidSchedulers.mainThread()) // 以下要在main thread執行
        .subscribe(...)

這種方式的前提是subscribeOnobserveOn要照順序寫,且只適合用在像上面這樣簡單的內容,如果Observable串聯很長的就不一定適用了。

對Schedulers還是不太了解的話可以看這篇更詳細的說明。

Subscribe

以上設置完畢以後就用subscribe讓Observable啟動,可以依需求寫想要的subscribe方式。

一般可以用DisposableXXXObserver,這包含所有的callback。

Completable.fromAction(...)
        .subscribeOn(...)
        .observeOn(...)
        .subscribe(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                
            }

            @Override
            public void onError(Throwable e) {
                
            }
        });

若只要在收到特定回傳值類型時才觸發callback,可以用Consumer。

Single.fromCallable(...)
        .subscribeOn(...)
        .observeOn(...)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {

            }
        });

也可以單純的執行,不用任何callback。

Completable.fromAction(...)
        .subscribeOn(...)
        .observeOn(...)
        .subscribe();

如果想要像開始時提到的用Disposable取消作業,要用subscribeWith,才能將Observable加進Disposable中。

Completable.fromAction(...)
        .subscribeOn(...)
        .observeOn(...)
        .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                
            }

            @Override
            public void onError(Throwable e) {
                
            }
        });

OK終於結束了,再複習一下:

  • 第一行建立Observable。
  • 第二行用subscribeOn設置Observable要在哪個thread執行。
  • 第三行用observeOn設置callback內容要在哪個thread執行。
  • 第四行subscribe開始執行Observable。

今天先有個概念,明天就會開始寫程式與Retrofit搭配使用。


Reference:
Exploring RxJava 2 for Android
Multi-Threading Like a Boss in Android With RxJava 2


上一篇
Data Binding Compiler V2
下一篇
RxJava2 and Retrofit
系列文
Android Architecture30

尚未有邦友留言

立即登入留言