iT邦幫忙

2024 iThome 鐵人賽

DAY 27
0
Software Development

用 NestJS 闖蕩微服務!系列 第 27

[用NestJS闖蕩微服務!] DAY27 - Saga (四)

  • 分享至 

  • xImage
  •  

用 NestJS 實現 Orchestration Saga(下)

上一篇已經將 CreateOrderSaga 的部分實作出來了,不過還需要將訂單服務、商品服務及物流服務的部分實作出來,並訂閱 Orchestrator 發送的 Command。廢話不多說,趕快開始吧!

實作 Order Service

透過 Nx 建立一個名為 order-service 的 Application。由於 order-service 會需要提供 RESTful API 讓 Client 可以建立訂單、查詢訂單,同時會需要建立微服務應用程式來接收其他服務的事件,這裡會需要調整 main.ts 的內容,透過 INestApplication 實例的 connectMicroservice 方法來產生微服務應用程式實例,如此一來,便能夠保留一般 HTTP Server 的功能又擁有微服務應用程式的功能:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';

import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const url = 'mqtt://localhost:1883';
  const microservice = app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.MQTT,
    options: {
      url,
    },
  });
  const globalPrefix = 'api';
  app.setGlobalPrefix(globalPrefix);
  const port = process.env.PORT || 3000;
  await app.listen(port);
  await microservice.listen();
  Logger.log(
    `🚀 Application is running on: http://localhost:${port}/${globalPrefix}`
  );
}

bootstrap();

由於 order-service 會兼任 Create Order Saga Orchestrator 的角色,故其需要使用 ClientsModule 來進行事件的發送以及匯入 CreateOrderSagaModule。下方為範例程式碼,修改 AppModule 的內容,使用 ClientsModule 來產生 Token 為 MS_CLIENT 的 MQTT ClientProxy,並透過 isGlobalClientProxy 提升到全域:

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { CreateOrderSagaModule } from '@nestjs-microservices/DAY26/create-order-saga';
import { AppController } from './app.controller';
// ...

@Module({
  imports: [
    ClientsModule.register({
      isGlobal: true,
      clients: [
        {
          name: 'MS_CLIENT',
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1883',
          },
        },
      ],
    }),
    CreateOrderSagaModule,
  ],
  controllers: [AppController],
  // ...
})
export class AppModule {}

為了模擬建立、更新訂單的情況,建立 order.store.ts,實作一個 OrderStore 來提供 creategetByIdupdateLogisticsById 以及 updateStatusById 方法:

import { Injectable } from '@nestjs/common';
import { Observable, of } from 'rxjs';
import {
  Order,
  OrderLogisticsDetail,
} from '@nestjs-microservices/DAY26/domain';
import { CreateOrder } from './order';

@Injectable()
export class OrderStore {
  private readonly orders: Array<Order> = [];

  getById(id: string) {
    const order = this.orders.find((order) => order.id === id) ?? null;
    return of(order);
  }

  create(payload: CreateOrder) {
    const order: Order = {
      id: crypto.randomUUID(),
      status: 'preparing',
      ...payload,
    };
    this.orders.push(order);
    return of(order);
  }

  updateStatusById(
    id: string,
    status: Order['status']
  ): Observable<Order | null> {
    const order = this.orders.find((order) => order.id === id);
    if (order) {
      order.status = status;
      return of(order);
    }
    return of(null);
  }

  updateLogisticsById(
    id: string,
    logistics: OrderLogisticsDetail
  ): Observable<Order | null> {
    const order = this.orders.find((order) => order.id === id);
    if (order) {
      order.logistics = logistics;
      return of(order);
    }
    return of(null);
  }
}

其中,CreateOrder 這個 type 宣告在 order.ts 檔案中,供後續其他檔案使用:

import {
  OrderDetail,
  OrderLogisticsDetail,
} from '@nestjs-microservices/DAY26/domain';

export type CreateOrder = {
  details: Array<OrderDetail>;
  logistics: OrderLogisticsDetail;
};

注意:記得將 OrderStore 放入 AppModuleproviders 中。

接著,修改 AppController 的內容,針對「建立訂單」與「讀取訂單」的 API 進行實作,需特別注意,在「建立訂單」完成時,需執行 CreateOrderSagastart 方法:

import { Body, Controller, Get, Param, Post } from '@nestjs/common';
import { CreateOrderSaga } from '@nestjs-microservices/DAY26/create-order-saga';
import { OrderStore } from './order.store';
import { CreateOrder } from './order';

@Controller()
export class AppController {
  constructor(
    private readonly orderStore: OrderStore,
    private readonly createOrderSaga: CreateOrderSaga
  ) {}

