iT邦幫忙

2024 iThome 鐵人賽

DAY 26
0
Software Development

用 NestJS 闖蕩微服務!系列 第 26

[用NestJS闖蕩微服務!] DAY26 - Saga (三)

  • 分享至 

  • xImage
  •  

用 NestJS 實現 Orchestration Saga (上)

試著使用 NestJS 來實現極簡版的 Orchestration Saga,目標是將前一篇提到的「建立訂單」流程實現出來,並針對失敗的情況執行 Compensatable Transactions,不過細節較多,這部分將會拆成上下兩篇來實作完成。

注意:接下來的實作內容會使用 MQTT 作為微服務應用程式溝通的媒介,相關教學可以參考前面介紹的 MQTT Transporter

注意:接下來的內容為了簡化實作邏輯將 不會 使用真正的資料庫,主要是將 Orchestration Saga 的實現邏輯呈現出來,若有需要可以在實作時自行更換成資料庫的實現邏輯。

實作 Saga Orchestrator

Orchestrator 的目標是將一個 Saga 中要完成的所有任務串連起來,並針對失敗的情況自動執行補償機制。所以我們可以把每個任務拆成一個個的 步驟(Step),並將 Step 交給 Orchestrator 來照順序執行,針對這塊的機制可以拆成獨立的 Nx Library,讓需要建立 Saga 的地方來使用這套機制。

首先,透過 Nx 建立名為 saga-orchestrator 的 Library 並新增 step.ts,在該檔案內實作用於 Retriable Transaction 的 Step、用於 Compensatable Transaction 的 CompensatableStep 與用於 Pivot Transaction 的 PivotStep

import { Observable } from 'rxjs';

export abstract class Step<Input, Output> {
  abstract execute(input: Input): Observable<Output>;
  constructor(public readonly name: string) {}
}

export abstract class CompensatableStep<Input, Output> extends Step<
  Input,
  Output
> {
  abstract executeCompensation(input: Output): Observable<Input>;
}

export abstract class PivotStep<Input, Output> extends Step<Input, Output> {}

接著,要來實作 Orchestrator 的部分,在開始撰寫前,先針對其實作細節做更詳細的規劃。首先,我們會需要在建構 Orchestrator 時提供要執行的 Steps,這些 Step 可以是 StepCompensationStepPivotStep,並按照要執行的順序排列。

Orchestrator Flow Concept1

有了 Steps 後,需要有一個觸發流程的方法,當流程開始時,會從第一個 Step 開始執行其 execute 方法,並將該 Step 的結果當作是下一個 Step 的輸入值,會需要這麼做的原因是使用最一開始執行流程的輸入值可能會在流程中過時,如果在某個 Step 中針對狀態做異動,就可以將異動後的狀態向下一個步驟傳遞。

Orchestrator Flow Concept2

那麼如果中途有 Step 執行失敗了,要如何知道該執行哪些 Compensation Transactions 呢?有個比較簡單的做法是在執行完成一個 CompensationStep 時,將該 CompensationStep 記錄在一個暫存區,並且永遠將最新的 CompensationStep 放在第一位,但還需要特別注意,當 Pivot Transaction 完成時,就不會再去執行這些 Compensation Transactions,所以在 PivotStep 完成時,需要將暫存區清空,如此一來,就可以知道要執行的 Compensation Transaction 有哪些,以及它們的執行順序。

Orcgestrator Flow Concept3

有了概念後,建立 saga-orchestrator.ts,實作一個名為 Orchestratorclass,將上述流程實作出來:

import { concatMap, from, map, Subject, takeUntil, tap } from 'rxjs';
import { CompensatableStep, PivotStep, Step } from './step';
import { Logger } from '@nestjs/common';

export abstract class Orchestrator {
  constructor(
    private readonly steps: Array<
      Step<any, any> | CompensatableStep<any, any> | PivotStep<any, any>
    >
  ) {}

