iT邦幫忙

2021 iThome 鐵人賽

DAY 8
0
Software Development

從零開始Reactive Programming- Spring系列 第 9

[Day 8] Reactive Programming - Reactor(FLUX & MONO) Part 2

  • 分享至 

  • xImage
  •  

前言

上一篇介紹了何謂FLUX & MONO,本篇就來說明具體的使用方式。

Just

最簡單建立Flux or Mono的方法Just

Flux<String> seq1 = Flux.just("Robert", "Jean", "Jerry");
//從Iterable建立
Flux<String> seq2 = Flux.fromIterable(Arrays.asList("Robert", "Jean", "Jerry"));
//一個範圍
Flux<Integer> numbersFromFiveToSix = Flux.range(5, 2); 
Mono<String> mono = Mono.just("Robert");
//即便是空的也要給泛型
Mono<String> mono1 = Mono.empty(); 
Mono<String> mono2 = Mono.justOrEmpty("Robert"); 

Reactor doc有很多精美的圖片

https://ithelp.ithome.com.tw/upload/images/20210922/20141418gmKjF5Ejwx.png
https://ithelp.ithome.com.tw/upload/images/20210922/20141418FHsPRXMwsD.png
https://ithelp.ithome.com.tw/upload/images/20210922/20141418bUiCiwpbqC.png

subscribe

Reactive Steam是lazy的,而Flux & Mono是Publisher,所以這時候就需要subscribe來讓Publisher動起來。常見的subscribe有三個參數,也可以只傳一個或兩個,第一個參數就是需要對資料做甚麼處理,第二則是當發生error需要如何處理、最後則是完成的時候,跟之前Java9裡面onNextonErroronComplete的概念一樣,要注意errorcomplete都是終結(terminal)訊號,就跟java 8 stream.collect  stream.foreach 只會有一個而不會同時出現。

subscribe();  
subscribe(Consumer<? super T> consumer);  
subscribe(Consumer<? super T> consumer, 
          Consumer<? super Throwable> errorConsumer);  
subscribe(Consumer<? super T> consumer, 
          Consumer<? super Throwable> errorConsumer, 
          Runnable completeConsumer);
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"));
// 1
// 2
// 3
// 4
// Done

https://ithelp.ithome.com.tw/upload/images/20210922/20141418b0FXL1Qu5w.png

操作flux內的資料做法類似於java 8 stream,簡單介紹幾個常用的方法

map

轉換每一個flux內的資料
https://ithelp.ithome.com.tw/upload/images/20210922/20141418aKpVazjmB0.png

flatMap

從文件上來看,簡化後map的參數是Function<T,  V>,flatMap參數Function<T,  Publisher>,可以很容易的看出來flatMap會傳入另一個Publisher,從文件上的圖來看就是將兩條Publisher攤開合併成同一條。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418LXR4RTcZuc.png
從下面這個範例更容易去理解,兩個字串經過flatpMap後變成十個字元。

Flux<String> strFlux = Flux.just("robert", "wang");
strFlux.flatMap(i -> Flux.just(i.split(""))).subscribe(System.out::println);
//r
//o
//b
//....

flatMap vs map

flatMap map
轉換一個來源對應一個Flux(N個) 轉換一個來源對應一個輸出
subscribe每一個Flux 單純的處理轉換
非同步(async) 同步(synchronous)

mergeWith

類似的還有mergeWith,他是單純傳入一個publisher而且是同一個類別的資料,圖形上可以看出來都是圓形的圖案,也就是合併兩個內含是同類別的Publisher。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418yW0z3kElQq.png
Filter
stream中常常使用到的filter過濾,過濾出符合條件的資料。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418jnK5fArLYb.png
Zip
Zip會合併多個Publisher,通常會搭配map/flatMap使用,根據文件可以看出回傳Tuple物件,單純就是幫你把各個物件組合起來方便取用,要注意就是長度若不一樣,會在最短的完成的時候就結束。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418pARb5D4lLm.png
這個範例是可能當你有拿到各種資料(依造順序),需要把所有資料組合成一個客戶的物件時,zip就能很好的處理,下面有三個基本資料的Flux,透過zip整合配對後轉為Customer,要注意第三個人因為性別沒有資料就不會處理到。

  {
     Flux<String> name = Flux.just("Robert", "Jean", "Jerry");
    Flux<Integer> age = Flux.just(30, 29, 36);
    Flux<String> sex = Flux.just("M", "F");
    Flux<Tuple3<String, Integer, String>> zip = Flux.zip(name, age, sex);
     zip
        .map(data -> new Customer(data.getT1(), data.getT2(), data.getT3()))
        .subscribe(System.out::println);
//    Customer{name='Robert', age=30, sex='M'}
//    Customer{name='Jean', age=29, sex='F'}
  }

  class Customer{
    private String name;
    private Integer age;
    private String sex;

    public Customer(String name, Integer age, String sex) {
      this.name = name;
      this.age = age;
      this.sex = sex;
    }

    @Override
    public String toString() {
      return "Customer{" +
          "name='" + name + '\'' +
          ", age=" + age +
          ", sex='" + sex + '\'' +
          '}';
    }
  }

結語

部分的操作跟java stream類似,其餘的等用到再來說明,下一篇來說明之前提到的backpressure

資料來源


上一篇
[Day 7] Reactive Programming - Reactor(FLUX & MONO) Part 1
下一篇
[Day 9] Reactive Programming - Backpressure
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言