這章節會介紹reactive programming, 有別於imperative programming會循序執行,reactive programming會先建立stream,程式內部會以非同步的方式執行使資料流經這些stream來達到想要的處理效果。
imperative programming:
String name = "Bond";
String upperCaseName = name.toUpperCase();
String sayHello = "Hello, " + upperCaseName + "!";
System.out.println(sayHello);
reactive programming:
Mono.just("Bond")
.map(n -> n.toUpperCase())
.map(cn -> "Hello, " + cn + "!")
.subscribe(System.out::println);
Reactive programming會使用到的stream基本上會follow Reactive Stream specification,而這個規格的實作在Spring WebFlux中會使用Project Reactor,這個Reactor library可說是在Spring中使用reactive programming的一切基底。
Reactive Stream specification主要由四個介面來構成:
1.Publisher
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Publisher可說是一切的起點,其代表的意義就是資料的產出者,當它被呼叫了唯一的一個方法subscribe時會需要傳入一個Subscriber,代表是誰來去接收Pulisher產生的資料。
2.Subscriber
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}
當Subscriber被傳入Publisher的subscribe方法後,實作內容就會拿著這個subscriber來定義4種subscriber在接收資料後的動作。第一步就是onSubscribe,它代表當subscriber在初出被publisher subscribe後該做什麼,這時會傳入一個Subscription。而onNext, onError, onComplete就是在subscribe後實際處理資料時會發生的狀態。
3.Subscription
public interface Subscription {
void request(long n);
void cancel();
}
在subscriber被publisher subscribe後,通常就是在publisher內實作subscriber的onSubscribe;而在Subscription中的request代表一次要拿多少資料,以long n來做一個門檻值的管控,假若有任何不能處理的條件達成後,就會執行cancel方法。而在request的實作中會去定義subscriber的onNext, onComplete, onError。
4.Processor
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}
一個更有彈性的介面。
以下引用了Java 9的Flow API介紹中舉的例子([https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html]),來有一個具體的理解:
class OneShotPublisher implements Publisher<Boolean> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed)
subscriber.onError(new IllegalStateException()); // only one allowed
else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) future.cancel(false);
}
}
}