iT邦幫忙

2021 iThome 鐵人賽

DAY 14
0
Software Development

從零開始Reactive Programming- Spring系列 第 15

[Day 14] Reactive Programming -Reactor(COLD VS HOT) -PART 1

前言

之前文章提到Publisher都是Lazy loading,subscribe觸發之前整個streaming是不會運作的,就像是java streaming一樣,需要透過terminal動作(terminal operations)來啟動整個流程,但有別於java streaming是無法重複使用的,reactive steaming就算是已經subscribe觸發過,仍可被其他的Subscribersubscribe一次。但其實Publisher都是Lazy loading並不完全正確。先介紹reactor的兩種時間。

圖片

圖片來源 : unsplash

Assembly Time

我們學習了一些Flux或是Mono的operator,像是map、flatMap、filter......等等,也知道在宣告整個Flux的過程中並不會馬上觸發這些operators的行為,因為我們知道Reactor是lazy的,而這個在宣告並且串聯operators的時刻就被稱作是「Assembly Time」。
像是我們先產生一個0~9數字的Flux,並根據奇偶數又拆分了兩個不同的Flux,他們其實彼此間不會相互影響,不像是java 8 streams。一旦Flux被宣告出來,他就是immutable,就算被拿去做了其他的操作也一樣,像是積木組裝一樣,可以互相組裝,但最原始的積木是不會變的。


Flux<Integer> integerFlux = Flux.range(0, 10);
Flux<Integer> evenInt = integerFlux.filter(integer -> integer % 2 == 0);
Flux<Integer> oddInt = integerFlux.filter(integer -> integer % 2 != 0);

Subscription Time

在Assembly Time時,我們宣告並組合了Flux,但並不會馬上的開始執行,而是直到有人訂閱(subscription),而官方推薦最簡單的訂閱方式就是「Flux.subscribe(valueConsumer, errorConsumer)」,當subscribe開始觸發,開始的訊號從subscribe向上,經過map、filter,最後到達range,也就是我們的來源(source operator)就會開始產生資料(initial data),從這個時刻開始,就是Subscription Time。

Flux.range(0, 10) 
    .filter(integer -> integer % 2 == 0) 
    .map(integer -> "偶數: " + integer) 
    .subscribe(System.out::println, Throwable::printStackTrace);

講了這麼多次當subscribe觸發後才開始動作,其實觸發的方式有三種。

  1. subscribe:之前介紹都是屬於這種
  2. block:Flux.blockFirst,阻斷也會觸發。
  3. hot publisher:接下來就來說明第三種。

Cold Publisher

如同上面介紹的,觸發subscribe之前不做任何事情,只有在觸發之後才會產生資料(generate data),而且每個訂閱者(subscriber)都能拿到全部且一樣的資料。

Hot Publisher

不需要subscriber就會產生資料,常常是持續變動的資料,像是滑鼠軌跡、股票價格之類的,在連線(connection)就開始產生資料,通常是一開始。每個訂閱者(subscriber)只能拿到當他們開始subscribe之後的最新資料。

DEMO

來做一個每隔一秒推送資料的Flux,分別有兩個subscriber來訂閱,為了做出區隔中間停止兩秒再開始第二個,因為interval是cold operator,所以可以預期到第二個subscribe會是從0s開始,符合我們對Cold Publisher的期待,每一個subscriber都能拿到一樣的資料。

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
Thread.sleep(5000);
clock1 0s 
clock1 1s 
clock1 2s 
	clock2 0s 
clock1 3s 
	clock2 1s 
clock1 4s 
	clock2 2s 
	clock2 3s 
clock1 5s 
	clock2 4s 
clock1 6s

而當我們透過share來改變我們的程式,share可以將Cold Publisher轉換為Hot Publisher,就可以明顯看出第二個訂閱者因為間隔了兩秒,只會拿到從兩秒開始推送的資料,而不會是從0s。

Flux<Long> source = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = source.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
Thread.sleep(5000);
clock1 0s 
clock1 1s 
clock1 2s 
	clock2 2s 
clock1 3s 
	clock2 3s 
clock1 4s 
	clock2 4s 
clock1 5s 
	clock2 5s 
clock1 6s 
	clock2 6s

範例來源

總結

用youtube來比喻Hot /Cold Publisher,直播、首播就像是Hot Publisher,每個進來頻道的人都是從最新的地方開始,進入的時間點不同得到的資料就會不同,就算沒有人看影片還是持續在跑動。
一般的影片就是像是Cold Publisher,在沒有點開之前是不會做事,每個人點開都一樣是從最一開始的地方。

下一篇就來介紹有哪些像是share可以轉換的operators。
資料來源


上一篇
[Day 13] Reactive Programming - Reactor(Processors & Sinks)
下一篇
[Day 15] Reactive Programming -Reactor(COLD VS HOT) -PART 2
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言