iT邦幫忙

2024 iThome 鐵人賽

DAY 25
0
Software Development

用 NestJS 闖蕩微服務!系列 第 25

[用NestJS闖蕩微服務!] DAY25 - Saga (二)

  • 分享至 

  • xImage
  •  

Saga 的運作機制

上一篇有提到 Saga 是由一連串非同步事件來完成資料一致性的機制,那麼要如何協調 Saga 執行的步驟就變得相當重要,如果遇到 Compensatable Transaction 失敗的情況還需要安排執行對應的 Compensation Transaction。因此,Saga 發展了兩種運作機制:編舞模式(Choreography)編排模式(Orchestration)

Choreography Saga

Choreography 是由各個參與的服務負責發送相關非同步事件與接收來自其他服務發送的非同步事件,並 沒有 一個中央協調者來安排參與方該做什麼事情。

以下方建立訂單的 Saga 為例:

Create Order Saga Concept

在 Choregraphy 下,整體運如下圖所示:

Choreography Saga Example

  1. 訂單服務執行「建立訂單」,並在建立完畢後發送 Order.Created 事件。
  2. 商品服務收到 Order.Created 事件,根據事件內容處理「減少可銷售庫存」,並在執行完畢後發送 Product.Decreased 事件。
  3. 訂單服務收到 Product.Decreased 事件,根據收到的事件內容處理「更新訂單狀態」,並在執行完畢後發送 Order.Ready 事件。
  4. 物流服務收到 Order.Ready 事件,根據收到的事件內容處理「建立物流單」,並在執行完畢後發送 Logistics.Created 事件。
  5. 訂單服務收到 Logistics.Created 事件,根據收到的內容處理「更新訂單資訊」。

這樣的運作方式有以下幾個優點:

  • 簡單:每個服務在對自身資料做異動的時候,可以直接發佈相關事件。
  • 鬆耦合:由於採用 Message Broker 的方式傳遞事件,服務之間不會直接相依產生耦合。

但也有以下幾個缺點:

  • 邏輯過度分散:由於整個 Saga 的流程分散在多個服務中,每個參與的服務都涉略了一部分的邏輯,這會導致開發者難以迅速掌握 Saga 的運作流程。
  • 容易產生循環依賴:雖然說採用 Message Broker 可以消除服務之間的耦合關係,但如果服務之間訂閱彼此所產生的事件,那就會產生循環依賴,這在設計上是一種 Bad Practice。

Orchestration Saga

與 Choreography 不同,會有一個中央協調者負責安排 Saga 整個流程,這個協調者又稱 編排器(Orchestrator)。透過非同步事件傳遞 命令(Command) 給該 Saga 參與方,參與方收到 Command 後,需執行對應的操作,當參與方完成任務時,需 回覆(Reply) Orchestrator 訊息,Orchestrator 會再繼續執行該 Saga 流程的下個步驟。

以建立訂單的 Saga 為例,在 Orchestration 下,整體運如下圖所示:

Orchestration Saga Example

  1. 當訂單服務建立訂單後,透過「Create Order Saga Orchestrator」發送 Product.Decrease 命令。
  2. 當商品服務收到 Product.Decrease 命令,執行「減少可銷售庫存」,執行完畢後,回覆訊息給 Orchestrator。
  3. 當 Orchestrator 收到回覆後,執行下一個任務,發送 Order.Update 命令,這裡可以注意,該命令的接收方是訂單服務本身,這樣的設計是為了保持運作的一致性。
  4. 當訂單服務收到 Order.Update 命令,執行「更新訂單狀態」,執行完畢後,回覆訊息給 Orchestrator。
  5. 當 Orchestrator 收到回覆後,執行下一個任務,發送 Logistics.Create 命令。
  6. 當物流服務收到 Logistics.Create 命令,執行「建立物流單」,執行完畢後,回覆訊息給 Orchestrator。
  7. 當 Orchestrator 收到回覆後,執行下一個任務,發送 Order.UpdateInfo 命令。
  8. 當訂單服務收到 Order.UpdateInfo 命令,執行「更新訂單資訊」,執行完畢後,整個 Saga 流程即結束。

這樣的運作方式有以下幾個優點:

  • 簡化依賴關係:使用 Orchestration 不會產生循環依賴,原因是 Orchestrator 會依賴參與方,但是參與方實際上並不依賴 Orchestrator。
  • 鬆耦合:所有參與方只需要提供相關介面給 Orchestrator 即可,不需要知道其他參與方所發佈的事件。

