iT邦幫忙

2021 iThome 鐵人賽

DAY 24
0
Software Development

Wow ! There is no doubt about Learn Spring framework in a month.系列 第 24

[Day-24] - Spring Reactor Mono 一日初探就上手

Abstract

昨日已先行提過Flux,可方便處理一連串指定類型事件,所以說Flux就像瑞凡,被眾人討厭的瑞凡,散播出去,就難收回了,所以我們說,瑞凡,回不去了,而Mono用於處理單一指定類型事件,可說是一種單一(Single)架構,其中也包含了許多與Flux相似的靜態方法,如:just、empty、error及never等用法,所以只要出錯位置都可以確切地發現,而Mono有許多針對自身元件設計的特別方法,較常被開發者使用的有fromCallable、fromCompletionStage、fromFuture、fromRunnable和fromSupplier方法,這五種方法分別從Callable、CompletionStage、CompletableFuture、Runnable和Supplier中創建了Mono對象,我是小編威斯丁,我將以fromCallable作原理與範例介紹。

Principle Introduction

Mono架構是處理0到1個異步發射器,他是採用一種非阻塞式的架構,可以確保每個指定類型事件可以運算完畢,不受其他執行緒的影響,Mono是一個專一的Publisher,也就是說最多只能發射出一個類別,可在運作元subscribe任務後,以onComplete方法或onError方法结束。可用於操作Flux子集,可將Mono與另一個發布者的操作與做組合,並切換到Flux資料序列,如Mono.concatWith(Publisher)進行返回一個Flux ,而Mono.then(Mono)則返回一個Mono,亦可用於表示只有執行概念且無返回物件值地執行緒。如:Mono,我們可參照以下流程圖,可得知粗黑垂直線為onComplete方法,紅色X代表onError方法,僅支援一次性發射與運作性任務,提供給各位開發者作參考。

圖一、Mono 物件運作流程圖
image

小編延續先前範例,將台灣區及中國區建立產品的程式碼區段,改為Mono單一執行緒處理方式,確版建立過程不受其他程序影響,可看出在成功時(onComplete)會寫出建立成功的log,錯誤時(onError)會噴出錯誤例外及寫錯誤Log。

  // Taiwan create product flow
    @Override
    public SeaFood createSeaFood(SeaFood seaFood) throws SeaFoodRetailerGenericException {
       validateNullId(seaFood);
        Mono.fromCallable(() ->seaFood).subscribe(
                new Consumer<SeaFood>() {
                    @Override
                    public void accept(SeaFood seaFood1) {
                        SEA_FOOD_CACHE_TAIWAN.asMap().putIfAbsent(seaFood.getId(),seaFood);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) {
                        try {
                            logger.error("Taiwan product create fail. ex:{}",throwable.getMessage());
                            throw new SeaFoodRetailerGenericException("Out of EXPECT error.");
                        } catch (SeaFoodRetailerGenericException e) {
                            e.printStackTrace();
                        }
                    }
                },
                ()->logger.info("Create Taiwan product success ! "));
        return seaFood;
    }
  
  //china create product flow 
  @Override
    public SeaFood createSeaFood(SeaFood seaFood) throws SeaFoodRetailerGenericException {
        validateNullId(seaFood);
        validateNullId(seaFood);
        Mono.fromCallable(() ->seaFood).subscribe(
                new Consumer<SeaFood>() {
                    @Override
                    public void accept(SeaFood seaFood1) {
                        SEA_FOOD_CACHE_CHINESE.asMap().putIfAbsent(seaFood.getId(),seaFood);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) {
                        try {
                            logger.error("China product create fail. ex:{}",throwable.getMessage());
                            throw new SeaFoodRetailerGenericException("Out of EXPECT error.");
                        } catch (SeaFoodRetailerGenericException e) {
                            e.printStackTrace();
                        }
                    }
                },
                ()->logger.info("Create China product success ! "));

        return seaFood;
    }

透過Postman發射Create all Product測試結果,可看到以下成功的log,可看出我們一個session發射出兩個執行緒(Thread)。

23:14:08.476  INFO 21055 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 10 ms
23:14:08.651  INFO 21055 --- [nio-8080-exec-2] s.s.s.s.SeaFoodRetailerServiceImpl       : Create Taiwan product success ! 
23:14:08.654  INFO 21055 --- [nio-8080-exec-2] s.s.s.s.ChinaSeaFoodRetailerServiceImpl  : Create China product success ! 
Subscribe model : {"id":"C-0099","name":"A-cha Crab","description":"Opilio is the primary species referred to as A-cha crab."}

小編透過以上流程與說明提供給各位開發者作參考,架構各位可參考昨日所帶出的Flux資料流架構,都是共通繼承CoreSubscriber介面,僅差別在Mono而外拉出專用的LambdaMonoSubscriber介面,並再客製其特有的方法,其所有處理的靜態方法都是透過onAssembly進行轉換後回傳新Mono對象,為一種方法鏈(Method Chain)寫法。

Sample source

Spring-Sample-Mono

Reference Url

Class Mono

Spring Reactive Stack(一)響應式程式設計中Mono和Flux

Reactor Mono和Flux 进行反应式编程详解


上一篇
[Day - 23] - Spring Reactor之進入忍者龜的Flux
下一篇
[Day - 25] - Spring Reactor Processor 之交易所OrderBook實作與設計
系列文
Wow ! There is no doubt about Learn Spring framework in a month.30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言