iT邦幫忙

2022 iThome 鐵人賽

DAY 20
0
自我挑戰組

Spring In Action系列 第 20

Mono / Flux operation

  • 分享至 

  • xImage
  •  

Reactor中的stream分為兩種:Mono和Flux,差別在於stream所承載的物件是單個還是多個:若只需要傳遞單個物件就使用Mono;反之,若要傳遞多個物件則使用Flux。

這段會說明一些常用的Mono/Flux的操作方法,如果你對Java 8的stream API有概念的話,會覺得很熟悉。

create flux by array:

/**
 * Creates a {@code Flux} from a {@code String} array and verifies that each element
 * is emitted in array order before the sequence completes.
 */
@Test
public void createAFlux_fromArray() {
  String[] providers = {"Netflix", "Disney", "Amazon", "HBO", "Pirate"};
  Flux<String> providerFlux = Flux.fromArray(providers);

  StepVerifier.create(providerFlux)
      .expectNext("Netflix", "Disney", "Amazon", "HBO", "Pirate")
      .verifyComplete();
}

create flux by Iterable:

/**
 * Creates a {@code Flux} from an {@code Iterable} (here a {@code List}) and verifies
 * that each element is emitted in list order before the sequence completes.
 */
@Test
public void createAFlux_fromIterable() {
  List<String> videoList = new ArrayList<>();
  // The elements must be added to the same list that is handed to fromIterable.
  videoList.add("Netflix");
  videoList.add("Disney");
  videoList.add("Amazon");
  videoList.add("HBO");
  videoList.add("Pirate");
  Flux<String> videoFlux = Flux.fromIterable(videoList);
  StepVerifier.create(videoFlux)
      .expectNext("Netflix")
      .expectNext("Disney")
      .expectNext("Amazon")
      .expectNext("HBO")
      .expectNext("Pirate")
      .verifyComplete();
}

create flux by stream:

/**
 * Creates a {@code Flux} from a Java {@code Stream} and verifies that each element
 * is emitted in stream order before the sequence completes.
 */
@Test
public void createAFlux_fromStream() {
  Flux<String> providerFlux =
      Flux.fromStream(Stream.of("Netflix", "Disney", "Amazon", "HBO", "Pirate"));

  StepVerifier.create(providerFlux)
      .expectNext("Netflix", "Disney", "Amazon", "HBO", "Pirate")
      .verifyComplete();
}

range:

/**
 * Creates a {@code Flux} with {@code Flux.range(1, 5)} and verifies that it emits
 * the integers 1 through 5 in order before completing.
 */
@Test
public void createAFlux_range() {
  Flux<Integer> rangeFlux = Flux.range(1, 5);

  StepVerifier.create(rangeFlux)
      .expectNext(1, 2, 3, 4, 5)
      .verifyComplete();
}

interval:

/**
 * Creates a {@code Flux} that ticks once per second, limits it to the first five
 * ticks, and verifies that the values 0 through 4 are emitted before completion.
 */
@Test
public void createAFlux_interval() {
  Flux<Long> everySecond = Flux.interval(Duration.ofSeconds(1));
  // Without take(5) the interval sequence would never complete.
  Flux<Long> firstFiveTicks = everySecond.take(5);

  StepVerifier.create(firstFiveTicks)
      .expectNext(0L, 1L, 2L, 3L, 4L)
      .verifyComplete();
}

merge:

/**
 * Merges two delayed fluxes and verifies that their elements are interleaved in the
 * order they are emitted over time.
 *
 * <p>Both sources emit one element every 500 ms, but the weapon flux is subscribed
 * 250 ms late. Job elements therefore arrive at roughly 500/1000/1500 ms and weapon
 * elements at roughly 750/1250/1750 ms, so the merged sequence alternates between
 * the two sources instead of draining the first source before the second.
 */
@Test
public void mergeFluxes() {
  Flux<String> jobFlux = Flux
      .just("knight", "wizard", "assassin")
      .delayElements(Duration.ofMillis(500));
  Flux<String> weaponFlux = Flux
      .just("sword", "wand", "dagger")
      .delaySubscription(Duration.ofMillis(250))
      .delayElements(Duration.ofMillis(500));
  Flux<String> mergedFlux = jobFlux.mergeWith(weaponFlux);
  // mergeWith emits in arrival order, so each weapon lands between two jobs.
  StepVerifier.create(mergedFlux)
      .expectNext("knight")
      .expectNext("sword")
      .expectNext("wizard")
      .expectNext("wand")
      .expectNext("assassin")
      .expectNext("dagger")
      .verifyComplete();
}