當然也有以下缺點:

  • Orchestrator 承擔過多商業邏輯:如果在設計 Orchestrator 時,添加太多商業邏輯,那會導致 Orchestrator 承擔過多責任,理論上,Orchestrator 應該只負責處理 Saga 流程的執行,不應該帶有其他商業邏輯。

用 NestJS 實現 Choreography Saga

試著使用 NestJS 來實現極簡版的 Choreography Saga,目標是將上方建立訂單的流程實現出來,並針對失敗的情況執行 Compensatable Transactions。

注意:接下來的實作內容會使用 MQTT 作為微服務應用程式溝通的媒介,相關教學可以參考前面介紹的 MQTT Transporter

注意:接下來的內容為了簡化實作邏輯將 不會 使用真正的資料庫,主要是將 Choreography Saga 的實現邏輯呈現出來,若有需要可以在實作時自行更換成資料庫的實現邏輯。

實作 Shared Domain Library

為了方便我們取用不同 Domain 的 Type,這邊透過 Nx 建立了名為 domain 的 Library 來保存訂單、物流訂單、商品庫存的 type

注意:針對 Domain Type 要如何透過 Library 進行管理有許多方式,我會建議依照 Domain 性質各自拆分不同的 Library,比如:order-domainlogistics-domainproduct-domain。不過因為這篇文章的重點是在 Choreography Saga,所以就統一放在 domain Library 中,避免失焦。

下方是範例程式碼,在 domain Library 新增 logistics.ts 並宣告訂單相關的 type

export type LogisticsOrderId = string;

export type LogisticsOrder = {
  id: LogisticsOrderId;
  address: string;
};

新增 product.ts 並宣告商品庫存的 type

export type ProductId = string;
export type Inventory = {
  productId: ProductId;
  count: number;
};

新增 order.ts 並宣告訂單相關的 type

import { LogisticsOrderId } from './logistics';
import { ProductId } from './product';

export type Order = {
  id: string;
  status: 'preparing' | 'prepared' | 'reject';
  details: Array<OrderDetail>;
  logistics: OrderLogisticsDetail;
};

export type OrderDetail = {
  productId: ProductId;
  count: number;
};

export type OrderLogisticsDetail = {
  id: LogisticsOrderId | null;
  address: string;
};

新增 message.ts 並宣告發送的非同步事件的 type,這裡可以發現,所有事件都會帶有 messageId,原因是基於 Publish/Subscribe 機制的 Choreography Saga 需要有辦法知道事件之間的關聯,而 messageId 就扮演著 關聯 ID(Relation ID) 的角色:

import { LogisticsOrder } from './logistics';
import { Order } from './order';
import { Inventory } from './product';

export type Message<T> = { data: T } & {
  messageId: string;
};

export type OrderCreatedMessage = Message<Order>;

export type ProductDecreasedMessage = Message<Array<Inventory>>;
export type ProductDecreaseFailedMessage = Message<Array<Inventory>>;

export type OrderReadyMessage = Message<Order>;
export type OrderReadyFailedMessage = Message<Order>;

export type LogisticsCreatedMessage = Message<LogisticsOrder>;
export type LogisticsCreateFailedMessage = Message<null>;

最後,在 index.ts 匯出這些檔案的內容:

export * from './lib/logistics';
export * from './lib/order';
export * from './lib/product';
export * from './lib/message';

實作 Order Service

透過 Nx 建立一個名為 order-service 的 Application。由於 order-service 會需要提供 RESTful API 讓 Client 可以建立訂單、查詢訂單,同時會需要建立微服務應用程式來接收其他服務的事件,這裡會需要調整 main.ts 的內容,透過 INestApplication 實例的 connectMicroservice 方法來產生微服務應用程式實例,如此一來,便能夠保留一般 HTTP Server 的功能又擁有微服務應用程式的功能:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

import { AppModule } from './app/app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const microservice = app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://localhost:1883',
    },
  });
  const globalPrefix = 'api';
  app.setGlobalPrefix(globalPrefix);
  const port = process.env.PORT || 3000;
  await app.listen(port);
  await microservice.listen();
  Logger.log(
    `🚀 Application is running on: http://localhost:${port}/${globalPrefix}`
  );
}

bootstrap();

由於 order-service 會在訂單建立完成時發送 Order/Created 事件,以及在收到 Product/Decreased 事件並更新狀態後發送 Order/Ready 事件,故其需要使用 ClientsModule 來進行事件的發送。下方為範例程式碼,修改 AppModule 的內容,使用 ClientsModule 來產生 Token 為 MS_CLIENT 的 MQTT ClientProxy

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppController } from './app.controller';
// ...

