iT邦幫忙

2022 iThome Ironman Contest

DAY 26
Self-Challenge Group

Spring In Action series, part 26

RSocket

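RSocket provides four interaction models, designed to move beyond the traditional limit of one request and one response. Spring's RSocket support is natively reactive and uses the same Mono and Flux programming style as WebFlux.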

Add the following dependency to use RSocket:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

# application.yaml
spring:
  rsocket:
    server:
      port: 7000

The four models are request-response, request-stream, fire-and-forget, and channel.
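As a rough way to keep the four models apart, they can be told from each other by how many messages travel in each direction. The sketch below is a simplified, JDK-only illustration of that idea; the names InteractionModel, Count, and classify are hypothetical and are not part of the RSocket or Spring API.

```java
// Simplified, JDK-only illustration; not part of the RSocket or Spring API.
enum InteractionModel {
    REQUEST_RESPONSE, REQUEST_STREAM, FIRE_AND_FORGET, CHANNEL;

    // How many messages flow in one direction.
    enum Count { NONE, ONE, MANY }

    // Picks the model from the number of request and response messages.
    static InteractionModel classify(Count requests, Count responses) {
        if (requests == Count.NONE) {
            throw new IllegalArgumentException("an interaction needs at least one request message");
        }
        if (requests == Count.MANY) {
            return CHANNEL; // a stream of requests goes in, responses stream back
        }
        switch (responses) {
            case NONE: return FIRE_AND_FORGET;  // one message in, nothing back
            case ONE:  return REQUEST_RESPONSE; // one message in, one back
            default:   return REQUEST_STREAM;   // one message in, many back
        }
    }
}
```

In Spring handler terms, a single value usually appears as a Mono and a stream as a Flux, so a handler that takes one symbol and returns a Flux falls under request-stream.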

1. request-response

server:

package rsocket;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Controller
@Slf4j
public class HelloController {
    // Handles request-response messages routed to "hello/{name}".
    @MessageMapping("hello/{name}")
    public Mono<String> handleRequest(@DestinationVariable("name") String name,
                                      Mono<String> helloMono) {
        return helloMono
            // @Slf4j generates a logger field named "log" by default
            .doOnNext(hello -> log.info("Received a hello: {} says {}", name, hello))
            .map(hello -> "Hello, " + name + "!");
    }
}

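client (requesterBuilder is an RSocketRequester.Builder, which Spring Boot auto-configures as an injectable bean):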

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
String name = "Mario";
tcp.route("hello/{name}", name)
  .data("Hello RSocket!")
  .retrieveMono(String.class)
  .subscribe(response -> log.info("Got a response: {}", response));

2. request-stream

server:

import java.time.Duration;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class StockQuoteController {
    @MessageMapping("stock/{symbol}")
    public Flux<StockQuote> getStockPrice(
            @DestinationVariable("symbol") String symbol) {
        return Flux
            // emits 0, 1, 2, ... once per second; the tick value itself is unused
            .interval(Duration.ofSeconds(1))
            // the lambda parameter must not reuse the name "symbol", or the code will not compile
            .map(tick -> SomePricer.someLogicReturnStockQuote(symbol));
    }
}

client:

String stockSymbol = "TSLA";
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp.route("stock/{symbol}", stockSymbol)
   .retrieveFlux(StockQuote.class)
   .doOnNext(stockQuote ->
       log.info("StockQuote: {}", stockQuote)
   )
   .subscribe();
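The request-stream snippets rely on StockQuote and SomePricer, which the article does not show. The following is a minimal sketch of what they might look like; the field names (symbol, price, timestamp) and the random pricing logic are assumptions made purely for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical payload type; the fields are assumptions, not taken from the article.
class StockQuote {
    private String symbol;
    private BigDecimal price;
    private Instant timestamp;

    public StockQuote() { } // no-arg constructor so a codec can instantiate the bean

    public StockQuote(String symbol, BigDecimal price, Instant timestamp) {
        this.symbol = symbol;
        this.price = price;
        this.timestamp = timestamp;
    }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }
    public Instant getTimestamp() { return timestamp; }
    public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "StockQuote(symbol=" + symbol + ", price=" + price + ", timestamp=" + timestamp + ")";
    }
}

// Hypothetical stand-in for the pricing logic: returns a random price between 1.00 and 10.00.
class SomePricer {
    public static StockQuote someLogicReturnStockQuote(String symbol) {
        BigDecimal price = BigDecimal
                .valueOf(ThreadLocalRandom.current().nextDouble(1.0, 10.0))
                .setScale(2, RoundingMode.HALF_UP);
        return new StockQuote(symbol, price, Instant.now());
    }
}
```

With these in place, each tick of the one-second interval on the server would produce a fresh quote for the requested symbol.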

3. fire-and-forget

server:

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Controller
@Slf4j
public class AlertController {

    @MessageMapping("alert")
    public Mono<Void> setAlert(Mono<Alert> alertMono) {
        return alertMono
            // @Slf4j generates a logger field named "log" by default
            .doOnNext(alert -> log.info("Received alert: {}", alert))
            // complete without emitting a value: fire-and-forget sends nothing back
            .then();
    }

}

client:

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp.route("alert")
   .data(new Alert("red alert!"))
   .send()
   .subscribe();
log.info("Alert sent");

4. channel

server:

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Controller
@Slf4j
public class TipsController {

    // The route must match the one the client uses ("tips").
    @MessageMapping("tips")
    public Flux<PriceOutput> calculate(Flux<PriceInput> priceInputFlux) {
        return priceInputFlux
            .doOnNext(in -> log.info("Calculating tips: {}", in))
            // one output is emitted for every input received on the channel
            .map(in -> PriceOutputCalculator.getPriceOutput(in));
    }

}

client:

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);

// Emit one PriceInput per second to simulate a stream of requests.
Flux<PriceInput> priceInputFlux =
        Flux.fromArray(new PriceInput[] {
                new PriceInput(10000),
                new PriceInput(3500),
                new PriceInput(7800)
        })
        .delayElements(Duration.ofSeconds(1));

tcp.route("tips")
   .data(priceInputFlux)
   .retrieveFlux(PriceOutput.class)
   .subscribe(out -> log.info("PriceOutput: {}", out));
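The channel example uses PriceInput, PriceOutput, and PriceOutputCalculator without defining them. Below is a minimal sketch under stated assumptions: the single amount field mirrors the new PriceInput(10000) call in the client, while the tip and total fields and the flat 15% tip rate are hypothetical choices made only for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical request payload; the article only shows new PriceInput(10000).
class PriceInput {
    private long amount;

    public PriceInput() { } // no-arg constructor so a codec can instantiate the bean
    public PriceInput(long amount) { this.amount = amount; }

    public long getAmount() { return amount; }
    public void setAmount(long amount) { this.amount = amount; }

    @Override
    public String toString() { return "PriceInput(amount=" + amount + ")"; }
}

// Hypothetical response payload carrying the tip and the total including the tip.
class PriceOutput {
    private BigDecimal tip;
    private BigDecimal total;

    public PriceOutput() { }
    public PriceOutput(BigDecimal tip, BigDecimal total) {
        this.tip = tip;
        this.total = total;
    }

    public BigDecimal getTip() { return tip; }
    public void setTip(BigDecimal tip) { this.tip = tip; }
    public BigDecimal getTotal() { return total; }
    public void setTotal(BigDecimal total) { this.total = total; }

    @Override
    public String toString() { return "PriceOutput(tip=" + tip + ", total=" + total + ")"; }
}

class PriceOutputCalculator {
    // Assumed flat tip rate of 15%, chosen only for illustration.
    static final BigDecimal TIP_RATE = new BigDecimal("0.15");

    public static PriceOutput getPriceOutput(PriceInput in) {
        BigDecimal amount = BigDecimal.valueOf(in.getAmount());
        BigDecimal tip = amount.multiply(TIP_RATE).setScale(2, RoundingMode.HALF_UP);
        return new PriceOutput(tip, amount.add(tip));
    }
}
```

Under these assumptions, the three inputs sent by the client would each come back as one PriceOutput, roughly a second apart, because the input Flux delays its elements.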

Besides listening for messages on a TCP port, RSocket can also be configured to run over WebSocket. Add the following dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

# application.yaml
spring:
  rsocket:
    server:
      transport: websocket
      mapping-path: /rsocket

client example (the URL assumes the WebFlux server listens on port 8081, for instance via server.port=8081):

RSocketRequester requester = requesterBuilder.websocket(
        URI.create("ws://localhost:8081/rsocket"));

requester.route("hello")
         .data("Hello RSocket in websocket!")
         .retrieveMono(String.class)
         .subscribe(response -> log.info("Response: {}", response));
