這個主題花了我好多的時間查資料,之前提到動態的產生publisher其實就有sink
的概念,但文件上對於sink
的描述不是很清楚,當然有可能是我資質駑鈍,找了一些其他資料後終於了解其實Sinks
就是經過Reactor優化過後的Processors
。
在介紹Reactive Programming的時候有提到在Publisher
& Subscriber
之間有個Processor
來扮演中間人轉換之類的角色,然而在Reactor中,這些功能大部分其實透過Publisher
的operator就能做到,而其中剩下比較特別的部分,就還是需要使用Processor
來特別處理,Processor
同樣身為Subscriber
所以可以直接使用onNext
, onComplete
and onError
(透過Subscriber interface
),而這樣的行為是比較危險需要謹慎使用,所以Reactor在3.4.0的版本後完全棄用了Processor
,取而代之的是Sinks
。
both abstract and concrete
FluxProcessor
andMonoProcessor
are deprecated and slated for removal in 3.5.0
Sinks
就是Reactor優化過後的Processors
,他是thread-safe也可以避免一些不符合Reactive 規範的設計,相較於Processor.onNext
必須是同步的(synchronized),Sinks
有提供兩種Api,tryEmit*
API 會回傳EmitResult
,emit*
API提供EmissionFailureHandler
,讓你可以更容易的根據發送結果來設計你的api或是更容易的處理錯誤。
Sinks主要有三種:
Sinks.One
:僅可以傳送一個資料,類似Mono
。Sinks.Many
:可以傳送多筆資料,類似Flux
。Sinks.Empty
:沒有資料,僅能傳送終結訊號(terminal signal),類似Mono.empty()
。Sinks.unsafe()
,其之下也有相對的Sinks.unsafe().one()、Sinks.unsafe().empty()、Sinks.unsafe().Many()
,其實就是不保證thread-safe的版本,所以如果你能夠確保使用情境會是thread-safe的,使用Sinks.unsafe()
可以相對增加效能。EmitResult
的種類文件Sinks.Many
又分為三種
multicast
:可以有多個訂閱者,每個訂閱者並不會都拿到全部且一樣的資料,而是只會取得訂閱後開始最新的。unicast
:透過buffer來處理backpressure,但代價就是只能有一個訂閱者。replay
:會快取(cache)所有資料來讓每一個訂閱者都能拿到全部且一樣的資料。下面分別使用三種來轉為Flux,產生1~5的資料並且有兩個訂閱者。multicast
的部分可以看到subscribe2 只會有訂閱後的最新一筆資料而已,不會有之前的。
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
multicastSink.emitNext(3, FAIL_FAST);
multicastSink.emitNext(4, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
multicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe1 :5
subscribe2 :5
*/
unicast
可以看到第一個訂閱者正常顯示,但當subscribe2 出現後隨即會出現錯誤。
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
unicastSink.emitNext(3, FAIL_FAST);
unicastSink.emitNext(4, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
unicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
reactor.core.Exceptions$ErrorCallbackNotImplemented:
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
*/
replay
的subscribe2 一樣能取得前面的資料,所以最終兩個訂閱者拿到的資料都是全部且一樣的。
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
replaySink.emitNext(3, FAIL_FAST);
replaySink.emitNext(4, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
replaySink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe2 :1
subscribe2 :2
subscribe2 :3
subscribe2 :4
subscribe1 :5
subscribe2 :5
*/
再補充說明三個種類建構的方法,
multicast()
:
onBackpressureBuffer()
是第一個訂閱者訂閱之前的暫存,之後的訂閱者就只會收到最新的資料。onBackpressureBuffer(int bufferSize, boolean autoCancel)
可以傳入buffer的大小,並且當所有訂閱者都取消訂閱後是否自動清除buffer。directAllOrNothing()
只要有一個訂閱者變慢(無法消耗(consume更多的資料),則所有訂閱者都會停止,直到恢復正常。directBestEffort()
相較於上者,只會停止推送給無法接受資料的訂閱者,其他則正常。unicast()
:
onBackpressureBuffer()
這個buffer是用來存唯一訂閱者訂閱資料已經推送出去的資料,這樣才能確保訂閱者可以拿到全部,預設是沒有上限所以可能會有OOM的風險,Reactor也提供傳入自訂Queue來限制上限onBackpressureBuffer(Queue)
,超過的部分就會捨棄掉。replay()
:
limit(int))
因為replay會保存資料讓所有訂閱者都能接受到一樣的資料,limit會限制保存的數量。all()
:所有資料都保存limit(Duration)
:保存某個時間limit(int, Duration)
:結合時間跟數量的限制latest()
:只保存最後一筆latestOrDefault(T)
:保存最後一筆或是預設值Sinks.One
類似於Mono,只能有一個值,內含以下三個method,
emitValue(T value)
等於 onNext(value) + onComplete()
。emitEmpty()
等於onComplete()
,基本上就是Mono.empty()
。emitError``(Throwable t)
等於onError(t)
基本上就是Sinks.One,只是沒有emitValue(T value)。
以上就是關於Reactor的 Sinks
介紹,希望看完有基礎的了解,感覺Sinks
是比較進階的使用方式,如果後續有找到使用情境會再補充。