之前文章提到Publisher
都是Lazy loading,subscribe觸發之前整個streaming是不會運作的,就像是java streaming一樣,需要透過terminal動作(terminal operations)來啟動整個流程,但有別於java streaming是無法重複使用的,reactive steaming就算是已經subscribe觸發過,仍可被其他的Subscriber
再subscribe
一次。但其實Publisher
都是Lazy loading並不完全正確。先介紹reactor的兩種時間。
我們學習了一些Flux
或是Mono
的operator,像是map、flatMap、filter
......等等,也知道在宣告整個Flux的過程中並不會馬上觸發這些operators的行為,因為我們知道Reactor是lazy的,而這個在宣告並且串聯operators的時刻就被稱作是「Assembly Time」。
像是我們先產生一個0~9數字的Flux
,並根據奇偶數又拆分了兩個不同的Flux
,他們其實彼此間不會相互影響,不像是java 8 streams。一旦Flux被宣告出來,他就是immutable,就算被拿去做了其他的操作也一樣,像是積木組裝一樣,可以互相組裝,但最原始的積木是不會變的。
Flux<Integer> integerFlux = Flux.range(0, 10);
Flux<Integer> evenInt = integerFlux.filter(integer -> integer % 2 == 0);
Flux<Integer> oddInt = integerFlux.filter(integer -> integer % 2 != 0);
在Assembly Time時,我們宣告並組合了Flux,但並不會馬上的開始執行,而是直到有人訂閱(subscription),而官方推薦最簡單的訂閱方式就是「Flux.subscribe(valueConsumer, errorConsumer)
」,當subscribe開始觸發,開始的訊號從subscribe向上,經過map、filter
,最後到達range
,也就是我們的來源(source operator)就會開始產生資料(initial data),從這個時刻開始,就是Subscription Time。
Flux.range(0, 10)
.filter(integer -> integer % 2 == 0)
.map(integer -> "偶數: " + integer)
.subscribe(System.out::println, Throwable::printStackTrace);
講了這麼多次當subscribe觸發後才開始動作,其實觸發的方式有三種。
Flux.blockFirst
,阻斷也會觸發。如同上面介紹的,觸發subscribe
之前不做任何事情,只有在觸發之後才會產生資料(generate data),而且每個訂閱者(subscriber)都能拿到全部且一樣的資料。
不需要subscriber就會產生資料,常常是持續變動的資料,像是滑鼠軌跡、股票價格之類的,在連線(connection)就開始產生資料,通常是一開始。每個訂閱者(subscriber)只能拿到當他們開始subscribe之後的最新資料。
來做一個每隔一秒推送資料的Flux,分別有兩個subscriber來訂閱,為了做出區隔中間停止兩秒再開始第二個,因為interval
是cold operator,所以可以預期到第二個subscribe會是從0s開始,符合我們對Cold Publisher
的期待,每一個subscriber都能拿到一樣的資料。
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
Thread.sleep(5000);
clock1 0s
clock1 1s
clock1 2s
clock2 0s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock2 3s
clock1 5s
clock2 4s
clock1 6s
而當我們透過share
來改變我們的程式,share可以將Cold Publisher轉換為Hot Publisher,就可以明顯看出第二個訂閱者因為間隔了兩秒,只會拿到從兩秒開始推送的資料,而不會是從0s。
Flux<Long> source = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = source.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
Thread.sleep(5000);
clock1 0s
clock1 1s
clock1 2s
clock2 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
用youtube來比喻Hot /Cold Publisher
,直播、首播就像是Hot Publisher
,每個進來頻道的人都是從最新的地方開始,進入的時間點不同得到的資料就會不同,就算沒有人看影片還是持續在跑動。
一般的影片就是像是Cold Publisher
,在沒有點開之前是不會做事,每個人點開都一樣是從最一開始的地方。
下一篇就來介紹有哪些像是share
可以轉換的operators。
資料來源