對RSocket
有基本的了解之後,照慣例要來做一個簡單的實作,內容參考Josh Long的The RSocket Revolution ,額外調整更新與說明,詳細的code有放上github。(謎:第29天才開始放code)
DEMO方便起見統一放在同一個檔案。第一個範例是演示一個每隔一秒鐘發送一次現在時間(參考The RSocket Revolution),剩下則是實作其他三種RSocket
的互動方式,最後記得要補上設定spring.rsocket.server.port
,要不然不會啟動。
@Controller
@Slf4j
class GreetingController {
@MessageMapping("greetings")
Flux<GreetingResponse> greet(GreetingRequest request) {
log.info("greetings name:{}", request.getName());
Stream<GreetingResponse> generate = Stream.generate(
() -> new GreetingResponse("Hello" + request.getName() + "@" + Instant.now()));
return Flux.fromStream(generate)
.delayElements(Duration.ofSeconds(1)).take(10);
}
@MessageMapping("request-response")
Mono<GreetingResponse> requestResponse(GreetingRequest request) {
log.info("request-response name:{}", request.getName());
return Mono.just(new GreetingResponse("Hello" + request.getName()));
}
@MessageMapping("fire-and-forget")
Mono<Void> fireAndForget(GreetingRequest request) {
log.info("fire-and-forget name:{}", request.getName());
return Mono.empty();
}
@MessageMapping("stream-stream")
Flux<GreetingResponse> channel(Flux<GreetingRequest> request) {
log.info("channel....");
return request.
map(rq -> new GreetingResponse("Hello" +rq.getName()));
}
}
application.properties
spring.rsocket.server.port=8888
有三種方式可以呼叫RSocket
的服務。
RSocket Client CLI (RSC) 是由一名日本人Toshiaki Maki開發,是專門用來呼叫RSocket
的command line服務,等同於之前使用的curl
。
實際呼叫上面寫的服務結果如下,感覺debug模式十分強大,非常詳細。
使用的過程我也真的透過這個工具找到我的問題,大部分command line
工具在windows
系統都會或多或少有些小問題,有跳脫了雙引號,也實際echo
出符合預期的指令,卻怎麼樣都無法通過json
轉換成物件,仔細一看(下圖)才發現實際上傳送是沒有加上雙引號的,最後額外補上斜線才成功。rsc tcp://localhost:8888 --stream --route greetings --log --debug -d "{ ""name"" : ""Josh""}"
rsc tcp://localhost:8888 --stream --route greetings --log --debug -d "{\""name\"":\""Josh\""}"
因為client端不是一個服務不需一直維持啟動的狀態,所以補一個System.in.read()
讓main thread
暫停,要不然會看不到執行結果就已經結束了,下面就透過ApplicationListener
實際去呼叫四種不同的溝通方式,要注意channel不是用array
而是Flux
,在Reactive裡面array
也只是一個單純的Request。
public static void main(String[] args) throws IOException {
SpringApplication.run(RsocketClientApplication.class, args);
System.in.read();
}
@Bean
RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
return builder.tcp("localhost", 8888);
}
@Bean
ApplicationListener<ApplicationReadyEvent> client(RSocketRequester client) {
return arg -> {
Flux<GreetingResponse> greetingResponseFlux = client.route("greetings")
.data(new GreetingRequest("Robert"))
.retrieveFlux(GreetingResponse.class);
greetingResponseFlux.subscribe(System.out::println);
Mono<GreetingResponse> requestResponse = client.route("request-response")
.data(new GreetingRequest("request-response :Robert"))
.retrieveMono(GreetingResponse.class);
requestResponse.subscribe(System.out::println);
Mono<Void> fireAndForget = client.route("fire-and-forget")
.data(new GreetingRequest("Robert"))
.retrieveMono(Void.class);
fireAndForget.subscribe(System.out::println);
Flux<GreetingResponse> channel = client.route("stream-stream")
.data(Flux.just(new GreetingRequest("Robert"),new GreetingRequest("Jerry"),new GreetingRequest("Rhys")))
.retrieveFlux(GreetingResponse.class);
channel.subscribe(System.out::println);
};
}
在微服務當中,少不了穿插許多呼叫其他服務的部分,若只用RestTemplate
程式碼相對會比較雜亂,Spring Cloud OpenFeign
提供了相對乾淨整齊的介面來解決這個問題,而Spring Retrosocket
就是提供給RSocket Feign風格的client端,目前還在開發實驗階段。
上班日趕稿比較緊繃一點,下班時間光是準備資料都不太夠,利用中午時間終於趕了一篇。RSocket還有不少內容可以研究,但今天已經是Day29,看來只能留給後人再去探索了。
註:這是一個可能沒有Part2的Part1,因為下一篇是應該會是感想。