試著使用 NestJS 來實現極簡版的 Orchestration Saga,目標是將前一篇提到的「建立訂單」流程實現出來,並針對失敗的情況執行 Compensatable Transactions,不過細節較多,這部分將會拆成上下兩篇來實作完成。
注意:接下來的實作內容會使用 MQTT 作為微服務應用程式溝通的媒介,相關教學可以參考前面介紹的 MQTT Transporter。
注意:接下來的內容為了簡化實作邏輯將 不會 使用真正的資料庫,主要是將 Orchestration Saga 的實現邏輯呈現出來,若有需要可以在實作時自行更換成資料庫的實現邏輯。
Orchestrator 的目標是將一個 Saga 中要完成的所有任務串連起來,並針對失敗的情況自動執行補償機制。所以我們可以把每個任務拆成一個個的 步驟(Step),並將 Step 交給 Orchestrator 來照順序執行,針對這塊的機制可以拆成獨立的 Nx Library,讓需要建立 Saga 的地方來使用這套機制。
首先,透過 Nx 建立名為 saga-orchestrator
的 Library 並新增 step.ts
,在該檔案內實作用於 Retriable Transaction 的 Step
、用於 Compensatable Transaction 的 CompensatableStep
與用於 Pivot Transaction 的 PivotStep
:
import { Observable } from 'rxjs';
export abstract class Step<Input, Output> {
abstract execute(input: Input): Observable<Output>;
constructor(public readonly name: string) {}
}
export abstract class CompensatableStep<Input, Output> extends Step<
Input,
Output
> {
abstract executeCompensation(input: Output): Observable<Input>;
}
export abstract class PivotStep<Input, Output> extends Step<Input, Output> {}
接著,要來實作 Orchestrator 的部分,在開始撰寫前,先針對其實作細節做更詳細的規劃。首先,我們會需要在建構 Orchestrator 時提供要執行的 Steps,這些 Step 可以是 Step
、CompensationStep
與 PivotStep
,並按照要執行的順序排列。
有了 Steps 後,需要有一個觸發流程的方法,當流程開始時,會從第一個 Step 開始執行其 execute
方法,並將該 Step 的結果當作是下一個 Step 的輸入值,會需要這麼做的原因是使用最一開始執行流程的輸入值可能會在流程中過時,如果在某個 Step 中針對狀態做異動,就可以將異動後的狀態向下一個步驟傳遞。
那麼如果中途有 Step 執行失敗了,要如何知道該執行哪些 Compensation Transactions 呢?有個比較簡單的做法是在執行完成一個 CompensationStep
時,將該 CompensationStep
記錄在一個暫存區,並且永遠將最新的 CompensationStep
放在第一位,但還需要特別注意,當 Pivot Transaction 完成時,就不會再去執行這些 Compensation Transactions,所以在 PivotStep
完成時,需要將暫存區清空,如此一來,就可以知道要執行的 Compensation Transaction 有哪些,以及它們的執行順序。
有了概念後,建立 saga-orchestrator.ts
,實作一個名為 Orchestrator
的 class
,將上述流程實作出來:
import { concatMap, from, map, Subject, takeUntil, tap } from 'rxjs';
import { CompensatableStep, PivotStep, Step } from './step';
import { Logger } from '@nestjs/common';
export abstract class Orchestrator {
constructor(
private readonly steps: Array<
Step<any, any> | CompensatableStep<any, any> | PivotStep<any, any>
>
) {}
// 啟動 Saga 流程
start<Input>(input: Input) {
// 成功的 `CompensatableStep` 暫存區
let succeedSteps: Array<CompensatableStep<any, any>> = [];
const whenErrorOccurred$ = new Subject<void>();
const whenCompleted$ = new Subject<void>();
const sagaId = crypto.randomUUID();
let lastInput: any = input;
Logger.log(`Saga ${sagaId} start...`);
// 使用 `from` + `concatMap` 來確保照順序執行 Step
from(this.steps)
.pipe(
concatMap((step) => {
Logger.log(`${step.name} start...`);
return step.execute(lastInput as Input).pipe(
// 將 Step 的輸出值作為下一個 Step 的輸入值
tap((output) => (lastInput = output)),
map(() => step)
);
}),
tap((step) => Logger.log(`${step.name} end...`))
)
.subscribe({
next: (step) => {
// 如果完成的 Step 是 `PivotStep`,就清空暫存區
if (step instanceof PivotStep) {
succeedSteps = [];
return;
}
// 如果完成的 Step 是 `CompensatableStep`,就將該 Step 放在暫存區第一位
if (step instanceof CompensatableStep) {
succeedSteps.unshift(step);
}
},
error: () => {
whenErrorOccurred$.next();
whenErrorOccurred$.complete();
},
complete: () => {
whenCompleted$.next();
whenCompleted$.complete();
},
});
// 當有錯誤發生時,執行補償機制
whenErrorOccurred$
.pipe(
concatMap(() => from(succeedSteps)),
concatMap((step) => {
Logger.log(`${step.name} compemsation start...`);
return step
.executeCompensation(lastInput)
.pipe(tap(() => Logger.log(`${step.name} compemsation end...`)));
}),
takeUntil(whenCompleted$)
)
.subscribe({
next: (output) => {
// 將 Step 的輸出值作為下一個 Step 的輸入值
lastInput = output;
},
complete: () => {
Logger.log(`Saga ${sagaId} end...`);
},
});
}
}
最後,將上述兩個檔案從 index.ts
匯出:
export * from './step';
export * from './saga-orchestrator';
為了方便我們取用不同 Domain 的 Type,這邊透過 Nx 建立了名為 domain
的 Library 來保存訂單、物流訂單、商品庫存的 type
。
注意:針對 Domain Type 要如何透過 Library 進行管理有許多方式,我會建議依照 Domain 性質各自拆分不同的 Library,比如:
order-domain
、logistics-domain
、product-domain
。不過因為這篇文章的重點是在 Orchestrator Saga,所以就統一放在domain
Library 中,避免失焦。
下方是範例程式碼,在 domain
Library 新增 logistics.ts
並宣告訂單相關的 type
:
export type LogisticsOrderId = string;
export type LogisticsOrder = {
id: LogisticsOrderId;
address: string;
};
新增 product.ts
並宣告商品庫存的 type
:
export type ProductId = string;
export type Inventory = {
productId: ProductId;
count: number;
};
新增 order.ts
並宣告訂單相關的 type
:
import { LogisticsOrderId } from './logistics';
import { ProductId } from './product';
export type Order = {
id: string;
status: 'preparing' | 'prepared' | 'reject';
details: Array<OrderDetail>;
logistics: OrderLogisticsDetail;
};
export type OrderDetail = {
productId: ProductId;
count: number;
};
export type OrderLogisticsDetail = {
id: LogisticsOrderId | null;
address: string;
};
新增 message.ts
並宣告所有 Step 會用到的 Command 與 Reply 的 type
:
import { LogisticsOrder } from './logistics';
import { Order, OrderDetail } from './order';
import { Inventory } from './product';
export type Message<T> = { data: T };
// 將訂單狀態切換為 `reject`
export type OrderRejectMessage = Message<Order>;
export type OrderRejectedMessage = Message<Order>;
// 減少可銷售庫存
export type ProductDecreaseMessage = Message<Array<OrderDetail>>;
export type ProductDecreasedMessage = Message<Array<Inventory>>;
// 減少可銷售庫存的補償
export type ProductDecreaseRollbackMessage = Message<Array<OrderDetail>>;
export type ProductDecreaseRollbackDoneMessage = Message<Array<Inventory>>;
// 將訂單狀態切換為 `prepared`
export type OrderPrepareMessage = Message<string>;
export type OrderPreparedMessage = Message<Order>;
// 將訂單狀態切換為 `prepared` 的補償
export type OrderPrepareRollbackMessage = Message<string>;
export type OrderPrepareRollbackDoneMessage = Message<Order>;
// 建立物流單
export type LogisticsOrderCreateMessage = Message<string>;
export type LogisticsOrderCreatedMessage = Message<LogisticsOrder>;
// 更新訂單的物流資訊
export type OrderUpdateLogisticsMessage = Message<{
logisticsOrder: LogisticsOrder;
orderId: string;
}>;
export type OrderUpdatedLogisticsMessage = Message<Order>;
最後,將上述內容從 index.ts
匯出:
export * from './lib/logistics';
export * from './lib/order';
export * from './lib/product';
export * from './lib/message';
透過 Nx 建立名為 create-order-saga
的 Library,在這個 Library 內實作 Create Order Saga Orchestrator。先將所有 Step 列出,並進行實作。下方是預計會建立的 Step:
CreateOrderStep
:為 CompensatableStep
,主要是提供 executeCompensation
來針對「建立訂單」進行補償,也就是要發送「將訂單狀態改為 reject
」的 Command。DecreaseInventoryStep
:為 CompensatableStep
,當執行 execute
方法時,發送「可銷售庫存減少」的 Command,並提供 executeCompensation
方法來針對「可銷售庫存減少」進行補償,也就是發送「回退可銷售庫存減少」的 Command。PrepareOrderStep
:為 CompensatableStep
,當執行 execute
方法時,發送「將訂單狀態改為 prepared
」的 Command,並提供 executeCompensation
方法來針對「將訂單狀態改為 prepared
」進行補償,也就是發送「回退將訂單狀態改為 prepared
」的 Command。CreateLogisticsStep
:為 PivotStep
,當執行 execute
方法時,發送「建立物流單」的 Command。UpdateOrderLogisticsStep
:為 Step
,當執行 execute
方法時,發送「更新訂單物流資訊」的 Command。建立 create-order.step.ts
,實作 CreateOrderStep
並繼承 CompensatableStep
,在 executeCompensation
透過 ClientProxy
的 send
方法發送 Order/Reject
:
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable, of } from 'rxjs';
import {
Order,
OrderRejectedMessage,
OrderRejectMessage,
} from '@nestjs-microservices/DAY26/domain';
import { CompensatableStep } from '@nestjs-microservices/DAY26/saga-orchestrator';
@Injectable()
export class CreateOrderStep extends CompensatableStep<Order, Order> {
constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
super('Create Order');
}
execute(input: Order): Observable<Order> {
return of(input);
}
executeCompensation(input: Order): Observable<Order> {
const record = new MqttRecordBuilder<OrderRejectMessage>({
data: input,
})
.setQoS(2)
.build();
return this.client
.send<OrderRejectedMessage>('Order/Reject', record)
.pipe(map((message) => message.data));
}
}
建立 decrease-inventory.step.ts
,實作 DecreaseInventoryStep
並繼承 CompensatableStep
,在 execute
透過 ClientProxy
的 send
方法發送 Product/Decrease
,並在 executeCompensation
透過 ClientProxy
的 send
方法發送 Product/Decrease/Rollback
:
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import { CompensatableStep } from '@nestjs-microservices/DAY26/saga-orchestrator';
import {
Order,
ProductDecreaseMessage,
ProductDecreaseRollbackMessage,
} from '@nestjs-microservices/DAY26/domain';
@Injectable()
export class DecreaseInventoryStep extends CompensatableStep<Order, Order> {
constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
super('Decrease Inventory');
}
execute(input: Order): Observable<Order> {
const record = new MqttRecordBuilder<ProductDecreaseMessage>({
data: input.details,
})
.setQoS(2)
.build();
return this.client.send('Product/Decrease', record).pipe(map(() => input));
}
executeCompensation(input: Order): Observable<Order> {
const record = new MqttRecordBuilder<ProductDecreaseRollbackMessage>({
data: input.details,
})
.setQoS(2)
.build();
return this.client
.send('Product/Decrease/Rollback', record)
.pipe(map(() => input));
}
}
建立 prepare-order.step.ts
,實作 PrepareOrderStep
並繼承 CompensatableStep
,在 execute
透過 ClientProxy
的 send
方法發送 Order/Prepare
,並在 executeCompensation
透過 ClientProxy
的 send
方法發送 Order/Prepare/Rollback
:
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import {
Order,
OrderPreparedMessage,
OrderPrepareMessage,
OrderPrepareRollbackDoneMessage,
OrderPrepareRollbackMessage,
} from '@nestjs-microservices/DAY26/domain';
import { CompensatableStep } from '@nestjs-microservices/DAY26/saga-orchestrator';
@Injectable()
export class PrepareOrderStep extends CompensatableStep<Order, Order> {
constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
super('Prepare Order');
}
execute(input: Order): Observable<Order> {
const record = new MqttRecordBuilder<OrderPrepareMessage>({
data: input.id,
})
.setQoS(2)
.build();
return this.client
.send<OrderPreparedMessage>('Order/Prepare', record)
.pipe(map((message) => message.data));
}
executeCompensation(input: Order): Observable<Order> {
const record = new MqttRecordBuilder<OrderPrepareRollbackMessage>({
data: input.id,
})
.setQoS(2)
.build();
return this.client
.send<OrderPrepareRollbackDoneMessage>('Order/Prepare/Rollback', record)
.pipe(map((message) => message.data));
}
}
建立 create-logistics.step.ts
,實作 CreateLogisticsStep
並繼承 PivotStep
,在 execute
透過 ClientProxy
的 send
方法發送 Logistics/Create
:
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import {
LogisticsOrderCreatedMessage,
LogisticsOrderCreateMessage,
Order,
OrderUpdateLogisticsMessage,
} from '@nestjs-microservices/DAY26/domain';
import { PivotStep } from '@nestjs-microservices/DAY26/saga-orchestrator';
@Injectable()
export class CreateLogisticsStep extends PivotStep<
Order,
OrderUpdateLogisticsMessage['data']
> {
constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
super('Create Logistics');
}
execute(input: Order): Observable<OrderUpdateLogisticsMessage['data']> {
const record = new MqttRecordBuilder<LogisticsOrderCreateMessage>({
data: input.logistics.address,
})
.setQoS(2)
.build();
return this.client.send('Logistics/Create', record).pipe(
map((message: LogisticsOrderCreatedMessage) => ({
logisticsOrder: message.data,
orderId: input.id,
}))
);
}
}
建立 update-order-logistics.step.ts
,實作 UpdateOrderLogisticsStep
並繼承 Step
,在 execute
透過 ClientProxy
的 send
方法發送 Order/Logistics/Update
:
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy, MqttRecordBuilder } from '@nestjs/microservices';
import { map, Observable } from 'rxjs';
import {
Order,
OrderUpdatedLogisticsMessage,
OrderUpdateLogisticsMessage,
} from '@nestjs-microservices/DAY26/domain';
import { Step } from '@nestjs-microservices/DAY26/saga-orchestrator';
@Injectable()
export class UpdateOrderLogisticsStep extends Step<
OrderUpdateLogisticsMessage['data'],
Order
> {
constructor(@Inject('MS_CLIENT') private readonly client: ClientProxy) {
super('Update Order Logistics');
}
execute(input: OrderUpdateLogisticsMessage['data']): Observable<Order> {
const record = new MqttRecordBuilder<OrderUpdateLogisticsMessage>({
data: input,
})
.setQoS(2)
.build();
return this.client
.send<OrderUpdatedLogisticsMessage>('Order/Logistics/Update', record)
.pipe(map((message) => message.data));
}
}
建立 create-order-saga.ts
,實作 CreateOrderSata
並繼承 Orchestrator
,在 constructor
注入上述 Steps 並透過 super
傳遞給 Orchestrator
:
import { Injectable } from '@nestjs/common';
import { Orchestrator } from '@nestjs-microservices/DAY26/saga-orchestrator';
import {
CreateLogisticsStep,
CreateOrderStep,
DecreaseInventoryStep,
PrepareOrderStep,
UpdateOrderLogisticsStep,
} from './steps';
@Injectable()
export class CreateOrderSaga extends Orchestrator {
constructor(
createOrderStep: CreateOrderStep,
decreaseInventoryStep: DecreaseInventoryStep,
prepareOrderStep: PrepareOrderStep,
createLogisticsStep: CreateLogisticsStep,
updateOrderLogisticsStep: UpdateOrderLogisticsStep
) {
super([
createOrderStep,
decreaseInventoryStep,
prepareOrderStep,
createLogisticsStep,
updateOrderLogisticsStep,
]);
}
}
修改 create-order-saga.module.ts
的內容,將上述的 Steps 與 CreateOrderSaga
放入 providers
中,並將 CreateOrderSaga
放入 exports
中,讓其他 Module 使用:
import { Module } from '@nestjs/common';
import {
CreateLogisticsStep,
CreateOrderStep,
DecreaseInventoryStep,
PrepareOrderStep,
UpdateOrderLogisticsStep,
} from './steps';
import { CreateOrderSaga } from './create-order-saga';
const STEPS = [
CreateOrderStep,
DecreaseInventoryStep,
PrepareOrderStep,
CreateLogisticsStep,
UpdateOrderLogisticsStep,
];
@Module({
providers: [...STEPS, CreateOrderSaga],
exports: [CreateOrderSaga],
})
export class CreateOrderSagaModule {}
最後,調整 index.ts
的內容,將 CreateOrderSagaModule
與 CreateOrderSaga
匯出:
export * from './lib/create-order-saga.module';
export * from './lib/create-order-saga';
回顧今天的重點內容,一開始先針對 Orchestrator 的部分進行實作,進一步分解出 Step
、CompensatableStep
、PivotStep
與 Orchestrator
。由 Step 決定該步驟內要執行的事情,Orchestrator
則負責照順序執行 Step,以及在遇到失敗時該如何執行補償。
接著,針對會使用到的 type
製作了 Shared Domain Library,並在此定義了訂單、庫存、物流單以及 Command 的 type
。
最後,製作了 Create Order Saga Orchestrator,運用前面實作的各類型 Step 與 Orchestrator
將「建立訂單」的 Saga 串連起來。
下一篇將會使用上方完成的內容來完成「建立訂單」的 Saga 流程,敬請期待!