@Module({
  imports: [
    ClientsModule.register({
      clients: [
        {
          name: 'MS_CLIENT',
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1883',
          },
        },
      ],
    }),
  ],
  controllers: [AppController],
  // ...
})
export class AppModule {}

接著,修改 AppController 的內容,針對「建立訂單」與「讀取訂單」的 API 進行實作,需特別注意,建立訂單後需要透過發送 Order/Created 事件來執行 Saga 流程,所以在建立完訂單要透過 ClientProxy 發送該事件,並指定訂單的 ID 為 messageId

import { Body, Controller, Get, Inject, Param, Post } from '@nestjs/common';

import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';

import {
  Order,
  OrderDetail,
  OrderLogisticsDetail,
  OrderCreatedMessage,
} from '@nestjs-microservices/DAY25/domain';

export type CreateOrder = {
  details: Array<OrderDetail>;
  logistics: OrderLogisticsDetail;
};

@Controller()
export class AppController {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {}

  private readonly orders: Array<Order> = [];

  @Get('/orders/:id')
  getOrderById(@Param('id') id: string) {
    return this.orders.find((order) => order.id === id);
  }

  @Post('/orders')
  createOrder(@Body() payload: CreateOrder) {
    const id = crypto.randomUUID();
    const order: Order = {
      id,
      status: 'preparing',
      logistics: payload.logistics,
      details: payload.details,
    };
    this.orders.push(order);
    const record = new MqttRecordBuilder<OrderCreatedMessage>({
      messageId: order.id,
      data: order,
    })
      .setQoS(2)
      .build();
    this.client.emit('Order/Created', record);
    return order;
  }
}

針對「減少可銷售庫存」的部分進行處理,透過 @MessagePattern 裝飾器訂閱 Product/Decreased,當收到事件時,將訂單的狀態切換為 prepared 並透過 ClientProxy 發送 Order/Ready 事件。另外,針對「減少可銷售庫存失敗」的情況設計補償機制,透過 @MessagePattern 裝飾器訂閱 Product/Decreased/Failed,當收到事件時,將訂單狀態切換為 reject

import {
  // ...
  MqttRecordBuilder,
  Payload
} from '@nestjs/microservices';
import {
  // ...
  ProductDecreasedMessage,
  OrderReadyMessage,
  ProductDecreaseFailedMessage,
} from '@nestjs-microservices/DAY25/domain';
// ...
@Controller()
export class AppController {
  // ...
  @EventPattern('Product/Decreased')
  onProductDecreased(@Payload() message: ProductDecreasedMessage) {
    const order = this.orders.find((order) => order.id === message.messageId);
    if (order) {
      const record = new MqttRecordBuilder<OrderReadyMessage>({
        messageId: order.id,
        data: order,
      })
        .setQoS(2)
        .build();
      order.status = 'prepared';
      this.client.emit<OrderReadyMessage>('Order/Ready', record);
    }
  }

  @EventPattern('Product/Decrease/Failed')
  onProductDecreaseFailed(@Payload() message: ProductDecreaseFailedMessage) {
    const order = this.orders.find((order) => order.id === message.messageId);
    if (order) {
      order.status = 'reject';
    }
  }
}

最後,針對「建立物流單」的部分進行處理,透過 @MessagePattern 裝飾器訂閱 Logistics/Created,當收到事件時,將物流單資訊更新進訂單中。另外,針對「建立物流單失敗」的情況設計補償機制,透過 @MessagePattern 裝飾器訂閱 Logistics/Create/Failed,當收到事件時,將訂單狀態回退到 preparing,並透過 ClientProxy 發送 Order/Ready/Failed 來觸發「減少可銷售庫存」的補償機制:

import {
  // ...
  LogisticsCreatedMessage,
  LogisticsCreateFailedMessage,
  OrderReadyFailedMessage,
} from '@nestjs-microservices/DAY25/domain';
// ...
@Controller()
export class AppController {
  // ...
  @EventPattern('Logistics/Created')
  onLogisticsCreated(@Payload() message: LogisticsCreatedMessage) {
    const order = this.orders.find((order) => order.id === message.messageId);
    if (order) {
      order.logistics = message.data;
    }
  }

