java 9的時候新增支援Reactive Stream,所以在介紹Spring Reactor
、WebFlux
之前先來認識一下Java 原生的Flow Api。
java 9更新推出對應Reactive Streams 規格的分別有三個interface,Publisher
負責產生item來推送給一個或多個Subscribers
去使用(consumed),每一個item都被Subscription
所管理,中間有一或多個Processor
來處理轉換或是特別的邏輯。
Subscriber
訂閱Publisher
,Publisher
負責產生推送資料,透過Subscription
居中管理, Subscriber
會提出需要的資料量,這樣就有背壓(backpressure)機制(之後介紹),來避免Subscriber來不及消化大量資料導致系統異常。下方是java.util.concurrent.Flow 移除掉文件說明的部分,從程式碼可以看出
Publisher
提供訂閱Subscriber
有四個方法 分別為
Subscription
Publisher
結束時/*
public final class Flow {
private Flow() {} // uninstantiable
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
}
實際動手做一個簡單聽Podcast要取得節目內容的DEMO
建立Member會員實作Subscriber
,假設會員要訂閱Podcast頻道,訂閱時同步subscription
並要求一個item,每接受到一個item就存入episodes 並要求下一個,最後當Publisher
結束時則會印出onComplete。
@Data
public class Member <T> implements Subscriber<T> {
private Subscription subscription;
private List<T> episodes = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("onNext:" + item);
episodes.add(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
因為Publisher
比較複雜我們直接拿原生的SubmissionPublisher
來使用,實際DEMO情況如下
新增一個會員跟一個Podcast頻道並訂閱,將內容推送到頻道當中,在第一時間印出是可以發現尚未收到任何集數,因為整個流程是非同步的在背後進行,所以當我們暫停一秒鐘後就可以正常到看到結果。
@Test
void testJava9Reactive() throws InterruptedException {
Member<String> member = new Member<>();
SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();
podcastChannel.subscribe(member);
Assertions.assertEquals(1, podcastChannel.getSubscribers().size());
List<String> episodes = List.of("1", "2", "3", "4");
episodes.forEach(podcastChannel::submit);
System.out.println(member.getEpisodes());
Thread.sleep(1000);
podcastChannel.close();
System.out.println(member.getEpisodes());
/* output:
[]
onNext:1
onNext:2
onNext:3
onNext:4
[1, 2, 3, 4]
onComplete
*/
}
今天終於開始進入到Reactive Programming並帶有一點實作,下一篇會繼續介紹今天用到的SubmissionPublisher
與一開始提到的Processor。