iT邦幫忙

2021 iThome 鐵人賽

DAY 29
0
Software Development

從零開始Reactive Programming- Spring系列 第 30

[Day 29] Reactive Programming - RSocket (Hello World) Part 1

前言

RSocket有基本的了解之後,照慣例要來做一個簡單的實作,內容參考Josh Long的The RSocket Revolution ,額外調整更新與說明,詳細的code有放上github。(謎:第29天才開始放code)

RSocket Server

DEMO方便起見統一放在同一個檔案。第一個範例是演示一個每隔一秒鐘發送一次現在時間(參考The RSocket Revolution),剩下則是實作其他三種RSocket的互動方式,最後記得要補上設定spring.rsocket.server.port,要不然不會啟動。


@Controller
@Slf4j
class GreetingController {

  @MessageMapping("greetings")
  Flux<GreetingResponse> greet(GreetingRequest request) {
    log.info("greetings name:{}", request.getName());
    Stream<GreetingResponse> generate = Stream.generate(
        () -> new GreetingResponse("Hello" + request.getName() + "@" + Instant.now()));

    return Flux.fromStream(generate)
        .delayElements(Duration.ofSeconds(1)).take(10);

  }

  @MessageMapping("request-response")
  Mono<GreetingResponse> requestResponse(GreetingRequest request) {
    log.info("request-response name:{}", request.getName());
    return Mono.just(new GreetingResponse("Hello" + request.getName()));

  }
  @MessageMapping("fire-and-forget")
  Mono<Void> fireAndForget(GreetingRequest request) {
    log.info("fire-and-forget name:{}", request.getName());
    return Mono.empty();

  }
  @MessageMapping("stream-stream")
  Flux<GreetingResponse> channel(Flux<GreetingRequest> request) {
    log.info("channel....");
    return request.
        map(rq -> new GreetingResponse("Hello" +rq.getName()));

  }

}

application.properties

spring.rsocket.server.port=8888

RSocket Client

有三種方式可以呼叫RSocket的服務。

RSC

RSocket Client CLI (RSC) 是由一名日本人Toshiaki Maki開發,是專門用來呼叫RSocket的command line服務,等同於之前使用的curl
實際呼叫上面寫的服務結果如下,感覺debug模式十分強大,非常詳細。
https://ithelp.ithome.com.tw/upload/images/20211013/20141418VabPHXmOq5.png
使用的過程我也真的透過這個工具找到我的問題,大部分command line工具在windows系統都會或多或少有些小問題,有跳脫了雙引號,也實際echo出符合預期的指令,卻怎麼樣都無法通過json轉換成物件,仔細一看(下圖)才發現實際上傳送是沒有加上雙引號的,最後額外補上斜線才成功。
rsc tcp://localhost:8888 --stream --route greetings --log --debug -d "{ ""name"" : ""Josh""}"
https://ithelp.ithome.com.tw/upload/images/20211013/20141418kazTD5wtrM.png
rsc tcp://localhost:8888 --stream --route greetings --log --debug -d "{\""name\"":\""Josh\""}"

RSocketRequester

因為client端不是一個服務不需一直維持啟動的狀態,所以補一個System.in.read()main thread暫停,要不然會看不到執行結果就已經結束了,下面就透過ApplicationListener實際去呼叫四種不同的溝通方式,要注意channel不是用array而是Flux,在Reactive裡面array也只是一個單純的Request。

public static void main(String[] args) throws IOException {
 SpringApplication.run(RsocketClientApplication.class, args);
 System.in.read();
}
@Bean
RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
 return builder.tcp("localhost", 8888);
}

@Bean
ApplicationListener<ApplicationReadyEvent> client(RSocketRequester client) {
 return  arg -> {
  Flux<GreetingResponse> greetingResponseFlux = client.route("greetings")
    .data(new GreetingRequest("Robert"))
    .retrieveFlux(GreetingResponse.class);

  greetingResponseFlux.subscribe(System.out::println);

  Mono<GreetingResponse> requestResponse = client.route("request-response")
    .data(new GreetingRequest("request-response :Robert"))
    .retrieveMono(GreetingResponse.class);

  requestResponse.subscribe(System.out::println);

  Mono<Void> fireAndForget = client.route("fire-and-forget")
    .data(new GreetingRequest("Robert"))
    .retrieveMono(Void.class);

  fireAndForget.subscribe(System.out::println);

  Flux<GreetingResponse> channel = client.route("stream-stream")
    .data(Flux.just(new GreetingRequest("Robert"),new GreetingRequest("Jerry"),new GreetingRequest("Rhys")))
    .retrieveFlux(GreetingResponse.class);

  channel.subscribe(System.out::println);
 };
}

https://ithelp.ithome.com.tw/upload/images/20211013/20141418lUOgXoDAav.png

Spring Retrosocket

在微服務當中,少不了穿插許多呼叫其他服務的部分,若只用RestTemplate程式碼相對會比較雜亂,Spring Cloud OpenFeign 提供了相對乾淨整齊的介面來解決這個問題,而Spring Retrosocket就是提供給RSocket Feign風格的client端,目前還在開發實驗階段。

結語

上班日趕稿比較緊繃一點,下班時間光是準備資料都不太夠,利用中午時間終於趕了一篇。RSocket還有不少內容可以研究,但今天已經是Day29,看來只能留給後人再去探索了。
註:這是一個可能沒有Part2的Part1,因為下一篇是應該會是感想。

資料來源

上一篇
[Day 28] Reactive Programming - RSocket Part2
下一篇
[Day 30] Reactive Programming - RSocket (Hello World) Part 2
系列文
從零開始Reactive Programming- Spring32

尚未有邦友留言

立即登入留言