Reactive Programming 是我偶然從社群討論中聽到的概念,作為一個基本的Java開發者,除了OOP外,我也經常會去了解其他的開發模式(?),所以今天想要分享一下我對Reactive Programming的學習,作為拋磚引玉之用途。
Reactive Programming,有人稱呼為響應式程式設計,它是一種傳播資料流變化的程式設計典範。
比方來說,一行程式碼 a = c + 5,一般開發模式會直覺認定 a 值在該行 就隨著變數c的值而被確定,但在Reactive的世界觀中,a 的值會隨著後續c值的變動而重新被改變。可以想像是前端的監聽一樣,a會隨著c去改變。
那要實作出這樣的功能,主要就是利用「發布訂閱模式」(Publish-Subscribe) ,來訂閱某事件,便在值改變值隨之更新。
RxJava 是 Reactive Java 的簡稱,他擴展了觀察者模式 (Observer Pattern),主要功能分為了四個介面。
Publisher 發布者,發布資料流,在接受訂閱者(Subscriber)後建立一個訂閱(Subscription),在訂閱成功時呼叫Subscriber的onSubscribe並傳入Subscription實例。
Subscriber 訂閱者
Subscription 訂閱 作為Publisher與Subscribe關係的管理者,管理資料的傳輸,Subscription也協助處理有關BackPressure等問題。
Back Pressure──消費端(Consumer) 有能力控制/通知 生產端(Publish)發出資料的速度。
當consumer無法負荷來自producer發出的資料量時就會形成backpressure。當發生Backpressure會導致資料流失。
Processor 處理器 同時實作了Publisher與Subscribber,他會接受資料並把資料發布給訂閱者,與Subscription的差別是,Subscription關注的是Publisher與Subscriber之間的訂閱關係,Processor則關注在對資料進行處理的部分。
實作方面,首先使用需要引入依賴項目。
implementation "io.reactivex.rxjava3:rxjava:3.1.9"
再來,我們來看一個基本的Hello World程式碼。
package rxjava.examples;
import io.reactivex.rxjava3.core.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
在RxJava中,有幾個基本類別:
Flowable
Flowable用來處理BackPressure情境,也就是當Publisher發出的資訊超出Subscriber可以接收的資訊時,Flowable提供了BackPressure機制來控制資料流的數量。
Flowable.range(1, 1000)
.map(v -> v * v)
.subscribe(System.out::println);
解釋以上程式碼就是,第一行發布者定義 1~1000的資料流,第二行進行資料的處理(開平方),第三行.subscribe(),則是資料真正開始被處理的時機點,概念同Stream()的資料流,這段就表示有一個subscriber接收了這個訂閱內容,並開始監聽並處理資料(這邊是印出)。
這樣看你可能沒有感覺,讓我們在假設一個情境,假設我們今天有兩個訂閱者的話,又會怎麼處理呢?參考以下程式碼。
Flowable<Integer> flowable = Flowable.range(1, 1000)
.map(v -> v * v); // 對資料進行平方操作
// 第一個訂閱者,印出結果。
flowable.subscribe(result -> System.out.println("Subscriber 1: " + result));
// 第一個訂閱者,印出結果。
flowable.subscribe(result -> System.out.println("Subscriber 2: " + result));
以上你可以知道,Reactive的其中一個精神,在於將建立資料與資料發送切分開來了。以第一行、第二行的狀況來說,只關注了資料的建立與處理,而不在意發送的對象。這段處理階段又被稱為Assembly time,此時的資料流尚未流動,也沒有產生副作用。直到Subscription time──也就是.subscribe()被觸發,資料才開始流動。
另外,在上面的例子來說,每次呼叫.subscribe()時都會重新處理資料,那如果我們想要資料只被處理一次的話,我們可以改成這樣。
ConnectableFlowable<Integer> connectableFlowable = Flowable.range(1, 1000)
.map(v -> v * v)
.publish(); // 使用 publish() 建立一個共用的資料流
// 第二個訂閱者,印出平方後的結果
connectableFlowable.subscribe(result -> System.out.println("订阅者 1: " + result));
// 第二個訂閱者,印出平方後的結果
connectableFlowable.subscribe(result -> System.out.println("订阅者 2: " + result));
// 啟動資料流
connectableFlowable.connect();
在這個例子中,除非connect()被呼叫,否則資料流不會執行。
Observable
Observable是最常用的類別,處理0~N個資料流,適用資料量比較小、輕量級的資料流。
Observable.just("Hello", "World")
.map(String::toLowerCase)
.subscribe(System.out::println);
Single
Single 適合只需要一個結果的操作,可以是一個資料流或是一個錯誤。
Single.just("Hello, Single!")
.map(String::toLowerCase)
.subscribe(System.out::println);
Completable
Completable 表示一個資料流中不發送資料,只表示動作的完成與錯誤,通常用在只關心操作完成還是失敗的場景。
Completable.fromRunnable(() -> System.out.println("Task completed"))
.subscribe(() -> System.out.println("Done"));
Maybe
介於Single與Completon之間,表示0或是1個資料流或是錯誤,適用無法確定有資料或是沒有資料的情況。
Maybe.just("Hello, Maybe!")
.subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Completed without data"));
Upstream、Downstream是什麼?
這兩個名詞用來指在Stream中的前與後。比方說 .stream().filter().map().collection()來說,map()的Upstream就是指.filter(),downstream()就是指collection()。
另外,在RxJava的官方也有提供一個範例,讓我們更了解RxJava是如何解決問題的。
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
// OR
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
在這段程式碼中它首先使用fromCallable()方法建立了一個Flowable,fromCallable()用來建立Flowable,與.just()不同的是,fromCallable()擁有更多功能。這邊模仿了一個耗時操作。
.subscribeOn(Schedulers.io()) 代表了這個Flowable的操作會在一個IO執行緒中被執行,Schedulers.io()是由RxJava定義,一個專門用來處理IO的執行緒。
observeOn(Schedulers.single()) 指定了訂閱者在哪個Thread上處理事件,Scheduler.single()代表會在單一一個Thread上被執行。
.subscribe(System.out::println, Throwable::printStackTrace) 代表訂閱者訂閱了這個事件,內部包含了處理成功的Callback與應對發生異常的Callback
Thread.sleep(2000); // 因為Flowable是異步執行的,所以需要先將Thread()暫停兩秒,等待處理完成,避免程式在Thread尚未完成之前就先被結束。這樣的寫法也被稱作Fluent API。
那麼以上就是今天的分享了,Reactive Programming的概念我覺得並不是很好懂,網路上的範例也不多,本次的內容可能也會有錯誤,但同樣學起來也很有成就感。希望之後有機會將這份技術活用到工作之中。
那麼今天的部分就到此為止了,明天就是最後一天了,明天見吧。
RxJava 針對排程,提供了以下服務
Schedulers.computation()Scheduler
:在後台對固定數量的專用線程運行計算密集型工作。大多數異步運算符將此作為其 default 。
Schedulers.io()
:在動態變化的線程集上運行類似 I/O 或阻塞的操作。
Schedulers.single()
:以順序和 FIFO 方式在單個線程上運行工作。
Schedulers.trampoline()
:在其中一個參與線程中以 Sequential 和 FIFO 方式運行工作,通常用於測試目的。
Concurrency within a flow
RxJava的資料流本身是分為多個處理階段循序處理,這些階段可以彼此併發運行。
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
// Lambda並不會將他們平行處理,而是循序處裡
Parallel processing
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
RxJava中 Flatmap通過首先將1~10每個數字對應到各自獨立的Flowable中,然後運行並合併計算出平方值。
但请注意,flatMap
不保证任何顺序,来自内部流的项目可能会交错排列。还有其他替代操作符:
concatMap
会一个接一个地映射并运行内部流,concatMapEager
则会 "同时" 运行所有内部流,但输出流的顺序将按照这些内部流创建的顺序排列。此外,也有其他並行處理的解決方案
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
flatMap
is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable
, we'd like to call another service with values emitted by the first service:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println);
Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.
Dependent
The most typical scenario is to given a value, invoke another service, await and continue with its result:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap
into the inner parts of the previous flatMap
for example:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
Here, the original value
will be available inside the inner flatMap
, courtesy of lambda variable capture.
在其他情況下,第一個源/數據流的結果是無關緊要的,並且希望繼續使用准獨立的另一個源。
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
延遲依賴
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,這會列印出來,因為在彙編時,當數據流甚至還沒有運行時被評估。我們需要一些東西,將這個源的評估推遲到主源完成時運行時:0Single.just(count.get())Single
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
//OR
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);