好的,我們已先行敘述過Flux及Mano兩項角色套件,最後,我們開始進行介紹Reactor間接角色套件,Reactor 3 是在Spring Framework 5.2中才開始進行登場,Reactor再以發展近六年的時間,但相對較少人使用,今天來跟大家介紹核心處理器(Processor)功能,它有兩個實現類MonoProcessor及FluxProcessor,由於Mono小編昨日以提過,僅支援一對一架構,故小編今日就只有介紹由FluxProcessor延伸出來的六種處理器囉,我們將運用此新技術進行開發一個仿造交易所的OrderBook功能。
Processor是一個很特別的存在,他是一個推播器(Publisher),同時也是一個訂閱者(Subscriber),也就是說可同時行使兩種權力,在前天我們可以通過特定累Create方法來創建一個FluxSink推播者,但若您想設置一個初始的生產源,則可以通過此調用已創建好Processor實例的sink方法,通過他創建一個SerializedSink或SerializeOnRequestSink(其內部調用的都是FluxCreate.createSink中相關方法),所以在觸發FluxHandle中包裝的訂閱者HandleSubscriber的onNext方法時,才會調用BiConsumer,此BiConsumer的內部最多只能調用一次SynchronousSink#next(Object),其中會調用0~1項SynchronousSink#error(Throwable生產者異常的情況下)或SynchronousSink#complete(定義某些情況下直接結束)操作。下面有六項延伸的處理器(Processor),小編陸續在下面表格做介紹。
Processor name | Description |
---|---|
TopicProcessor | 算是一個相對複雜的Processor,這是一個異步處理器,基於share方法獲得一個TopicProcessor時,可以轉發從上游多個元素生產下發元素(但同時只能訂閱一個Publisher)。 |
WorkQueueProcessor | 與TopicProcessor相同,也是一個異步處理器,shared設置為true時,會支持多個元素生產執行緒並下發元素 |
ReplayProcessor | 可針對推播者或通過它自己創建的Sink實力向下發送元素及進行緩存,後來新訂閱者可以重複接收這些元素,可透過create()方法配置緩存數量,即可達到暫存量效用。 |
EmitterProcessor | 可同時有多個訂閱者,並為訂閱者提供背壓支持功能,也可作為訂閱者來訂閱上游訂閱者,並同步向下發送元素給自己的訂閱者們,也就是說只要第一個訂閱者配置dispose(),後面所有訂閱者都會收不到推播者訊息,開發者要小心配置才行。 |
DirectProcessor | 可接收0到N個Subscriber,DirectProcessor不支持背壓,內部沒有用於存儲元素的數據結構(onNext也沒有提供元素的緩存操作) |
UnicastProcessor | 通過一個字定義的queue來實現背壓方式,僅允許一個訂閱者(Subscriber)及UnicastProcessor(單一推播器)可以在多個執行緒(Thread)中生產元素。 |
# 背壓方式(Backpressure) : 輸入轉為輸出過程以某種方式受到阻礙,此種阻力為計算處理速度,又稱吞吐量(throughput)
通過以上的處理器介紹,小編依舊將先前的銷售產品服務API 繼續延續,我們在此套加上一個最新的三項產品推播功能,只要有新的產品會將其加入ReplyProcessor來做新產品的緩存處理器,小編稱之Product OrderBook,可參照一下程式碼。
配置台灣區及中國區的ReplyProcessor,並建立各自的資料流推播器(FluxSink),再將預設好的產品推播進去。
public class ProductsOrderBookServiceImpl implements ProductsOrderBookService {
ReplayProcessor<SeaFood> taiwanProductProcessor = null;
ReplayProcessor<SeaFood> chinaProductProcessor = null;
FluxSink<SeaFood> taiwanProductSink = null;
FluxSink<SeaFood> chinaProductSink = null;
@PostConstruct
public void init() {
//預設配置三項緩存
this.taiwanProductProcessor = ReplayProcessor.create(3);
//建立台灣區推播器
this.taiwanProductSink = this.taiwanProductProcessor.sink();
//將台灣區預設產品推播出去
SEA_FOOD_CACHE_TAIWAN
.asMap()
.values()
.stream()
.forEach(seaFood -> taiwanProductSink.next(seaFood));
//預設配置三項緩存
this.chinaProductProcessor = ReplayProcessor.create(3);
//建立中國區推播器
this.chinaProductSink = this.chinaProductProcessor.sink();
//將中國區預設產品推播出去
SEA_FOOD_CACHE_CHINESE
.asMap()
.values()
.stream()
.forEach(seaFood -> chinaProductSink.next(seaFood));
}
}
當API觸發時,確認是新品推播到OrderBook中
@Override
public SeaFood createSeaFood(SeaFood seaFood) throws SeaFoodRetailerGenericException {
validateNullId(seaFood);
Mono.fromCallable(() ->seaFood).subscribe(
new Consumer<SeaFood>() {
@Override
public void accept(SeaFood seaFood1) {
SeaFood product = SEA_FOOD_CACHE_TAIWAN.asMap().putIfAbsent(seaFood.getId(),seaFood);
if (product == null)
//Map 產品池不存在,將產品加入OrderBook
productsOrderBookService.publishTaiwanProduct(seaFood);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
logger.error("Taiwan product create fail. ex:{}",throwable.getMessage());
throw new SeaFoodRetailerGenericException("Out of EXPECT error.");
} catch (SeaFoodRetailerGenericException e) {
e.printStackTrace();
}
}
},
()->logger.info("Create Taiwan product success ! "));
return seaFood;
}
建立取得OrderBook API ,並將各自的OrderBook放置Map集合中。
@Autowired
ProductsOrderBookService productsOrderBookService;
@GetMapping(
value="/orderbook/${sea.food.api.all}",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE
)
ResponseEntity< Map<String , List<SeaFood>>> orderBookSeaFood() throws SeaFoodRetailerGenericException {
Map<String , List<SeaFood>> orderBookMap = new LinkedHashMap<String,List<SeaFood>>();
orderBookMap.put("TW",productsOrderBookService.getTaiwanOrderBook());
orderBookMap.put("CN",productsOrderBookService.getChinaOrderBook());
return new ResponseEntity<>(
orderBookMap
, HttpStatus.CREATED
);
}
透過以上範例我們可透過ReplyProcessor來實作各種區塊鏈交易所得OrderBook,再透過Redis 進行持續存取效用,會達到最佳效益喔。
圖1. Default OrderBook
圖2. Create Taiwan Product
圖3. Create China Product
圖4. After Create Product OrderBook
其緩存分為無限存取與有限存取兩種情況,其緩存是可重複向訂閱者發送,元素不會隨者一次向下發送而消失,緩存的型態非一般陣列,而是採用另外他種數據結構,若要取用最後一個元素,即可呼叫cacheLast方法,此項處理器還有特別之處分別為可結合歷史元素的緩存數量和元素緩存時間限制來設計的API,即為createSizeAndTimeout方法。最後則為通過FluxReplay.SizeAndTimeBoundReplayBuffer實現基於時間調度的ReplayProcessor,緩存的歷史元素有過期時間限制。從這邊邏輯來看,我們可以看出Processor是一個中間者,當進行下發元素時,他會充當為Flux的一個訂閱者角色,此時為調用它的onNext方法進行發送元素,同時,Processor作為生產者的作用也可發揮出來,並進他推播給自己在代理,即可將Flux上游發送出來的元素向下發送給訂閱自己的二級訂閱者。即可完成快取重複發送給訂閱者行為,故整合下方結構圖(圖),我們可看出在沒有需求的情況下,並不會主動進行發送元素的行為,所以就看不到任何輸出。
圖5. Reply Process架構策略圖
Java 編成方法論 - 響應式Spring Reactor 3設計與實現
Reactor Processors——响应式Spring的道法术器