zip:

/**
 * Zips a job flux with a weapon flux and verifies that each emitted {@code Tuple2}
 * pairs the elements found at the same position in the two sources.
 */
@Test
public void zipFluxes() {
  Flux<String> jobs = Flux.just("knight", "wizard", "assassin");
  Flux<String> weapons = Flux.just("sword", "wand", "dagger");

  Flux<Tuple2<String, String>> pairs = Flux.zip(jobs, weapons);

  StepVerifier.create(pairs)
      .expectNextMatches(pair ->
          pair.getT1().equals("knight") && pair.getT2().equals("sword"))
      .expectNextMatches(pair ->
          pair.getT1().equals("wizard") && pair.getT2().equals("wand"))
      .expectNextMatches(pair ->
          pair.getT1().equals("assassin") && pair.getT2().equals("dagger"))
      .verifyComplete();
}

zip with Function:

/**
 * Zips a job flux with a weapon flux using a combinator function, so that each pair
 * of elements is turned directly into a sentence instead of a {@code Tuple2}.
 */
@Test
public void zipFluxesToObject() {
  Flux<String> jobs = Flux.just("knight", "wizard", "assassin");
  Flux<String> weapons = Flux.just("sword", "wand", "dagger");

  Flux<String> sentences =
      Flux.zip(jobs, weapons, (job, weapon) -> job + " uses " + weapon);

  StepVerifier.create(sentences)
      .expectNext(
          "knight uses sword",
          "wizard uses wand",
          "assassin uses dagger")
      .verifyComplete();
}

firstWithSignal:

/**
 * Verifies that {@code Flux.firstWithSignal} relays only the source that signals
 * first. The sweet flux is subscribed 100 ms late, so the salty flux is the one
 * whose elements are expected.
 */
@Test
public void firstWithSignalFlux() {
  Flux<String> salty = Flux.just("fries", "burger", "ham");
  Flux<String> delayedSweet = Flux.just("lollipop", "soda", "tofi")
      .delaySubscription(Duration.ofMillis(100));

  Flux<String> winner = Flux.firstWithSignal(delayedSweet, salty);

  StepVerifier.create(winner)
      .expectNext("fries", "burger", "ham")
      .verifyComplete();
}

skip:

/**
 * Skips the first three elements of a five-element flux and verifies that only the
 * last two are emitted before completion.
 */
@Test
public void skipAFew() {
  Flux<String> source = Flux.just(
      "one", "two", "skip a few", "eighty seven", "eighty eight");
  Flux<String> remaining = source.skip(3);

  StepVerifier.create(remaining)
      .expectNext("eighty seven")
      .expectNext("eighty eight")
      .verifyComplete();
}

skip by time:

/**
 * Skips by time instead of by count: elements are emitted one second apart and
 * everything emitted during the first four seconds is dropped, so only the last two
 * elements are expected.
 */
@Test
public void skipAFewSeconds() {
  Flux<String> oneElementPerSecond = Flux.just(
      "one", "two", "skip a few", "eighty seven", "eighty eight")
      .delayElements(Duration.ofSeconds(1));
  Flux<String> afterFourSeconds = oneElementPerSecond.skip(Duration.ofSeconds(4));

  StepVerifier.create(afterFourSeconds)
      .expectNext("eighty seven")
      .expectNext("eighty eight")
      .verifyComplete();
}

take:

/**
 * Takes only the first three elements of a five-element flux and verifies that the
 * sequence completes right after them.
 */
@Test
public void take() {
  Flux<String> allCars = Flux.just(
      "Bugatti", "Ferrari", "Porsche", "McLaren", "Pagani");
  Flux<String> firstThree = allCars.take(3);

  StepVerifier.create(firstThree)
      .expectNext("Bugatti")
      .expectNext("Ferrari")
      .expectNext("Porsche")
      .verifyComplete();
}

take by time:

