iT邦幫忙

2021 iThome 鐵人賽

DAY 9
0
Software Development

從零開始Reactive Programming- Spring系列 第 10

[Day 9] Reactive Programming - Backpressure

前言

大部分介紹Reactive Programming都一定會提到Backpressure,可能放在第九天有點稍晚,但我覺得有基本的Reactor觀念後再來看也不遲。

Backpressure

Reactive Programming 常常提到的backpressure,這在現實裡的管道中也會有,在軟體提到的背壓(backpressure)是指當生產者(producer )生產超過消費者(consumer),為了避免消費者被過多的資料所淹沒的一個保護機制。例如拿寶特瓶來喝水,背壓就是正常的喝水,而沒有了這個機制,就像是喝到一半有人用力捏寶特瓶,水就會來不及喝下去噴出來。

用程式碼來看介紹,interval會隨著時間不斷的emits data,delay會延遲Subscriber去接受,導致漸漸的DATA溢出報錯。

 Flux.interval(Duration.ofMillis(1)) 
        .log()   
        .delayElements(Duration.ofMillis(100))
        .blockLast();

https://ithelp.ithome.com.tw/upload/images/20210923/201414183GhaiLRwes.png
https://ithelp.ithome.com.tw/upload/images/20210923/20141418fQ5SNsX0Cj.png

reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks) 
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233) 
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:132) 
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) 
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) 
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) 
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
	at java.base/java.![https://ithelp.ithome.com.tw/upload/images/20210923/201414186VL7Y0MpU6.png](https://ithelp.ithome.com.tw/upload/images/20210923/201414186VL7Y0MpU6.png)til.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
	at java.base/java.lang.Thread.run(Thread.java:834)

稍微調整一下,加上一個buffer就能避免太快速的報錯,但若是Buffer被塞滿了仍然是會出錯,因為interval是根據時間不斷推送資料,較難去維持backpressure的機制,這邊只是為了demo而使用。

    Flux.interval(Duration.ofMillis(1)) 
        .log() 
        .onBackpressureBuffer() 
        .concatMap(x -> Mono.delay(Duration.ofMillis(100))) 
        .blockLast();

https://ithelp.ithome.com.tw/upload/images/20210923/20141418Ee7woewTDF.png

Range

range則會定義出總共emit的量,這時候透過limitRate(3)是subscribers 只會每次要求3個而不會因為太多的資料量而導致錯誤,也就是如果publisher可以根據下游(downstream)的要求來決定產生資料的速度,就算是有backpressure的機制

Flux.range(1, 20) 
        .log() 
        .limitRate(3) 
        .subscribe();

https://ithelp.ithome.com.tw/upload/images/20210923/20141418jUtmCQSbzL.png
https://ithelp.ithome.com.tw/upload/images/20210923/20141418r4LVAsJNLo.png

結語

Backpressure在Reactive Programming是蠻重要的觀念,後面的文章也會陸續提到Reactor的Api對應Backpressure會有一些處理機制。

資料來源


上一篇
[Day 8] Reactive Programming - Reactor(FLUX & MONO) Part 2
下一篇
[Day 10] Reactive Programming - Reactor (generate & create)
系列文
從零開始Reactive Programming- Spring32

尚未有邦友留言

立即登入留言