  @EventPattern('Logistics/Create/Failed')
  onLogisticsCreateFailed(@Payload() message: LogisticsCreateFailedMessage) {
    const order = this.orders.find((order) => order.id === message.messageId);
    if (order) {
      order.status = 'preparing';
      const record = new MqttRecordBuilder<OrderReadyFailedMessage>({
        messageId: order.id,
        data: order,
      })
        .setQoS(2)
        .build();
      this.client.emit('Order/Ready/Failed', record);
    }
  }
}

實作 Product Service

透過 Nx 建立一個名為 product-service 的 Application。修改 main.ts 的內容,透過 NestFactorycreateMicroservices 建立微服務應用程式:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';

import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.MQTT,
      options: {
        url: 'mqtt://localhost:1883',
      },
    }
  );
  await app.listen();
  Logger.log(`🚀 Product Service is running.`);
}

bootstrap();

由於 product-service 會在可銷售庫存減少時發送 Product/Decreased 事件,故需要使用 ClientsModule 來進行事件的發送。下方為範例程式碼,修改 AppModule 的內容,使用 ClientsModule 來產生 Token 為 MS_CLIENT 的 MQTT ClientProxy

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppController } from './app.controller';
// ...

@Module({
  imports: [
    ClientsModule.register({
      clients: [
        {
          name: 'MS_CLIENT',
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1883',
          },
        },
      ],
    }),
  ],
  controllers: [AppController],
  // ...
})
export class AppModule {}

接著,修改 AppController 的內容,針對「減少可銷售庫存」的部分進行處理,透過 @MessagePattern 裝飾器訂閱 Order/Created,當收到事件時,針對訂單中的 details 欄位內的 productId 來減少庫存,計算完畢後,透過 ClientProxy 發送 Product/Decreased 事件,並以訂單的 ID 作為 messageId。另外,針對「更新訂單狀態」設計補償機制,透過 @MessagePattern 裝飾器訂閱 Order/Ready/Failed,當收到事件時,針對訂單內的 details 欄位內的 productId 來補回庫存,計算完畢後,透過 ClientProxy 發送 Product/Decrease/Failed 事件觸發「建立訂單」的補償機制:

import { Controller, Inject } from '@nestjs/common';

import {
  ClientProxy,
  MessagePattern,
  MqttRecordBuilder,
  Payload,
} from '@nestjs/microservices';
import {
  Inventory,
  OrderCreatedMessage,
  OrderReadyFailedMessage,
  ProductDecreasedMessage,
  ProductDecreaseFailedMessage,
} from '@nestjs-microservices/DAY25/domain';

@Controller()
export class AppController {
  private readonly inventories: Array<Inventory> = [
    {
      productId: 'a',
      count: 100,
    },
    {
      productId: 'b',
      count: 100,
    },
  ];

  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {}

  @MessagePattern('Order/Created')
  onOrderCreated(@Payload() message: OrderCreatedMessage) {
    message.data.details.forEach((detail) => {
      const inventory = this.inventories.find(
        (inventory) => inventory.productId === detail.productId
      );
      if (!inventory) {
        return;
      }

      if (inventory.count - detail.count < 0) {
        return;
      }

      inventory.count -= detail.count;
    });
    const inventories = this.inventories.filter((inventory) =>
      message.data.details.find(
        (detail) => detail.productId === inventory.productId
      )
    );
    const record = new MqttRecordBuilder<ProductDecreasedMessage>({
      messageId: message.messageId,
      data: inventories,
    })
      .setQoS(2)
      .build();
    this.client.emit('Product/Decreased', record);
  }

  @MessagePattern('Order/Ready/Failed')
  onOrderReadyFailed(@Payload() message: OrderReadyFailedMessage) {
    message.data.details.forEach((detail) => {
      const inventory = this.inventories.find(
        (inventory) => inventory.productId === detail.productId
      );

      if (!inventory) {
        return;
      }

      inventory.count += detail.count;
    });
    const inventories = this.inventories.filter((inventory) =>
      message.data.details.find(
        (detail) => detail.productId === inventory.productId
      )
    );
    const record = new MqttRecordBuilder<ProductDecreaseFailedMessage>({
      messageId: message.messageId,
      data: inventories,
    })
      .setQoS(2)
      .build();

    this.client.emit('Product/Decrease/Failed', record);
  }
}

實作 Logistics Service

透過 Nx 建立一個名為 logistics-service 的 Application。修改 main.ts 的內容,透過 NestFactorycreateMicroservices 建立微服務應用程式:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';

import { AppModule } from './app/app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.MQTT,
      options: {
        url: 'mqtt://localhost:1883',
      },
    }
  );
  await app.listen();
  Logger.log(`🚀 Logistics Service is running.`);
}

