Reactor中的stream分為兩種:Mono和Flux,差別在於stream所乘載的物件是單個來是多個,若有單個物件傳遞的需求就會使用Mono;反之多個則為使用Flux。
這段會說明一些常用的Mono/Flux的操作方法,如果你對Java 8的stream API有概念的話,會覺得很熟悉。
create flux by array:
@Test
public void createAFlux_fromArray() {
String[] videos = new String[] {
"Netflix", "Disney", "Amazon", "HBO", "Pirate" };
Flux<String> videoFlux = Flux.fromArray(videos);
StepVerifier.create(videoFlux)
.expectNext("Netflix")
.expectNext("Disney")
.expectNext("Amazon")
.expectNext("HBO")
.expectNext("Pirate")
.verifyComplete();
}
create flux by Iterable:
@Test
public void createAFlux_fromIterable() {
List<String> videoList = new ArrayList<>();
fruitList.add("Netflix");
fruitList.add("Disney");
fruitList.add("Amazon");
fruitList.add("HBO");
fruitList.add("Pirate");
Flux<String> videoFlux = Flux.fromIterable(videoList);
StepVerifier.create(videoFlux)
.expectNext("Netflix")
.expectNext("Disney")
.expectNext("Amazon")
.expectNext("HBO")
.expectNext("Pirate")
.verifyComplete();
}
create flux by stream:
@Test
public void createAFlux_fromStream() {
Stream<String> videoStream =
Stream.of("Netflix", "Disney", "Amazon", "HBO", "Pirate");
Flux<String> videoFlux = Flux.fromStream(videoStream);
StepVerifier.create(videoFlux)
.expectNext("Netflix")
.expectNext("Disney")
.expectNext("Amazon")
.expectNext("HBO")
.expectNext("Pirate")
.verifyComplete();
}
range:
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux = Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
interval:
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}
merge:
@Test
public void mergeFluxes() {
Flux<String> jobFlux = Flux
.just("knight", "wizard", "assassin")
.delayElements(Duration.ofMillis(500));
Flux<String> weaponFlux = Flux
.just("sword", "wand", "dagger")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = jobFlux.mergeWith(weaponFlux);
StepVerifier.create(mergedFlux)
.expectNext("knight")
.expectNext("wizard")
.expectNext("assassin")
.expectNext("sword")
.expectNext("wand")
.expectNext("dagger")
.verifyComplete();
}
zip:
@Test
public void zipFluxes() {
Flux<String> jobFlux = Flux
.just("knight", "wizard", "assassin");
Flux<String> weaponFlux = Flux
.just("sword", "wand", "dagger");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(jobFlux, weaponFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("knight") &&
p.getT2().equals("sword"))
.expectNextMatches(p ->
p.getT1().equals("wizard") &&
p.getT2().equals("wand"))
.expectNextMatches(p ->
p.getT1().equals("assassin") &&
p.getT2().equals("dagger"))
.verifyComplete();
}
zip with Function:
@Test
public void zipFluxesToObject() {
Flux<String> jobFlux = Flux
.just("knight", "wizard", "assassin");
Flux<String> weaponFlux = Flux
.just("sword", "wand", "dagger");
Flux<String> zippedFlux =
Flux.zip(jobFlux, weaponFlux, (c, f) -> c + " uses " + f);
StepVerifier.create(zippedFlux)
.expectNext("knight uses sword")
.expectNext("wizard uses wand")
.expectNext("assassin uses dagger")
.verifyComplete();
}
firstWithSignal:
@Test
public void firstWithSignalFlux() {
Flux<String> sweetFlux = Flux.just("lollipop", "soda", "tofi")
.delaySubscription(Duration.ofMillis(100));
Flux<String> saltyFlux = Flux.just("fries", "burger", "ham");
Flux<String> firstFlux = Flux.firstWithSignal(sweetFlux, saltyFlux);
StepVerifier.create(firstFlux)
.expectNext("fries")
.expectNext("burger")
.expectNext("ham")
.verifyComplete();
}
skip:
@Test
public void skipAFew() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "eighty seven", "eighty eight")
.skip(3);
StepVerifier.create(countFlux)
.expectNext("eighty seven", "eighty eight")
.verifyComplete();
}
skip by time:
@Test
public void skipAFewSeconds() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "eighty seven", "eighty eight")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(countFlux)
.expectNext("eighty seven", "eighty eight")
.verifyComplete();
}
take:
@Test
public void take() {
Flux<String> superCarFlux = Flux.just(
"Bugatti", "Ferrari", "Porsche", "McLaren", "Pagani")
.take(3);
StepVerifier.create(superCarFlux)
.expectNext("Bugatti", "Ferrari", "Porsche")
.verifyComplete();
}
take by time:
@Test
public void takeForAwhile() {
Flux<String> superCarFlux = Flux.just(
"Bugatti", "Ferrari", "Porsche", "McLaren", "Pagani")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(superCarFlux)
.expectNext("Bugatti", "Ferrari", "Porsche")
.verifyComplete();
}
filter:
@Test+
public void filter() {
Flux<String> superCarFlux = Flux.just(
"Bugatti", "Ferrari", "Porsche", "McLaren", "Pagani")
.filter(np -> !np.contains(" "));
StepVerifier.create(superCarFlux)
.expectNext("Bugatti", "Ferrari", "Porsche")
.verifyComplete();
}
distinct:
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "horse")
.distinct();
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "horse")
.verifyComplete();
}
map:
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Faker Korea", "Toyz HK", "Godtone Taiwan")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Korea"))
.expectNext(new Player("Scottie", "HK"))
.expectNext(new Player("Steve", "Taiwan"))
.verifyComplete();
}
@Data
private static class Player {
private final String firstName;
private final String lastName;
}
flatMap:
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Faker Korea", "Toyz HK", "Godtone Taiwan")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
List<Player> playerList = Arrays.asList(
new Player("Faker", "Korea"),
new Player("Toyz", "HK"),
new Player("Godtone", "Taiwan"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}
buffer:
@Test
public void buffer() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "horse", "monkey");
Flux<List<String>> bufferedFlux = animalFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("dog", "cat", "bird"))
.expectNext(Arrays.asList("horse", "monkey"))
.verifyComplete();
}
buffer with flatMap:
@Test
public void bufferAndFlatMap() throws Exception {
Flux.just(
"dog", "cat", "bird", "horse", "monkey")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
}
collectList:
@Test
public void collectList() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "horse", "monkey");
Mono<List<String>> animalListMono = animalFlux.collectList();
StepVerifier
.create(animalListMono)
.expectNext(Arrays.asList(
"dog", "cat", "bird", "horse", "monkey"))
.verifyComplete();
}
collectMap:
@Test
public void collectMap() {
Flux<String> cityFlux = Flux.just(
"Taichung", "Hsinchu", "Yilan", "Taipei", "Hualian");
Mono<Map<Character, String>> citylMapMono =
cityFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(cityMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('T').equals("Taipei") &&
map.get('H').equals("Hualian") &&
map.get('Y').equals("Yilan");
})
.verifyComplete();
}
all:
@Test
public void all() {
Flux<String> cityFlux = Flux.just(
"Taichung", "Hsinchu", "Yilan", "Taipei", "Hualian");
Mono<Boolean> hasIMono = cityFlux.all(a -> a.contains("i"));
StepVerifier.create(hasIMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasAMono = cityFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(false)
.verifyComplete();
}
any:
@Test
public void any() {
Flux<String> cityFlux = Flux.just(
"Taichung", "Hsinchu", "Yilan", "Taipei", "Hualian");
Mono<Boolean> hasTMono = cityFlux.any(a -> a.contains("T"));
StepVerifier.create(hasTMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = cityFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}