  // 啟動 Saga 流程
  start<Input>(input: Input) {
    // 成功的 `CompensatableStep` 暫存區
    let succeedSteps: Array<CompensatableStep<any, any>> = [];

    const whenErrorOccurred$ = new Subject<void>();
    const whenCompleted$ = new Subject<void>();

    const sagaId = crypto.randomUUID();
    let lastInput: any = input;

    Logger.log(`Saga ${sagaId} start...`);

    // 使用 `from` + `concatMap` 來確保照順序執行 Step
    from(this.steps)
      .pipe(
        concatMap((step) => {
          Logger.log(`${step.name} start...`);
          return step.execute(lastInput as Input).pipe(
            // 將 Step 的輸出值作為下一個 Step 的輸入值
            tap((output) => (lastInput = output)),
            map(() => step)
          );
        }),
        tap((step) => Logger.log(`${step.name} end...`))
      )
      .subscribe({
        next: (step) => {
          // 如果完成的 Step 是 `PivotStep`,就清空暫存區
          if (step instanceof PivotStep) {
            succeedSteps = [];
            return;
          }
          // 如果完成的 Step 是 `CompensatableStep`,就將該 Step 放在暫存區第一位
          if (step instanceof CompensatableStep) {
            succeedSteps.unshift(step);
          }
        },
        error: () => {
          whenErrorOccurred$.next();
          whenErrorOccurred$.complete();
        },
        complete: () => {
          whenCompleted$.next();
          whenCompleted$.complete();
        },
      });

    // 當有錯誤發生時,執行補償機制
    whenErrorOccurred$
      .pipe(
        concatMap(() => from(succeedSteps)),
        concatMap((step) => {
          Logger.log(`${step.name} compemsation start...`);
          return step
            .executeCompensation(lastInput)
            .pipe(tap(() => Logger.log(`${step.name} compemsation end...`)));
        }),
        takeUntil(whenCompleted$)
      )
      .subscribe({
        next: (output) => {
          // 將 Step 的輸出值作為下一個 Step 的輸入值
          lastInput = output;
        },
        complete: () => {
          Logger.log(`Saga ${sagaId} end...`);
        },
      });
  }
}

最後,將上述兩個檔案從 index.ts 匯出:

export * from './step';
export * from './saga-orchestrator';

實作 Shared Domain Library

為了方便我們取用不同 Domain 的 Type,這邊透過 Nx 建立了名為 domain 的 Library 來保存訂單、物流訂單、商品庫存的 type

注意:針對 Domain Type 要如何透過 Library 進行管理有許多方式,我會建議依照 Domain 性質各自拆分不同的 Library,比如:order-domainlogistics-domainproduct-domain。不過因為這篇文章的重點是在 Orchestrator Saga,所以就統一放在 domain Library 中,避免失焦。

下方是範例程式碼,在 domain Library 新增 logistics.ts 並宣告訂單相關的 type

export type LogisticsOrderId = string;

export type LogisticsOrder = {
  id: LogisticsOrderId;
  address: string;
};

新增 product.ts 並宣告商品庫存的 type

export type ProductId = string;
export type Inventory = {
  productId: ProductId;
  count: number;
};

新增 order.ts 並宣告訂單相關的 type

import { LogisticsOrderId } from './logistics';
import { ProductId } from './product';

export type Order = {
  id: string;
  status: 'preparing' | 'prepared' | 'reject';
  details: Array<OrderDetail>;
  logistics: OrderLogisticsDetail;
};

export type OrderDetail = {
  productId: ProductId;
  count: number;
};

export type OrderLogisticsDetail = {
  id: LogisticsOrderId | null;
  address: string;
};

新增 message.ts 並宣告所有 Step 會用到的 Command 與 Reply 的 type

import { LogisticsOrder } from './logistics';
import { Order, OrderDetail } from './order';
import { Inventory } from './product';

export type Message<T> = { data: T };

