上一篇已經將 CreateOrderSaga
的部分實作出來了,不過還需要將訂單服務、商品服務及物流服務的部分實作出來,並訂閱 Orchestrator 發送的 Command。廢話不多說,趕快開始吧!
透過 Nx 建立一個名為 order-service
的 Application。由於 order-service
會需要提供 RESTful API 讓 Client 可以建立訂單、查詢訂單,同時會需要建立微服務應用程式來接收其他服務的事件,這裡會需要調整 main.ts
的內容,透過 INestApplication
實例的 connectMicroservice
方法來產生微服務應用程式實例,如此一來,便能夠保留一般 HTTP Server 的功能又擁有微服務應用程式的功能:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const url = 'mqtt://localhost:1883';
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.MQTT,
options: {
url,
},
});
const globalPrefix = 'api';
app.setGlobalPrefix(globalPrefix);
const port = process.env.PORT || 3000;
await app.listen(port);
await microservice.listen();
Logger.log(
`🚀 Application is running on: http://localhost:${port}/${globalPrefix}`
);
}
bootstrap();
由於 order-service
會兼任 Create Order Saga Orchestrator 的角色,故其需要使用 ClientsModule
來進行事件的發送以及匯入 CreateOrderSagaModule
。下方為範例程式碼,修改 AppModule
的內容,使用 ClientsModule
來產生 Token 為 MS_CLIENT
的 MQTT ClientProxy
,並透過 isGlobal
將 ClientProxy
提升到全域:
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { CreateOrderSagaModule } from '@nestjs-microservices/DAY26/create-order-saga';
import { AppController } from './app.controller';
// ...
@Module({
imports: [
ClientsModule.register({
isGlobal: true,
clients: [
{
name: 'MS_CLIENT',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
},
],
}),
CreateOrderSagaModule,
],
controllers: [AppController],
// ...
})
export class AppModule {}
為了模擬建立、更新訂單的情況,建立 order.store.ts
,實作一個 OrderStore
來提供 create
、getById
、updateLogisticsById
以及 updateStatusById
方法:
import { Injectable } from '@nestjs/common';
import { Observable, of } from 'rxjs';
import {
Order,
OrderLogisticsDetail,
} from '@nestjs-microservices/DAY26/domain';
import { CreateOrder } from './order';
@Injectable()
export class OrderStore {
private readonly orders: Array<Order> = [];
getById(id: string) {
const order = this.orders.find((order) => order.id === id) ?? null;
return of(order);
}
create(payload: CreateOrder) {
const order: Order = {
id: crypto.randomUUID(),
status: 'preparing',
...payload,
};
this.orders.push(order);
return of(order);
}
updateStatusById(
id: string,
status: Order['status']
): Observable<Order | null> {
const order = this.orders.find((order) => order.id === id);
if (order) {
order.status = status;
return of(order);
}
return of(null);
}
updateLogisticsById(
id: string,
logistics: OrderLogisticsDetail
): Observable<Order | null> {
const order = this.orders.find((order) => order.id === id);
if (order) {
order.logistics = logistics;
return of(order);
}
return of(null);
}
}
其中,CreateOrder
這個 type
宣告在 order.ts
檔案中,供後續其他檔案使用:
import {
OrderDetail,
OrderLogisticsDetail,
} from '@nestjs-microservices/DAY26/domain';
export type CreateOrder = {
details: Array<OrderDetail>;
logistics: OrderLogisticsDetail;
};
注意:記得將
OrderStore
放入AppModule
的providers
中。
接著,修改 AppController
的內容,針對「建立訂單」與「讀取訂單」的 API 進行實作,需特別注意,在「建立訂單」完成時,需執行 CreateOrderSaga
的 start
方法:
import { Body, Controller, Get, Param, Post } from '@nestjs/common';
import { CreateOrderSaga } from '@nestjs-microservices/DAY26/create-order-saga';
import { OrderStore } from './order.store';
import { CreateOrder } from './order';
@Controller()
export class AppController {
constructor(
private readonly orderStore: OrderStore,
private readonly createOrderSaga: CreateOrderSaga
) {}
@Post('orders')
createOrder(@Body() payload: CreateOrder) {
return this.orderStore.create(payload).pipe(
tap({
next: (order) => {
this.createOrderSaga.start(order);
},
})
);
}
@Get('orders/:id')
getOrderById(@Param('id') id: string) {
return this.orderStore.getById(id);
}
}
最後,針對 Saga 流程中需處理的 Step 進行相關實作,比如處理「將訂單狀態改為 reject
」的 Command、處理「將訂單狀態改為 prepared
」的 Command、「回退將訂單狀態改為 prepared
」的 Command 以及處理「更新訂單物流資訊」的 Command:
// ...
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import { CreateOrderSaga } from '@nestjs-microservices/DAY26/create-order-saga';
import { concatMap, of, tap, throwError } from 'rxjs';
import {
OrderPreparedMessage,
OrderPrepareMessage,
OrderPrepareRollbackDoneMessage,
OrderPrepareRollbackMessage,
OrderRejectedMessage,
OrderRejectMessage,
OrderUpdatedLogisticsMessage,
OrderUpdateLogisticsMessage,
} from '@nestjs-microservices/DAY26/domain';
@Controller()
export class AppController {
// ...
@MessagePattern('Order/Reject')
receiveRejectOrderCommand(@Payload() message: OrderRejectMessage) {
return this.orderStore.updateStatusById(message.data.id, 'reject').pipe(
concatMap((order) => {
if (!order) {
return throwError(
() => new RpcException(`The order ${message.data} is not found.`)
);
}
return of<OrderRejectedMessage>({ data: order });
})
);
}
@MessagePattern('Order/Prepare')
receivePrepareOrderCommand(@Payload() message: OrderPrepareMessage) {
return this.orderStore.updateStatusById(message.data, 'prepared').pipe(
concatMap((order) => {
if (!order) {
return throwError(
() => new RpcException(`The order ${message.data} is not found.`)
);
}
return of<OrderPreparedMessage>({ data: order });
})
);
}
@MessagePattern('Order/Prepare/Rollback')
receivePrepareOrderRollbackCommand(
@Payload() message: OrderPrepareRollbackMessage
) {
return this.orderStore.updateStatusById(message.data, 'preparing').pipe(
concatMap((order) => {
if (!order) {
return throwError(
() => new RpcException(`The order ${message.data} is not found.`)
);
}
return of<OrderPrepareRollbackDoneMessage>({ data: order });
})
);
}
@MessagePattern('Order/Logistics/Update')
receiveUpdateOrderLogisticsCommand(
@Payload() message: OrderUpdateLogisticsMessage
) {
return this.orderStore
.updateLogisticsById(message.data.orderId, message.data.logisticsOrder)
.pipe(
concatMap((order) => {
if (!order) {
return throwError(
() => new RpcException(`The order ${message.data} is not found.`)
);
}
return of<OrderUpdatedLogisticsMessage>({ data: order });
})
);
}
}
透過 Nx 建立一個名為 product-service
的 Application。修改 main.ts
的內容,透過 NestFactory
的 createMicroservices
建立微服務應用程式:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
}
);
await app.listen();
Logger.log(`🚀 Product Service is running.`);
}
bootstrap();
接著,修改 AppController
的內容,針對 Saga 流程中需處理的 Step 進行相關實作,也就是處理「可銷售庫存減少」的 Command 與處理「回退可銷售庫存減少」的 Command:
import { Controller } from '@nestjs/common';
import { Inventory } from '@nestjs-microservices/DAY25/domain';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import {
ProductDecreasedMessage,
ProductDecreaseMessage,
ProductDecreaseRollbackDoneMessage,
ProductDecreaseRollbackMessage,
} from '@nestjs-microservices/DAY26/domain';
@Controller()
export class AppController {
private readonly inventories: Array<Inventory> = [
{
productId: 'a',
count: 100,
},
{
productId: 'b',
count: 100,
},
];
@MessagePattern('Product/Decrease')
receiveDecreaseProductCommand(
@Payload() message: ProductDecreaseMessage
): ProductDecreasedMessage {
message.data.forEach((detail) => {
const inventory = this.inventories.find(
(inventory) => inventory.productId === detail.productId
);
if (!inventory) {
throw new RpcException(`The product ${detail.productId} is not found.`);
}
if (inventory.count - detail.count < 0) {
throw new RpcException(
`The product ${detail.productId} is over the max count.`
);
}
inventory.count -= detail.count;
});
const inventories = this.inventories.filter((inventory) =>
message.data.find((detail) => detail.productId === inventory.productId)
);
return { data: inventories };
}
@MessagePattern('Product/Decrease/Rollback')
receiveDecreaseProductRollbackCommand(
@Payload() message: ProductDecreaseRollbackMessage
): ProductDecreaseRollbackDoneMessage {
message.data.forEach((detail) => {
const inventory = this.inventories.find(
(inventory) => inventory.productId === detail.productId
);
if (!inventory) {
throw new RpcException(`The product ${detail.productId} is not found.`);
}
inventory.count += detail.count;
});
const inventories = this.inventories.filter((inventory) =>
message.data.find((detail) => detail.productId === inventory.productId)
);
return { data: inventories };
}
}
透過 Nx 建立一個名為 logistics-service
的 Application。修改 main.ts
的內容,透過 NestFactory
的 createMicroservices
建立微服務應用程式:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
}
);
await app.listen();
Logger.log(`🚀 Logistics Service is running.`);
}
bootstrap();
接著,修改 AppController
的內容,針對 Saga 流程中需處理的 Step 進行相關實作,也就是處理「建立物流單」的 Command:
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { map, of } from 'rxjs';
import {
LogisticsOrder,
LogisticsOrderCreatedMessage,
LogisticsOrderCreateMessage,
} from '@nestjs-microservices/DAY26/domain';
@Controller()
export class AppController {
@MessagePattern('Logistics/Create')
receiveCreateLogisticsCommand(
@Payload() message: LogisticsOrderCreateMessage
) {
const logisticsOrder$ = of<LogisticsOrder>({
id: crypto.randomUUID(),
address: message.data,
});
return logisticsOrder$.pipe(
map((data) => <LogisticsOrderCreatedMessage>{ data })
);
}
}
注意:在啟動服務之前,先參考 MQTT Transporter 的內容將 Mosquitto 架設起來。
透過 Nx Console 來啟動所有的服務,在「Common Nx Commands」區塊選擇 run-many
並點選 serve
來同時啟動多個服務:
啟動後,透過 Postman 使用 POST
方法存取 http://localhost:3000/api/orders,並使用下方內容當作 Payload,會收到初始化訂單的內容:
{
"details": [
{
"productId": "a",
"count": 30
}
],
"logistics": {
"id": null,
"address": "xxx"
}
}
這時候可以觀察一下終端機的 Log,會發現 Saga 的流程與預期相符:
將建立好的訂單 ID 透過 Postman 使用 GET
方法存取 http://localhost:3000/api/orders/<ORDER_ID>,也確實會收到狀態為 prepared
、帶有物流單資訊的訂單:
來試試看執行補償機制的情況,我們針對 logistics-service
進行調整,修改 AppController
的內容,在 receiveCreateLogisticsCommand
內直接拋出 RpcException
:
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import { map, of } from 'rxjs';
import { LogisticsOrderCreateMessage } from '@nestjs-microservices/DAY26/domain';
@Controller()
export class AppController {
@MessagePattern('Logistics/Create')
receiveCreateLogisticsCommand(
@Payload() message: LogisticsOrderCreateMessage
) {
throw new RpcException('Oops! something went wrong!');
}
}
透過 Postman 建立一筆訂單後,再使用 GET
方法存取 http://localhost:3000/api/orders/<ORDER_ID>,會看到訂單的狀態為 reject
且沒有物流單資訊:
觀察一下終端機的 Log,會發現 Saga 的流程與預期相符:
回顧一下今天的重點內容,運用上一篇完成的 Orchestrator 機制來將訂單服務、商品服務與物流服務提供的 Command 串成一個完整的 Saga 流程。從這次實作中可以發現,各個服務只需專注於自己的 Domain,而不會參雜過多其他 Domain 的東西,也較容易追蹤整個 Saga 流程,由此可證,Orchestrator 比 Choreography 更適合運用在較複雜的 Saga。