上一篇介紹了何謂FLUX & MONO,本篇就來說明具體的使用方式。
最簡單建立Flux or Mono的方法Just
Flux<String> seq1 = Flux.just("Robert", "Jean", "Jerry");
//從Iterable建立
Flux<String> seq2 = Flux.fromIterable(Arrays.asList("Robert", "Jean", "Jerry"));
//一個範圍
Flux<Integer> numbersFromFiveToSix = Flux.range(5, 2);
Mono<String> mono = Mono.just("Robert");
//即便是空的也要給泛型
Mono<String> mono1 = Mono.empty();
Mono<String> mono2 = Mono.justOrEmpty("Robert");
Reactor doc有很多精美的圖片
Reactive Steam是lazy的,而Flux
& Mono
是Publisher,所以這時候就需要subscribe來讓Publisher動起來。常見的subscribe有三個參數,也可以只傳一個或兩個,第一個參數就是需要對資料做甚麼處理,第二則是當發生error需要如何處理、最後則是完成的時候,跟之前Java9裡面onNext
、onError
、onComplete
的概念一樣,要注意error
跟complete
都是終結(terminal)訊號,就跟java 8 stream.collect
stream.foreach
只會有一個而不會同時出現。
subscribe();
subscribe(Consumer<? super T> consumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done"));
// 1
// 2
// 3
// 4
// Done
操作flux內的資料做法類似於java 8 stream,簡單介紹幾個常用的方法
轉換每一個flux內的資料
從文件上來看,簡化後map的參數是Function<T, V>,flatMap參數Function<T, Publisher>,可以很容易的看出來flatMap
會傳入另一個Publisher,從文件上的圖來看就是將兩條Publisher攤開合併成同一條。
從下面這個範例更容易去理解,兩個字串經過flatpMap後變成十個字元。
Flux<String> strFlux = Flux.just("robert", "wang");
strFlux.flatMap(i -> Flux.just(i.split(""))).subscribe(System.out::println);
//r
//o
//b
//....
flatMap | map |
---|---|
轉換一個來源對應一個Flux(N個) | 轉換一個來源對應一個輸出 |
subscribe每一個Flux | 單純的處理轉換 |
非同步(async) | 同步(synchronous) |
類似的還有mergeWith
,他是單純傳入一個publisher而且是同一個類別的資料,圖形上可以看出來都是圓形的圖案,也就是合併兩個內含是同類別的Publisher。
Filter
stream中常常使用到的filter
過濾,過濾出符合條件的資料。
Zip
Zip會合併多個Publisher,通常會搭配map
/flatMap
使用,根據文件可以看出回傳Tuple物件,單純就是幫你把各個物件組合起來方便取用,要注意就是長度若不一樣,會在最短的完成的時候就結束。
這個範例是可能當你有拿到各種資料(依造順序),需要把所有資料組合成一個客戶的物件時,zip就能很好的處理,下面有三個基本資料的Flux,透過zip整合配對後轉為Customer,要注意第三個人因為性別沒有資料就不會處理到。
{
Flux<String> name = Flux.just("Robert", "Jean", "Jerry");
Flux<Integer> age = Flux.just(30, 29, 36);
Flux<String> sex = Flux.just("M", "F");
Flux<Tuple3<String, Integer, String>> zip = Flux.zip(name, age, sex);
zip
.map(data -> new Customer(data.getT1(), data.getT2(), data.getT3()))
.subscribe(System.out::println);
// Customer{name='Robert', age=30, sex='M'}
// Customer{name='Jean', age=29, sex='F'}
}
class Customer{
private String name;
private Integer age;
private String sex;
public Customer(String name, Integer age, String sex) {
this.name = name;
this.age = age;
this.sex = sex;
}
@Override
public String toString() {
return "Customer{" +
"name='" + name + '\'' +
", age=" + age +
", sex='" + sex + '\'' +
'}';
}
}
部分的操作跟java stream類似,其餘的等用到再來說明,下一篇來說明之前提到的backpressure
。