// 將訂單狀態切換為 `reject`
export type OrderRejectMessage = Message<Order>;
export type OrderRejectedMessage = Message<Order>;

// 減少可銷售庫存
export type ProductDecreaseMessage = Message<Array<OrderDetail>>;
export type ProductDecreasedMessage = Message<Array<Inventory>>;
// 減少可銷售庫存的補償
export type ProductDecreaseRollbackMessage = Message<Array<OrderDetail>>;
export type ProductDecreaseRollbackDoneMessage = Message<Array<Inventory>>;

// 將訂單狀態切換為 `prepared`
export type OrderPrepareMessage = Message<string>;
export type OrderPreparedMessage = Message<Order>;
// 將訂單狀態切換為 `prepared` 的補償
export type OrderPrepareRollbackMessage = Message<string>;
export type OrderPrepareRollbackDoneMessage = Message<Order>;

// 建立物流單
export type LogisticsOrderCreateMessage = Message<string>;
export type LogisticsOrderCreatedMessage = Message<LogisticsOrder>;

// 更新訂單的物流資訊
export type OrderUpdateLogisticsMessage = Message<{
  logisticsOrder: LogisticsOrder;
  orderId: string;
}>;
export type OrderUpdatedLogisticsMessage = Message<Order>;

最後,將上述內容從 index.ts 匯出:

export * from './lib/logistics';
export * from './lib/order';
export * from './lib/product';
export * from './lib/message';

實作 Create Order Saga Orchestrator

透過 Nx 建立名為 create-order-saga 的 Library,在這個 Library 內實作 Create Order Saga Orchestrator。先將所有 Step 列出,並進行實作。下方是預計會建立的 Step:

  1. CreateOrderStep:為 CompensatableStep,主要是提供 executeCompensation 來針對「建立訂單」進行補償,也就是要發送「將訂單狀態改為 reject」的 Command。
  2. DecreaseInventoryStep:為 CompensatableStep,當執行 execute 方法時,發送「可銷售庫存減少」的 Command,並提供 executeCompensation 方法來針對「可銷售庫存減少」進行補償,也就是發送「回退可銷售庫存減少」的 Command。
  3. PrepareOrderStep:為 CompensatableStep,當執行 execute 方法時,發送「將訂單狀態改為 prepared」的 Command,並提供 executeCompensation 方法來針對「將訂單狀態改為 prepared」進行補償,也就是發送「回退將訂單狀態改為 prepared」的 Command。
  4. CreateLogisticsStep:為 PivotStep,當執行 execute 方法時,發送「建立物流單」的 Command。
  5. UpdateOrderLogisticsStep:為 Step,當執行 execute 方法時,發送「更新訂單物流資訊」的 Command。

建立 CreateOrderStep

建立 create-order.step.ts,實作 CreateOrderStep 並繼承 CompensatableStep,在 executeCompensation 透過 ClientProxysend 方法發送 Order/Reject

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable, of } from 'rxjs';
import {
  Order,
  OrderRejectedMessage,
  OrderRejectMessage,
} from '@nestjs-microservices/DAY26/domain';
import { CompensatableStep } from '@nestjs-microservices/DAY26/saga-orchestrator';

@Injectable()
export class CreateOrderStep extends CompensatableStep<Order, Order> {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
    super('Create Order');
  }

  execute(input: Order): Observable<Order> {
    return of(input);
  }

  executeCompensation(input: Order): Observable<Order> {
    const record = new MqttRecordBuilder<OrderRejectMessage>({
      data: input,
    })
      .setQoS(2)
      .build();
    return this.client
      .send<OrderRejectedMessage>('Order/Reject', record)
      .pipe(map((message) => message.data));
  }
}

建立 DecreaseInventoryStep

