這個主題花了我好多的時間查資料,之前提到動態的產生publisher其實就有sink的概念,但文件上對於sink的描述不是很清楚,當然有可能是我資質駑鈍,找了一些其他資料後終於了解其實Sinks就是經過Reactor優化過後的Processors。
在介紹Reactive Programming的時候有提到在Publisher & Subscriber之間有個Processor來扮演中間人轉換之類的角色,然而在Reactor中,這些功能大部分其實透過Publisher 的operator就能做到,而其中剩下比較特別的部分,就還是需要使用Processor來特別處理,Processor同樣身為Subscriber所以可以直接使用onNext, onComplete and onError (透過Subscriber interface),而這樣的行為是比較危險需要謹慎使用,所以Reactor在3.4.0的版本後完全棄用了Processor,取而代之的是Sinks。
both abstract and concrete
FluxProcessorandMonoProcessorare deprecated and slated for removal in 3.5.0
Sinks就是Reactor優化過後的Processors,他是thread-safe也可以避免一些不符合Reactive 規範的設計,相較於Processor.onNext必須是同步的(synchronized),Sinks有提供兩種Api,tryEmit* API 會回傳EmitResult,emit* API提供EmissionFailureHandler,讓你可以更容易的根據發送結果來設計你的api或是更容易的處理錯誤。
Sinks主要有三種:
Sinks.One:僅可以傳送一個資料,類似Mono。Sinks.Many:可以傳送多筆資料,類似Flux。Sinks.Empty:沒有資料,僅能傳送終結訊號(terminal signal),類似Mono.empty()。Sinks.unsafe(),其之下也有相對的Sinks.unsafe().one()、Sinks.unsafe().empty()、Sinks.unsafe().Many(),其實就是不保證thread-safe的版本,所以如果你能夠確保使用情境會是thread-safe的,使用Sinks.unsafe()可以相對增加效能。EmitResult的種類文件
Sinks.Many又分為三種
multicast:可以有多個訂閱者,每個訂閱者並不會都拿到全部且一樣的資料,而是只會取得訂閱後開始最新的。unicast:透過buffer來處理backpressure,但代價就是只能有一個訂閱者。replay:會快取(cache)所有資料來讓每一個訂閱者都能拿到全部且一樣的資料。下面分別使用三種來轉為Flux,產生1~5的資料並且有兩個訂閱者。multicast的部分可以看到subscribe2 只會有訂閱後的最新一筆資料而已,不會有之前的。
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
multicastSink.emitNext(3, FAIL_FAST);
multicastSink.emitNext(4, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
multicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe1 :5
subscribe2 :5
*/
unicast可以看到第一個訂閱者正常顯示,但當subscribe2 出現後隨即會出現錯誤。
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
unicastSink.emitNext(3, FAIL_FAST);
unicastSink.emitNext(4, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
unicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
reactor.core.Exceptions$ErrorCallbackNotImplemented:
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
*/
replay的subscribe2 一樣能取得前面的資料,所以最終兩個訂閱者拿到的資料都是全部且一樣的。
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
replaySink.emitNext(3, FAIL_FAST);
replaySink.emitNext(4, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
replaySink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe2 :1
subscribe2 :2
subscribe2 :3
subscribe2 :4
subscribe1 :5
subscribe2 :5
*/
再補充說明三個種類建構的方法,
multicast():
onBackpressureBuffer()是第一個訂閱者訂閱之前的暫存,之後的訂閱者就只會收到最新的資料。onBackpressureBuffer(int bufferSize, boolean autoCancel) 可以傳入buffer的大小,並且當所有訂閱者都取消訂閱後是否自動清除buffer。directAllOrNothing() 只要有一個訂閱者變慢(無法消耗(consume更多的資料),則所有訂閱者都會停止,直到恢復正常。directBestEffort() 相較於上者,只會停止推送給無法接受資料的訂閱者,其他則正常。unicast():
onBackpressureBuffer() 這個buffer是用來存唯一訂閱者訂閱資料已經推送出去的資料,這樣才能確保訂閱者可以拿到全部,預設是沒有上限所以可能會有OOM的風險,Reactor也提供傳入自訂Queue來限制上限onBackpressureBuffer(Queue),超過的部分就會捨棄掉。replay():
limit(int))因為replay會保存資料讓所有訂閱者都能接受到一樣的資料,limit會限制保存的數量。all():所有資料都保存limit(Duration):保存某個時間limit(int, Duration):結合時間跟數量的限制latest():只保存最後一筆latestOrDefault(T):保存最後一筆或是預設值Sinks.One 類似於Mono,只能有一個值,內含以下三個method,
emitValue(T value) 等於 onNext(value) + onComplete()。emitEmpty() 等於onComplete(),基本上就是Mono.empty()。emitError``(Throwable t) 等於onError(t)
基本上就是Sinks.One,只是沒有emitValue(T value)。
以上就是關於Reactor的 Sinks介紹,希望看完有基礎的了解,感覺Sinks是比較進階的使用方式,如果後續有找到使用情境會再補充。