  @Post('orders')
  createOrder(@Body() payload: CreateOrder) {
    return this.orderStore.create(payload).pipe(
      tap({
        next: (order) => {
          this.createOrderSaga.start(order);
        },
      })
    );
  }

  @Get('orders/:id')
  getOrderById(@Param('id') id: string) {
    return this.orderStore.getById(id);
  }
}

最後,針對 Saga 流程中需處理的 Step 進行相關實作,比如處理「將訂單狀態改為 reject」的 Command、處理「將訂單狀態改為 prepared」的 Command、「回退將訂單狀態改為 prepared」的 Command 以及處理「更新訂單物流資訊」的 Command:

// ...
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import { CreateOrderSaga } from '@nestjs-microservices/DAY26/create-order-saga';
import { concatMap, of, tap, throwError } from 'rxjs';
import {
  OrderPreparedMessage,
  OrderPrepareMessage,
  OrderPrepareRollbackDoneMessage,
  OrderPrepareRollbackMessage,
  OrderRejectedMessage,
  OrderRejectMessage,
  OrderUpdatedLogisticsMessage,
  OrderUpdateLogisticsMessage,
} from '@nestjs-microservices/DAY26/domain';

@Controller()
export class AppController {
  // ...
  @MessagePattern('Order/Reject')
  receiveRejectOrderCommand(@Payload() message: OrderRejectMessage) {
    return this.orderStore.updateStatusById(message.data.id, 'reject').pipe(
      concatMap((order) => {
        if (!order) {
          return throwError(
            () => new RpcException(`The order ${message.data} is not found.`)
          );
        }
        return of<OrderRejectedMessage>({ data: order });
      })
    );
  }

  @MessagePattern('Order/Prepare')
  receivePrepareOrderCommand(@Payload() message: OrderPrepareMessage) {
    return this.orderStore.updateStatusById(message.data, 'prepared').pipe(
      concatMap((order) => {
        if (!order) {
          return throwError(
            () => new RpcException(`The order ${message.data} is not found.`)
          );
        }
        return of<OrderPreparedMessage>({ data: order });
      })
    );
  }

  @MessagePattern('Order/Prepare/Rollback')
  receivePrepareOrderRollbackCommand(
    @Payload() message: OrderPrepareRollbackMessage
  ) {
    return this.orderStore.updateStatusById(message.data, 'preparing').pipe(
      concatMap((order) => {
        if (!order) {
          return throwError(
            () => new RpcException(`The order ${message.data} is not found.`)
          );
        }
        return of<OrderPrepareRollbackDoneMessage>({ data: order });
      })
    );
  }

  @MessagePattern('Order/Logistics/Update')
  receiveUpdateOrderLogisticsCommand(
    @Payload() message: OrderUpdateLogisticsMessage
  ) {
    return this.orderStore
      .updateLogisticsById(message.data.orderId, message.data.logisticsOrder)
      .pipe(
        concatMap((order) => {
          if (!order) {
            return throwError(
              () => new RpcException(`The order ${message.data} is not found.`)
            );
          }
          return of<OrderUpdatedLogisticsMessage>({ data: order });
        })
      );
  }
}

實作 Product Service

透過 Nx 建立一個名為 product-service 的 Application。修改 main.ts 的內容,透過 NestFactorycreateMicroservices 建立微服務應用程式:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';

import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.MQTT,
      options: {
        url: 'mqtt://localhost:1883',
      },
    }
  );
  await app.listen();
  Logger.log(`🚀 Product Service is running.`);
}

bootstrap();

接著,修改 AppController 的內容,針對 Saga 流程中需處理的 Step 進行相關實作,也就是處理「可銷售庫存減少」的 Command 與處理「回退可銷售庫存減少」的 Command:

import { Controller } from '@nestjs/common';

import { Inventory } from '@nestjs-microservices/DAY25/domain';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import {
  ProductDecreasedMessage,
  ProductDecreaseMessage,
  ProductDecreaseRollbackDoneMessage,
  ProductDecreaseRollbackMessage,
} from '@nestjs-microservices/DAY26/domain';

@Controller()
export class AppController {
  private readonly inventories: Array<Inventory> = [
    {
      productId: 'a',
      count: 100,
    },
    {
      productId: 'b',
      count: 100,
    },
  ];

  @MessagePattern('Product/Decrease')
  receiveDecreaseProductCommand(
    @Payload() message: ProductDecreaseMessage
  ): ProductDecreasedMessage {
    message.data.forEach((detail) => {
      const inventory = this.inventories.find(
        (inventory) => inventory.productId === detail.productId
      );
      if (!inventory) {
        throw new RpcException(`The product ${detail.productId} is not found.`);
      }

      if (inventory.count - detail.count < 0) {
        throw new RpcException(
          `The product ${detail.productId} is over the max count.`
        );
      }

      inventory.count -= detail.count;
    });
    const inventories = this.inventories.filter((inventory) =>
      message.data.find((detail) => detail.productId === inventory.productId)
    );

    return { data: inventories };
  }