建立 decrease-inventory.step.ts,實作 DecreaseInventoryStep 並繼承 CompensatableStep,在 execute 透過 ClientProxysend 方法發送 Product/Decrease,並在 executeCompensation 透過 ClientProxysend 方法發送 Product/Decrease/Rollback

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import { CompensatableStep } from '@nestjs-microservices/DAY26/saga-orchestrator';
import {
  Order,
  ProductDecreaseMessage,
  ProductDecreaseRollbackMessage,
} from '@nestjs-microservices/DAY26/domain';

@Injectable()
export class DecreaseInventoryStep extends CompensatableStep<Order, Order> {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
    super('Decrease Inventory');
  }

  execute(input: Order): Observable<Order> {
    const record = new MqttRecordBuilder<ProductDecreaseMessage>({
      data: input.details,
    })
      .setQoS(2)
      .build();
    return this.client.send('Product/Decrease', record).pipe(map(() => input));
  }

  executeCompensation(input: Order): Observable<Order> {
    const record = new MqttRecordBuilder<ProductDecreaseRollbackMessage>({
      data: input.details,
    })
      .setQoS(2)
      .build();

    return this.client
      .send('Product/Decrease/Rollback', record)
      .pipe(map(() => input));
  }
}

建立 PrepareOrderStep

建立 prepare-order.step.ts,實作 PrepareOrderStep 並繼承 CompensatableStep,在 execute 透過 ClientProxysend 方法發送 Order/Prepare,並在 executeCompensation 透過 ClientProxysend 方法發送 Order/Prepare/Rollback

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import {
  Order,
  OrderPreparedMessage,
  OrderPrepareMessage,
  OrderPrepareRollbackDoneMessage,
  OrderPrepareRollbackMessage,
} from '@nestjs-microservices/DAY26/domain';
import { CompensatableStep } from '@nestjs-microservices/DAY26/saga-orchestrator';

@Injectable()
export class PrepareOrderStep extends CompensatableStep<Order, Order> {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
    super('Prepare Order');
  }

  execute(input: Order): Observable<Order> {
    const record = new MqttRecordBuilder<OrderPrepareMessage>({
      data: input.id,
    })
      .setQoS(2)
      .build();
    return this.client
      .send<OrderPreparedMessage>('Order/Prepare', record)
      .pipe(map((message) => message.data));
  }

  executeCompensation(input: Order): Observable<Order> {
    const record = new MqttRecordBuilder<OrderPrepareRollbackMessage>({
      data: input.id,
    })
      .setQoS(2)
      .build();
    return this.client
      .send<OrderPrepareRollbackDoneMessage>('Order/Prepare/Rollback', record)
      .pipe(map((message) => message.data));
  }
}

建立 CreateLogisticsStep

建立 create-logistics.step.ts,實作 CreateLogisticsStep 並繼承 PivotStep,在 execute 透過 ClientProxysend 方法發送 Logistics/Create

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import {
  LogisticsOrderCreatedMessage,
  LogisticsOrderCreateMessage,
  Order,
  OrderUpdateLogisticsMessage,
} from '@nestjs-microservices/DAY26/domain';
import { PivotStep } from '@nestjs-microservices/DAY26/saga-orchestrator';

@Injectable()
export class CreateLogisticsStep extends PivotStep<
  Order,
  OrderUpdateLogisticsMessage['data']
> {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
    super('Create Logistics');
  }

  execute(input: Order): Observable<OrderUpdateLogisticsMessage['data']> {
    const record = new MqttRecordBuilder<LogisticsOrderCreateMessage>({
      data: input.logistics.address,
    })
      .setQoS(2)
      .build();
    return this.client.send('Logistics/Create', record).pipe(
      map((message: LogisticsOrderCreatedMessage) => ({
        logisticsOrder: message.data,
        orderId: input.id,
      }))
    );
  }
}

建立 UpdateOrderLogisticsStep

