iT邦幫忙

2021 iThome 鐵人賽

DAY 23
0

Abstract

卡哇勾嘎!!!想必大家都有開發過Publisher-Subscriber架構,有些開發者可能是透過第三放套件進行介接,如:Redis、Kafka或RabbitMQ等套件,或者您系統是在JAVA 8以前開發的,可能是採用RxJAVA或者自己實作Observable Pattern進行管理各項訊息傳播,小編今日要介紹的Spring Reactor套件與RxJava 2採用共同一套接口API標準Reactive Streams Commons,故說明他們的最終目的都是一致的,且這些API都有通用性,如果您曾經有RxJava的開發經驗的話,在今天這個Flux初階介紹,想必是探囊取物一般的簡單,想不想變忍者龜一樣,可以自由地在下水道自由行走,那現在我們就開始來深入分析囉。

Principle Introduction

Reactor 3是基於JDK中提供的java.util.function來設計實現的,可以很輕鬆地從Java.util.stream.Stream轉為Flux(RxJava中的Flowable類型),亦可輕鬆轉回Stream,也可很快速地實現CompletableFuture與Mono(Mono也支援Publisher接口,可被理解為RxJava 2中對Single的背壓(Back Pressure)加強版)之間的互相轉換,所以可以很輕鬆且安全的基於Optional類型的元素創建Mono,所以在操作上可快方便的應用於Spring Framework 5,適用於各種新版的JDK,也是小編為什麼一開始選用Java 15的原因囉。
根據下圖我們可以看到三角行為開始執行訂閱,圓圈可以視為一個物件、I可視為輸入原子(int,long,float等)、X可視為關閉訂閱頻道(channel),所有推播內容會循序發送給個訂閱者,若推播端發生錯誤或關閉時,會自動將所有訂閱者進行關閉,並停止進行接收任何內容。

圖一、實現Publisher接口原理
image

小編延續前幾天的架構,新增一個推播產品控制器,此產品可推播給台灣及中國兩個地區國家共同銷售,故小編先建立一個產品推播服務(CacheSubscribeService),由台灣(SeaFoodRetailerServiceImpl)區服務及中國(ChinaSeaFoodRetailerServiceImpl)區服務當訂閱者,透過以下範例可得知,小編建立一個水池道稱為seaFoodSink,並建立一個通量道(Flux)配置推播者入口道(sink),在配置訂閱者排成類型及行為後,即可開始進行產品的推播任務,控制器僅需把產品放入seaFoodSink中,即可讓相關服務開始進行任務,以下程式碼範例提供參考。

  // Publisher 服務端配置
@Service
public class CacheSubscribeServiceImpl implements CacheSubscribeService {

    Logger logger =  LoggerFactory.getLogger(CacheSubscribeServiceImpl.class);

    private static FluxSink<SeaFood> seaFoodSink;

    @Autowired
    @Qualifier("seaFoodRetailService")
    SeaFoodRetailerService taiwanSeaFoodRetailerService;


    @Autowired
    @Qualifier("chinaSeaFoodRetailService")
    SeaFoodRetailerService chinaSeaFoodRetailerService;

    @PostConstruct
    public void init() {
        Flux.<SeaFood>create(sink -> this.seaFoodSink = sink)
                .doOnNext(seaFood -> {
                    try {
                        taiwanSeaFoodRetailerService.createSeaFood(seaFood);
                        chinaSeaFoodRetailerService.createSeaFood(seaFood);
                    } catch (SeaFoodRetailerGenericException e) {
                        logger.error("Create Sea Food into all Place Fail. ex:" + e.toString());
                    }
                })
                .onErrorReturn(new SeaFood())
                .subscribeOn( Schedulers.elastic())
                .subscribe(seaFood -> {
                    System.out.println("Subscribe model : " + new Gson().toJson(seaFood));
                });

    }

    @Override
    public SeaFood subscribeAllLocationSeaFoodProducts(SeaFood seaFood) {
        seaFoodSink.next(seaFood);
        return seaFood;
    }

}

  
  // 控制器配置
  @RestController
public class PublishProductController extends ControllerBase {

    @Autowired
    CacheSubscribeService cacheSubscribeService;

    @PostMapping(
            value="/${sea.food.api.all}/create",
            produces = MediaType.APPLICATION_JSON_VALUE,
            consumes = MediaType.APPLICATION_JSON_VALUE
    )
    ResponseEntity<SeaFood> createSeaFood(@RequestBody SeaFood entity) throws SeaFoodRetailerGenericException {
        return new ResponseEntity<>(
                cacheSubscribeService.subscribeAllLocationSeaFoodProducts(entity)
                , HttpStatus.CREATED
        );
    }

}

測試結果,進行觸發推播產品
推播產品
image

台灣區及中國區產品均已加入

image

image

故我們可以看到透過Flux可以快速的傳播各項物件,建議最佳的使用環境是在Socket服務上喔,這樣雙邊介面都可快速達到映射物件轉到作用。

Structure

由下圖可看出,LambdaSubscriber實現了CoreSubscriber接口,該接口可衍生出各式各樣的訂閱者,對於生產者Publisher接口,也是可衍生多個,Flux留了抽象subscribe方法,提供給各類別具體實現類來實現,CoreSubscriber繼承了org.reactivestreams.Subscriber接口,並加入一些Reactor3特有的Context功能來實現,所以我們可得知此訂閱邏輯概念,如果在LambdaSubscriber中的引數值subscriptionConsumer不為空的,就會觸發onSubscribe方法,反之,則自動觸發onNext方法,詳細各位可參照程式碼邏輯,這邊就不在多加詳述。

圖二、訊息發送流程圖
image

Monitor Result

透過API監測結果,我們可看到此API創建順利,並回傳成功建立的代碼201,無相關異常產生。
image

Sample Source

Spring-Sample-Flux

Reference Url

Java 編成方法論 - 響應式Spring Reactor 3設計與實現

Reactive Spring實戰 -- 理解Reactor的設計與實現

響應式程式設計簡介之:Reactor


上一篇
[Day - 22] - 今晚我想來個Spring Async非同步的感覺
下一篇
[Day-24] - Spring Reactor Mono 一日初探就上手
系列文
Wow ! There is no doubt about Learn Spring framework in a month.30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言