bootstrap();

由於 logistics-service 會在物流訂單建立完成時發送 Logistics/Created 事件,故需要使用 ClientsModule 來進行事件的發送。下方為範例程式碼,修改 AppModule 的內容,使用 ClientsModule 來產生 Token 為 MS_CLIENT 的 MQTT ClientProxy

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppController } from './app.controller';
// ...

@Module({
  imports: [
    ClientsModule.register({
      clients: [
        {
          name: 'MS_CLIENT',
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1883',
          },
        },
      ],
    }),
  ],
  controllers: [AppController],
  // ...
})
export class AppModule {}

接著,修改 AppController 的內容,針對「建立物流單」的部分進行處理,透過 @MessagePattern 裝飾器訂閱 Order/Ready,當收到事件時,建立物流單並透過 ClientProxy 發送 Logistics/Created 事件,並以訂單的 ID 作為 messageId

import { Controller, Inject } from '@nestjs/common';

import {
  ClientProxy,
  MessagePattern,
  MqttRecordBuilder,
  Payload,
} from '@nestjs/microservices';
import {
  LogisticsCreatedMessage,
  LogisticsOrder,
  OrderReadyMessage,
} from '@nestjs-microservices/DAY25/domain';

@Controller()
export class AppController {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {}

  @MessagePattern('Order/Ready')
  onOrderReady(@Payload() message: OrderReadyMessage) {
    const logisticsOrder: LogisticsOrder = {
      id: crypto.randomUUID(),
      address: message.data.logistics.address,
    };
    const record = new MqttRecordBuilder<LogisticsCreatedMessage>({
      messageId: message.messageId,
      data: logisticsOrder,
    })
      .setQoS(2)
      .build();
    this.client.emit('Logistics/Created', record);
  }
}

整合測試

注意:在啟動服務之前,先參考 MQTT Transporter 的內容將 Mosquitto 架設起來。

透過 Nx Console 來啟動所有的服務,在「Common Nx Commands」區塊選擇 run-many 並點選 serve 來同時啟動多個服務:

Nx Run Many Serve

啟動後,透過 Postman 使用 POST 方法存取 http://localhost:3000/api/orders,並使用下方內容當作 Payload,會收到初始化訂單的內容:

{
  "details": [
    {
      "productId": "a",
      "count": 30
    }
  ],
  "logistics": {
    "id": null,
    "address": "xxx"
  }
}

Create Order Result

接著,將建立好的訂單 ID 透過 Postman 使用 GET 方法存取 http://localhost:3000/api/orders/<ORDER_ID>,會收到狀態為 prepared、帶有物流單資訊的訂單:

Get Order Result

來試試看執行補償機制的情況,我們針對 logistics-service 進行調整,修改 AppController 的內容,在 onOrderReady 透過 ClientProxy 發送 Logistics/Create/Failed 事件:

import { Controller, Inject } from '@nestjs/common';

import {
  ClientProxy,
  MessagePattern,
  MqttRecordBuilder,
  Payload,
} from '@nestjs/microservices';
import {
  LogisticsCreateFailedMessage,
  OrderReadyMessage,
} from '@nestjs-microservices/DAY25/domain';

@Controller()
export class AppController {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {}

  @MessagePattern('Order/Ready')
  onOrderReady(@Payload() message: OrderReadyMessage) {
    const record = new MqttRecordBuilder<LogisticsCreateFailedMessage>({
      messageId: message.messageId,
      data: null,
    })
      .setQoS(2)
      .build();
    this.client.emit('Logistics/Create/Failed', record);
  }
}

透過 Postman 建立一筆訂單後,再使用 GET 方法存取 http://localhost:3000/api/orders/<ORDER_ID>,會看到訂單的狀態為 reject 且沒有物流單資訊:

Reject Order Result

小結

回顧一下今天的重點內容,一開始先針對 Saga 的兩種運作模式進行介紹,分別是沒有中央協調器的 Choreography 與採用中央協調器的 Orchestration。文章中還進一步描述兩者的優缺點,其中,Orchestration 在較複雜的情境下會是較佳的選擇。

文章下半部則開始用 NestJS 實作 Choreography Saga 的概念,展示如何運用非同步的事件將各個服務的操作串連起來,同時展現了補償機制的實現。

下一篇將會使用 NestJS 實作 Orchestration Saga,敬請期待!


上一篇
[用NestJS闖蕩微服務!] DAY24 - Saga (一)
下一篇
[用NestJS闖蕩微服務!] DAY26 - Saga (三)
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言