  @MessagePattern('Product/Decrease/Rollback')
  receiveDecreaseProductRollbackCommand(
    @Payload() message: ProductDecreaseRollbackMessage
  ): ProductDecreaseRollbackDoneMessage {
    message.data.forEach((detail) => {
      const inventory = this.inventories.find(
        (inventory) => inventory.productId === detail.productId
      );
      if (!inventory) {
        throw new RpcException(`The product ${detail.productId} is not found.`);
      }

      inventory.count += detail.count;
    });
    const inventories = this.inventories.filter((inventory) =>
      message.data.find((detail) => detail.productId === inventory.productId)
    );

    return { data: inventories };
  }
}

實作 Logistics Service

透過 Nx 建立一個名為 logistics-service 的 Application。修改 main.ts 的內容,透過 NestFactorycreateMicroservices 建立微服務應用程式:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';

import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.MQTT,
      options: {
        url: 'mqtt://localhost:1883',
      },
    }
  );
  await app.listen();
  Logger.log(`🚀 Logistics Service is running.`);
}

bootstrap();

接著,修改 AppController 的內容,針對 Saga 流程中需處理的 Step 進行相關實作,也就是處理「建立物流單」的 Command:

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { map, of } from 'rxjs';
import {
  LogisticsOrder,
  LogisticsOrderCreatedMessage,
  LogisticsOrderCreateMessage,
} from '@nestjs-microservices/DAY26/domain';

@Controller()
export class AppController {
  @MessagePattern('Logistics/Create')
  receiveCreateLogisticsCommand(
    @Payload() message: LogisticsOrderCreateMessage
  ) {
    const logisticsOrder$ = of<LogisticsOrder>({
      id: crypto.randomUUID(),
      address: message.data,
    });
    return logisticsOrder$.pipe(
      map((data) => <LogisticsOrderCreatedMessage>{ data })
    );
  }
}

整合測試

注意:在啟動服務之前,先參考 MQTT Transporter 的內容將 Mosquitto 架設起來。

透過 Nx Console 來啟動所有的服務,在「Common Nx Commands」區塊選擇 run-many 並點選 serve 來同時啟動多個服務:

Nx Run Many Serve

啟動後,透過 Postman 使用 POST 方法存取 http://localhost:3000/api/orders,並使用下方內容當作 Payload,會收到初始化訂單的內容:

{
  "details": [
    {
      "productId": "a",
      "count": 30
    }
  ],
  "logistics": {
    "id": null,
    "address": "xxx"
  }
}

Create Order Result

這時候可以觀察一下終端機的 Log,會發現 Saga 的流程與預期相符:

Check Create Order Saga Log

將建立好的訂單 ID 透過 Postman 使用 GET 方法存取 http://localhost:3000/api/orders/<ORDER_ID>,也確實會收到狀態為 prepared、帶有物流單資訊的訂單:

Get Order Result

來試試看執行補償機制的情況,我們針對 logistics-service 進行調整,修改 AppController 的內容,在 receiveCreateLogisticsCommand 內直接拋出 RpcException

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import { map, of } from 'rxjs';
import { LogisticsOrderCreateMessage } from '@nestjs-microservices/DAY26/domain';

@Controller()
export class AppController {
  @MessagePattern('Logistics/Create')
  receiveCreateLogisticsCommand(
    @Payload() message: LogisticsOrderCreateMessage
  ) {
    throw new RpcException('Oops! something went wrong!');
  }
}

透過 Postman 建立一筆訂單後,再使用 GET 方法存取 http://localhost:3000/api/orders/<ORDER_ID>,會看到訂單的狀態為 reject 且沒有物流單資訊:

Reject Order Result

觀察一下終端機的 Log,會發現 Saga 的流程與預期相符:

Create Order Saga Rollback Log

小結

回顧一下今天的重點內容,運用上一篇完成的 Orchestrator 機制來將訂單服務、商品服務與物流服務提供的 Command 串成一個完整的 Saga 流程。從這次實作中可以發現,各個服務只需專注於自己的 Domain,而不會參雜過多其他 Domain 的東西,也較容易追蹤整個 Saga 流程,由此可證,Orchestrator 比 Choreography 更適合運用在較複雜的 Saga。


上一篇
[用NestJS闖蕩微服務!] DAY26 - Saga (三)
下一篇
[用NestJS闖蕩微服務!] DAY28 - CQRS (上)
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言