/**
 * Takes by time instead of by count: elements are emitted one second apart and only
 * those emitted within the first 3.5 seconds are kept, so the first three elements
 * are expected.
 */
@Test
public void takeForAwhile() {
  Flux<String> oneCarPerSecond = Flux.just(
      "Bugatti", "Ferrari", "Porsche", "McLaren", "Pagani")
      .delayElements(Duration.ofSeconds(1));
  Flux<String> withinWindow = oneCarPerSecond.take(Duration.ofMillis(3500));

  StepVerifier.create(withinWindow)
      .expectNext("Bugatti")
      .expectNext("Ferrari")
      .expectNext("Porsche")
      .verifyComplete();
}

filter:

@Test+
public void filter() {
  Flux<String> superCarFlux = Flux.just(
      "Bugatti", "Ferrari", "Porsche", "McLaren", "Pagani")
      .filter(np -> !np.contains(" "));
  StepVerifier.create(superCarFlux)
      .expectNext("Bugatti", "Ferrari", "Porsche")
      .verifyComplete();
}

distinct:

/**
 * Removes duplicates from a flux and verifies that each value is emitted only once,
 * in order of first appearance.
 */
@Test
public void distinct() {
  Flux<String> withDuplicates = Flux.just(
      "dog", "cat", "bird", "dog", "bird", "horse");
  Flux<String> uniqueAnimals = withDuplicates.distinct();

  StepVerifier.create(uniqueAnimals)
      .expectNext("dog")
      .expectNext("cat")
      .expectNext("bird")
      .expectNext("horse")
      .verifyComplete();
}

map:

/**
 * Maps each "name region" string to a {@code Player} by splitting on whitespace and
 * verifies the resulting objects.
 *
 * <p>The expected players are built from the same tokens as the input strings;
 * {@code expectNext} compares them with the emitted objects via {@code equals}.
 */
@Test
public void map() {
  Flux<Player> playerFlux = Flux
    .just("Faker Korea", "Toyz HK", "Godtone Taiwan")
    .map(n -> {
      String[] split = n.split("\\s");
      return new Player(split[0], split[1]);
    });
  // map is synchronous, so the output order matches the input order.
  StepVerifier.create(playerFlux)
      .expectNext(new Player("Faker", "Korea"))
      .expectNext(new Player("Toyz", "HK"))
      .expectNext(new Player("Godtone", "Taiwan"))
      .verifyComplete();
}
/**
 * Simple value holder used by the map/flatMap tests, which construct it from the two
 * whitespace-separated tokens of an input string.
 *
 * <p>NOTE(review): {@code @Data} is presumably Lombok's annotation, which would
 * generate the two-argument constructor, the getters and equals/hashCode that the
 * tests rely on — confirm against the file's imports.
 */
@Data
private static class Player {
  // First token of the input string.
  private final String firstName;
  // Second token of the input string.
  private final String lastName;
}

flatMap:

/**
 * Converts each "name region" string to a {@code Player} inside its own inner
 * {@code Mono} subscribed on the parallel scheduler, then flattens the results.
 *
 * <p>Because the inner publishers run in parallel, the emission order is not
 * guaranteed; the test only checks that each emitted player is one of the expected
 * ones.
 */
@Test
public void flatMap() {
  List<Player> expectedPlayers = Arrays.asList(
      new Player("Faker", "Korea"),
      new Player("Toyz", "HK"),
      new Player("Godtone", "Taiwan"));

  Flux<Player> players = Flux
      .just("Faker Korea", "Toyz HK", "Godtone Taiwan")
      .flatMap(raw -> Mono.just(raw)
          .map(text -> {
            String[] tokens = text.split("\\s");
            return new Player(tokens[0], tokens[1]);
          })
          .subscribeOn(Schedulers.parallel()));

  StepVerifier.create(players)
      .expectNextMatches(expectedPlayers::contains)
      .expectNextMatches(expectedPlayers::contains)
      .expectNextMatches(expectedPlayers::contains)
      .verifyComplete();
}

buffer:

/**
 * Buffers a five-element flux into lists of at most three elements and verifies that
 * a full list of three is emitted, followed by a list with the remaining two.
 */
