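RSocket provides four interaction models. It was designed to move past the traditional limit of one request and one response, and it is natively reactive, so applications are structured in the same style as WebFlux.
Add the following dependency to use RSocket: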
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
#application.yaml
spring:
  rsocket:
    server:
      port: 7000
The four models are request-response, request-stream, fire-and-forget, and channel.
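As a quick orientation before the individual examples, the four models can be summarized by whether the client sends one payload or a stream, what the handler returns, and which call ends the client chain. The sketch below is plain Java and only a mnemonic: the `InteractionModel` enum and its field names are made up for illustration and are not part of RSocket or Spring; the return types and client calls are the ones used in the examples of this article.

```java
// Illustrative summary only: this enum is not part of RSocket or Spring.
enum InteractionModel {
    // (streaming input?, handler return type, client-side call)
    REQUEST_RESPONSE(false, "Mono<R>", "retrieveMono"),
    REQUEST_STREAM(false, "Flux<R>", "retrieveFlux"),
    FIRE_AND_FORGET(false, "Mono<Void>", "send"),
    CHANNEL(true, "Flux<R>", "retrieveFlux");

    final boolean streamingInput;   // true when the client sends a Flux of payloads
    final String handlerReturnType; // what the @MessageMapping method returns
    final String clientCall;        // last call in the client's request chain

    InteractionModel(boolean streamingInput, String handlerReturnType, String clientCall) {
        this.streamingInput = streamingInput;
        this.handlerReturnType = handlerReturnType;
        this.clientCall = clientCall;
    }
}

class InteractionModelSummary {
    public static void main(String[] args) {
        // Print one line per model as a small cheat sheet.
        for (InteractionModel m : InteractionModel.values()) {
            System.out.printf("%-16s input=%-4s returns=%-10s client=%s%n",
                m, m.streamingInput ? "many" : "one", m.handlerReturnType, m.clientCall);
        }
    }
}
```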
1.request-response
server:
package rsocket;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Controller
@Slf4j
public class HelloController {

    // Handles routes such as "hello/Mario"; {name} is bound to the first parameter.
    @MessageMapping("hello/{name}")
    public Mono<String> handleRequest(@DestinationVariable("name") String name,
                                      Mono<String> helloMono) {
        return helloMono
            // Lombok's @Slf4j generates a logger field named "log"
            .doOnNext(hello -> log.info("Received a hello: {} says {}", name, hello))
            .map(hello -> "Hello, " + name + "!");
    }
}
client:
// requesterBuilder is an injected RSocketRequester.Builder
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
String name = "Mario";
tcp.route("hello/{name}", name)   // the route variable is expanded to "hello/Mario"
    .data("Hello RSocket!")
    .retrieveMono(String.class)
    .subscribe(response -> log.info("Got a response: {}", response));
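To make the role of the `{name}` placeholder more concrete, here is a rough plain-Java sketch of the two halves: the client expands the template with its arguments, and the server matches the incoming route against the template and extracts the variable. This is not Spring's implementation (its route matcher also deals with wildcards, separators, and encoding); `RouteTemplateSketch`, `expand`, and `extract` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of route templates; not how Spring actually implements them.
class RouteTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{([^/}]+)\\}");

    // Client side: replace each {placeholder} with the next argument, in order.
    static String expand(String template, Object... args) {
        Matcher m = VARIABLE.matcher(template);
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (m.find()) {
            m.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(args[i++])));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Server side: match a concrete route against the template and collect the variables.
    // Returns an empty map when the route does not match.
    static Map<String, String> extract(String template, String route) {
        Matcher m = VARIABLE.matcher(template);
        StringBuilder regex = new StringBuilder();
        List<String> names = new ArrayList<>();
        int last = 0;
        while (m.find()) {
            regex.append(Pattern.quote(template.substring(last, m.start())));
            regex.append("([^/]+)");          // one path segment per variable
            names.add(m.group(1));
            last = m.end();
        }
        regex.append(Pattern.quote(template.substring(last)));

        Map<String, String> vars = new LinkedHashMap<>();
        Matcher rm = Pattern.compile(regex.toString()).matcher(route);
        if (rm.matches()) {
            for (int i = 0; i < names.size(); i++) {
                vars.put(names.get(i), rm.group(i + 1));
            }
        }
        return vars;
    }

    public static void main(String[] args) {
        String route = expand("hello/{name}", "Mario");
        System.out.println(route);                          // hello/Mario
        System.out.println(extract("hello/{name}", route)); // {name=Mario}
    }
}
```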
2.request-stream
server:
import java.time.Duration;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import reactor.core.publisher.Flux;

@Controller
public class StockQuoteController {

    @MessageMapping("stock/{symbol}")
    public Flux<StockQuote> getStockPrice(
            @DestinationVariable("symbol") String symbol) {
        return Flux
            // Emit a tick every second; each tick produces a fresh quote.
            .interval(Duration.ofSeconds(1))
            // The lambda parameter must not be named "symbol": it would clash
            // with the method parameter and fail to compile.
            .map(tick -> SomePricer.someLogicReturnStockQuote(symbol));
    }
}
client:
String stockSymbol = "TSLA";
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp.route("stock/{symbol}", stockSymbol)
    .retrieveFlux(StockQuote.class)
    .doOnNext(stockQuote -> log.info("StockQuote: {}", stockQuote))
    .subscribe();
3.fire-and-forget
server:
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Controller
@Slf4j
public class AlertController {

    @MessageMapping("alert")
    public Mono<Void> setAlert(Mono<Alert> alertMono) {
        return alertMono
            .doOnNext(alert -> log.info("Received alert: {}", alert))
            // Complete without emitting a value: the client expects no response.
            .then();
    }
}
client:
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp.route("alert")
    .data(new Alert("red alert!"))
    .send()         // returns Mono<Void>; no response is expected
    .subscribe();
log.info("Alert sent");
4.channel
server:
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Controller
@Slf4j
public class TipsController {

    // The route must match the one the client uses ("tips").
    @MessageMapping("tips")
    public Flux<PriceOutput> calculate(Flux<PriceInput> priceInputFlux) {
        return priceInputFlux
            .doOnNext(in -> log.info("Calculating tips: {}", in))
            // Each incoming PriceInput is mapped to one outgoing PriceOutput.
            .map(in -> PriceOutputCalculator.getPriceOutput(in));
    }
}
client:
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
// Emit one PriceInput per second (requires java.time.Duration).
Flux<PriceInput> priceInputFlux =
    Flux.fromArray(new PriceInput[] {
            new PriceInput(10000),
            new PriceInput(3500),
            new PriceInput(7800)
        })
        .delayElements(Duration.ofSeconds(1));
tcp.route("tips")
    .data(priceInputFlux)
    .retrieveFlux(PriceOutput.class)
    .subscribe(out -> log.info("PriceOutput: {}", out));
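The channel example relies on `PriceInput`, `PriceOutput`, and `PriceOutputCalculator`, none of which are shown here. The following is only a guess at what they might look like, assuming the controller computes a tip at a fixed 15% rate (the rate, the field names, and the `calculateAll` helper are all assumptions). To stay runnable without Reactor, the element-by-element mapping is applied to a `List` instead of a `Flux`. Real payload classes would also need to be serializable by the configured codec.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical request payload: a bill amount.
final class PriceInput {
    private final BigDecimal amount;
    PriceInput(int amount) { this.amount = BigDecimal.valueOf(amount); }
    BigDecimal getAmount() { return amount; }
    @Override public String toString() { return "PriceInput(" + amount + ")"; }
}

// Hypothetical response payload: the original amount plus the computed tip.
final class PriceOutput {
    private final BigDecimal amount;
    private final BigDecimal tip;
    PriceOutput(BigDecimal amount, BigDecimal tip) { this.amount = amount; this.tip = tip; }
    BigDecimal getAmount() { return amount; }
    BigDecimal getTip() { return tip; }
    @Override public String toString() { return "PriceOutput(amount=" + amount + ", tip=" + tip + ")"; }
}

final class PriceOutputCalculator {
    // Assumed tip rate; the original article does not specify one.
    static final BigDecimal TIP_RATE = new BigDecimal("0.15");

    static PriceOutput getPriceOutput(PriceInput in) {
        BigDecimal tip = in.getAmount().multiply(TIP_RATE).setScale(2, RoundingMode.HALF_UP);
        return new PriceOutput(in.getAmount(), tip);
    }
}

class ChannelMappingSketch {
    // Same one-in, one-out mapping the controller applies with Flux.map,
    // done over a List so the sketch needs only the JDK.
    static List<PriceOutput> calculateAll(List<PriceInput> inputs) {
        return inputs.stream()
            .map(PriceOutputCalculator::getPriceOutput)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        calculateAll(List.of(new PriceInput(10000), new PriceInput(3500), new PriceInput(7800)))
            .forEach(System.out::println);
    }
}
```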
Besides listening on a TCP port, RSocket can also be served over WebSocket. Add the following dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
#application.yaml
spring:
  rsocket:
    server:
      transport: websocket
      mapping-path: /rsocket
client example:
RSocketRequester requester = requesterBuilder.websocket(
    URI.create("ws://localhost:8081/rsocket"));
requester.route("hello")
    .data("Hello RSocket in websocket!")
    .retrieveMono(String.class)
    .subscribe(response -> log.info("Response: {}", response));
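The path in the client URI has to line up with the `mapping-path` configured on the server. A small helper can keep the pieces explicit. This is a minimal sketch: `rsocketUri` is a hypothetical helper name, and port 8081 is simply taken from the client example, presumably the port the WebFlux server listens on.

```java
import java.net.URI;
import java.net.URISyntaxException;

class WebSocketUriSketch {
    // Builds ws://host:port/path. mappingPath should equal the value of
    // spring.rsocket.server.mapping-path on the server side.
    static URI rsocketUri(String host, int port, String mappingPath) {
        try {
            return new URI("ws", null, host, port, mappingPath, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid WebSocket endpoint", e);
        }
    }

    public static void main(String[] args) {
        // Port 8081 is an assumption carried over from the client example.
        System.out.println(rsocketUri("localhost", 8081, "/rsocket"));
    }
}
```

The resulting `URI` can be passed to `requesterBuilder.websocket(...)` in place of the hard-coded string.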