在 gradle (Module) 層級的 dependencies 中內加入:
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
Observable(被觀察者)/Observer(觀察者)
在RxJava中,有發送資料流程然後終止資料流程的Observable物件,以及訂閱Observable的Observer物件。 Observer訂閱的Observable發送值、錯誤、或完成信號後,Observer會接收到通知。
流程:
private void myObservable() {
Observable mObservable = new Observable() {
@Override
protected void subscribeActual(@NonNull Observer observer) {
observer.onNext("hello world");
observer.onComplete();
}
};
Observer mObserver = new Observer() {
// 這是新加入的方法,在訂閱後發送數據之前,會首先調用
// Disposable可用於取消訂閱
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
Log.e("onNext", String.valueOf(o));
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);
}
Flowable(被觀察者)/Subscriber(觀察者)
上游的被觀察者會響應下游觀察者的資料請求,下游調用request()告訴上游發送多少個資料。避免大量資料堆積在調用鏈上,使内存一直處於較低水平。
private void myFlowable() {
Flowable.range(0,5).subscribe(new Subscriber<Integer>() {
Subscription subscription;
// 訂閱後會先調用,相當於onStart()
// Subscription可用於請求數據、取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.w("flow","onSubscribe start");
subscription = s;
subscription.request(1);
}
@Override
public void onNext(Integer integer) {
Log.w("flow","onNext-->"+integer);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("flow","onComplete");
}
});
}
Single/SingleObserver
Single類似於Observable,不同的是,它總是只發送一個值,或者一個錯誤通知,而不是發送一系列的值,所以當你使用一個單一連續事件流,這樣你可以使用Single。Single觀察者只包含兩個事件,一個是正常處理成功的onSuccess,另一個是處理失敗的onError。因此,不同於Observable需要三個方法onNext,onError,onCompleted,訂閱Single只需要兩個方法:
private void mySingle() {
// 被觀察者
Single<String> single = new Single<String>() {
@Override
protected void subscribeActual(@NonNull SingleObserver<? super String> observer) {
observer.onSuccess("success");
}
};
// 訂閱觀察者SingleObserver
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull String s) {
Log.d("onSuccess", s);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
}
Completable/CompletableObserver
如果你的觀察者連onNext事件都不關心,可以使用Completable,因為Completable只有onComplete和onError兩個事件
private void myCompletable() {
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {
emitter.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
}
});
}
Maybe/MaybeObserver
如果你有一個需求是可能發送一個資料或者不會發送任何資料,這時候你就需要Maybe,它類似於Single和Completable的混合體。
Maybe可能會調用以下其中一種情況:
private void myMaybe() {
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(@NonNull MaybeEmitter<String> emitter) throws Throwable {
// 發送一個資料的情況或onError,不需再調用onComplete
emitter.onSuccess("hello");
// 不需要發送資料的情況或onError
// emitter.onComplete();
}
});
maybe.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull String s) {
Log.d("onSuccess", s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
謝謝大家願意花時間閱讀,小弟弟我在此鞠躬