@Test
public void buffer() {
  Flux<List<String>> chunks = Flux
      .just("dog", "cat", "bird", "horse", "monkey")
      .buffer(3);

  List<String> firstChunk = Arrays.asList("dog", "cat", "bird");
  List<String> secondChunk = Arrays.asList("horse", "monkey");

  StepVerifier.create(chunks)
      .expectNext(firstChunk)
      .expectNext(secondChunk)
      .verifyComplete();
}

buffer with flatMap:

/**
 * Splits the source into lists of up to three elements, then processes each list as
 * its own inner {@code Flux} on the parallel scheduler, upper-casing and logging
 * every element.
 *
 * <p>The pipeline is consumed with {@code blockLast()} rather than a fire-and-forget
 * {@code subscribe()}: the inner fluxes run on other threads, so the test has to
 * wait for completion or it can return before anything is processed or logged.
 *
 * @throws Exception kept from the original signature; the body itself declares no
 *     checked exception
 */
@Test
public void bufferAndFlatMap() throws Exception {
  Flux.just(
      "dog", "cat", "bird", "horse", "monkey")
      .buffer(3)
      .flatMap(x ->
        Flux.fromIterable(x)
          .map(y -> y.toUpperCase())
          .subscribeOn(Schedulers.parallel())
          .log()
      ).blockLast();
}

collectList:

/**
 * Collects every element of a flux into a single {@code List} emitted by a
 * {@code Mono}, and verifies that the list holds the elements in source order.
 */
@Test
public void collectList() {
  Mono<List<String>> collected = Flux
      .just("dog", "cat", "bird", "horse", "monkey")
      .collectList();

  List<String> expectedAnimals =
      Arrays.asList("dog", "cat", "bird", "horse", "monkey");

  StepVerifier.create(collected)
      .expectNext(expectedAnimals)
      .verifyComplete();
}

collectMap:

/**
 * Collects a flux of city names into a {@code Map} keyed by each name's first
 * character and verifies the resulting entries.
 *
 * <p>When several names share a first character, the later one replaces the earlier
 * one, so 'T' maps to "Taipei" (not "Taichung") and 'H' maps to "Hualian" (not
 * "Hsinchu"), leaving three entries in total.
 */
@Test
public void collectMap() {
  Flux<String> cityFlux = Flux.just(
      "Taichung", "Hsinchu", "Yilan", "Taipei", "Hualian");
  Mono<Map<Character, String>> cityMapMono =
      cityFlux.collectMap(a -> a.charAt(0));
  StepVerifier
      .create(cityMapMono)
      .expectNextMatches(map -> {
        return
            map.size() == 3 &&
            map.get('T').equals("Taipei") &&
            map.get('H').equals("Hualian") &&
            map.get('Y').equals("Yilan");
      })
      .verifyComplete();
}

all:

/**
 * Demonstrates {@code all}: the resulting {@code Mono} emits {@code true} only when
 * every element satisfies the predicate. Every city name contains "i", but not every
 * one contains "a".
 */
@Test
public void all() {
  Flux<String> cities = Flux.just(
      "Taichung", "Hsinchu", "Yilan", "Taipei", "Hualian");

  Mono<Boolean> everyCityHasI = cities.all(city -> city.contains("i"));
  StepVerifier.create(everyCityHasI)
      .expectNext(true)
      .verifyComplete();

  Mono<Boolean> everyCityHasA = cities.all(city -> city.contains("a"));
  StepVerifier.create(everyCityHasA)
      .expectNext(false)
      .verifyComplete();
}

any:

/**
 * Demonstrates {@code any}: the resulting {@code Mono} emits {@code true} as soon as
 * at least one element satisfies the predicate. Some city names contain "T", but
 * none contains "z".
 */
@Test
public void any() {
  Flux<String> cities = Flux.just(
      "Taichung", "Hsinchu", "Yilan", "Taipei", "Hualian");

  Mono<Boolean> someCityHasT = cities.any(city -> city.contains("T"));
  StepVerifier.create(someCityHasT)
      .expectNext(true)
      .verifyComplete();

  Mono<Boolean> someCityHasZ = cities.any(city -> city.contains("z"));
  StepVerifier.create(someCityHasZ)
      .expectNext(false)
      .verifyComplete();
}

上一篇
Reactive programming & Reactive stream specification
下一篇
Spring WebFlux in controller
系列文
Spring In Action30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言