建立 update-order-logistics.step.ts,實作 UpdateOrderLogisticsStep 並繼承 Step,在 execute 透過 ClientProxysend 方法發送 Order/Logistics/Update

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import {
  Order,
  OrderUpdatedLogisticsMessage,
  OrderUpdateLogisticsMessage,
} from '@nestjs-microservices/DAY26/domain';
import { Step } from '@nestjs-microservices/DAY26/saga-orchestrator';

@Injectable()
export class UpdateOrderLogisticsStep extends Step<
  OrderUpdateLogisticsMessage['data'],
  Order
> {
  constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
    super('Update Order Logistics');
  }

  execute(input: OrderUpdateLogisticsMessage['data']): Observable<Order> {
    const record = new MqttRecordBuilder<OrderUpdateLogisticsMessage>({
      data: input,
    })
      .setQoS(2)
      .build();
    return this.client
      .send<OrderUpdatedLogisticsMessage>('Order/Logistics/Update', record)
      .pipe(map((message) => message.data));
  }
}

建立 CreateOrderSaga

建立 create-order-saga.ts,實作 CreateOrderSata 並繼承 Orchestrator,在 constructor 注入上述 Steps 並透過 super 傳遞給 Orchestrator

import { Injectable } from '@nestjs/common';
import { Orchestrator } from '@nestjs-microservices/DAY26/saga-orchestrator';
import {
  CreateLogisticsStep,
  CreateOrderStep,
  DecreaseInventoryStep,
  PrepareOrderStep,
  UpdateOrderLogisticsStep,
} from './steps';

@Injectable()
export class CreateOrderSaga extends Orchestrator {
  constructor(
    createOrderStep: CreateOrderStep,
    decreaseInventoryStep: DecreaseInventoryStep,
    prepareOrderStep: PrepareOrderStep,
    createLogisticsStep: CreateLogisticsStep,
    updateOrderLogisticsStep: UpdateOrderLogisticsStep
  ) {
    super([
      createOrderStep,
      decreaseInventoryStep,
      prepareOrderStep,
      createLogisticsStep,
      updateOrderLogisticsStep,
    ]);
  }
}

修改 create-order-saga.module.ts 的內容,將上述的 Steps 與 CreateOrderSaga 放入 providers 中,並將 CreateOrderSaga 放入 exports 中,讓其他 Module 使用:

import { Module } from '@nestjs/common';
import {
  CreateLogisticsStep,
  CreateOrderStep,
  DecreaseInventoryStep,
  PrepareOrderStep,
  UpdateOrderLogisticsStep,
} from './steps';
import { CreateOrderSaga } from './create-order-saga';

const STEPS = [
  CreateOrderStep,
  DecreaseInventoryStep,
  PrepareOrderStep,
  CreateLogisticsStep,
  UpdateOrderLogisticsStep,
];

@Module({
  providers: [...STEPS, CreateOrderSaga],
  exports: [CreateOrderSaga],
})
export class CreateOrderSagaModule {}

最後,調整 index.ts 的內容,將 CreateOrderSagaModuleCreateOrderSaga 匯出:

export * from './lib/create-order-saga.module';
export * from './lib/create-order-saga';

小結

回顧今天的重點內容,一開始先針對 Orchestrator 的部分進行實作,進一步分解出 StepCompensatableStepPivotStepOrchestrator。由 Step 決定該步驟內要執行的事情,Orchestrator 則負責照順序執行 Step,以及在遇到失敗時該如何執行補償。

接著,針對會使用到的 type 製作了 Shared Domain Library,並在此定義了訂單、庫存、物流單以及 Command 的 type

最後,製作了 Create Order Saga Orchestrator,運用前面實作的各類型 Step 與 Orchestrator 將「建立訂單」的 Saga 串連起來。

下一篇將會使用上方完成的內容來完成「建立訂單」的 Saga 流程,敬請期待!


上一篇
[用NestJS闖蕩微服務!] DAY25 - Saga (二)
下一篇
[用NestJS闖蕩微服務!] DAY27 - Saga (四)
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言