其實有一個特別的例子是just
,直覺會認為just
就是產生一個publisher
等人來subscribe
,理所當然就是Cold Publisher,我一開始也是這樣認為,直到看到官方doc說明他是Hot Publisher,原來最熟悉的好朋友其實如此的陌生,又在程式的道路上學習了人生的一課,Cold Publisher是Lazy loading,但其實just
在assembly time就已經準備好所有的data,當有Subscriber
訂閱只是在replay一次而已。
使用Flux.just
的時候,在assembly time就已經取得資料,我們可以改成透過defer
,回歸lazy,直到subscribe才會取資料,defer
就是延遲的意思,透過Supplier
來達到延遲的效果,來源資料會被延遲到subscribe才會被初始化,也是因為這樣,如果來源資料是與當下時間有關,這樣每個不同的subscribe都會取到不同的時間,但本質上還是一個Cold Publisher。
以下範例是分別透過just
& defer
來取用現在時間,可以看出just
每次subscribe結果會相同,就能得知just
在assembly time就已經取資料了,而defer
則是延遲直到subscribe才真的去取得現在時間,看到這邊可以會有點錯亂,原本以為Hot Publisher是每次結果會不相同,但這邊的Hot Publisher是just
每次結果相同,COLD Publisher的defer
則會每次不同,老實說我是有點錯亂,官方文件上也沒有明確定義cold & hot,所以我個人認為是產生資料的時間點,如果是lazy的要等到subscribe才會觸發的就是COLD Publisher,如果無論有沒有subscribe都會先取得資料的,則是Hot Publisher。
Mono<Long> clock = Mono.just(System.currentTimeMillis());
Mono<Long> clockDefer = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
Thread.sleep(500);
clock.subscribe(t -> System.out.println("clock:"+ t));
clockDefer.subscribe(t -> System.out.println("\tclockDefer:"+ t));
Thread.sleep(500);
clock.subscribe(t -> System.out.println("clock:"+ t));
clockDefer.subscribe(t -> System.out.println("\tclockDefer:"+ t));
/*
clock:1631980808996
clockDefer:1631980809602
clock:1631980808996
clockDefer:1631980810105 */
再比較一次COLD 與HOT的不同,第一個範例是先用fromIterable
產生一個cold publisher,兩個Subscriber 得到的結果是一模一樣的。
Flux<String> source =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: " + d));
source.subscribe(d -> System.out.println("Subscriber 2: " + d));
/*
Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE
*/
再來利用前面學到的Sinks
動態的產生Flux
,當時就有提到後面subscribe的只會得到之後的資料,可以看到Subscriber 2 就只有橘色與紫色。
Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();
Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.emitNext("blue", FAIL_FAST);
hotSource.tryEmitNext("green").orThrow();
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.emitNext("orange", FAIL_FAST);
hotSource.emitNext("purple", FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
/*
Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE
*/
經過兩天的學習,應該能大致了解cold/hot publihser,不過其實官方文件並沒有明確定義,所以建議也不用太鑽牛角尖的想去定義每一個operator是屬於哪一種,遇到再來討論。