在微服務架構下,資料會分散在各個服務的私有資料庫中,假如有個應用會需要從多個服務獲得資料並呈現在畫面上,勢必會面臨 如何查詢 的問題。前面在介紹 API Gateway 時有提到 API Composition 的概念,透過 API Gateway 將多個服務提供的資料進行彙整再一併提供給 Client,這樣的做法確實可以解決跨服務查詢的問題,但它並不是沒有缺點,使用 API Composition 來處理簡單、不複雜的查詢是一個很直觀的作法,但在複雜且跨越多個服務查詢的情況下,它會帶來以下幾個問題:
補充:雖然說可用性會隨著依賴的服務數量增加,但可以靠一些手段讓該次查詢不會直接失敗,例如:當有服務的資料不可用時,就使用 Cache 的資料來代替,或是直接讓該資料從這筆查詢中消失,如此一來,可以提高 API Composition 的可用性,但代價是 Cache 的資料可能是過時的,或是因為缺少資料導致查詢結果不如預期。
那麼在面對複雜的查詢時該怎麼處理呢?這裡就可以來聊聊 命令查詢職責分離模式(Command Query Responsibility Segregation, CQRS) 的概念了。
CQRS 是由 命令查詢分離(Command Query Segregation) 延伸而來,那什麼是 CQS 呢?簡單來說,在撰寫程式的時候,會將改變狀態的行為歸類為 命令(Command)、將不會改變狀態僅做查詢的行為歸類為 查詢(Query)。下方為範例程式碼,可以看到 Order
這個 class
提供了改變 status
的 cancel
方法,該方法不會回傳任何狀態,因為 Command 專注在狀態改變本身,以及提供查詢 status
的 getStatus
方法:
class Order {
private _status = 'pending';
cancel(): void {
this._status = 'cancel';
}
getStatus(): string {
return this._status;
}
}
補充:雖然說 Command 應該要專注在狀態改變,如果要取得狀態應該從 Query 取得,但實務上還是有可能會讓 Command 回傳相關狀態的。
CQRS 基於這個思考模式將維度上升到 模型(Model)。下方為範例程式碼,可以看到將取消訂單這個會改變狀態的行為拆分出 CancelOrderCommand
,以及查詢訂單狀態的 OrderStatusQuery
:
class CancelOrderCommand {
execute(orderId: string): Promise<void> {
// ...
}
}
class OrderStatusQuery {
execute(orderId: string): Promise<string> {
// ...
}
}
CQRS 的概念還可以將維度上升至整個系統,假如有一個購物系統的應用,它會需要有庫存管理的需求,當用戶下訂單時,需要建立訂單並減少庫存,同時要處理物流相關業務,如果是一個 Monolithic 的架構,套用了 CQRS 的概念我們可以將讀取的資料庫與寫入的資料庫進行分離,再透過一些 事件(Event) 將查詢訂單頁面所需的 Model 建立起來,這樣預先快照訂單頁面所需的 Model 又稱 具體化視觀表(Materialized View):
如果換成微服務架構的話,可以將服務拆分為:訂單服務、物流服務、商品服務以及 訂單頁面服務,訂單頁面服務會接收來自其他服務的事件,藉由這些事件將訂單頁面所需的資訊預先快照起來,由訂單頁面服務來組成 Materialized View,也就是說,訂單頁面服務會是一個專門 Query 的服務,不會提供 Command 的操作:
補充:以上的內容可能有點抽象,所以我們這篇將會使用 NestJS 來實作 CQRS,一方面是理解 CQRS 的設計,一方面是理解在 NestJS 要實現 CQRS 可以怎麼做。
NestJS 官方有推出相關套件讓開發者可以輕易實現 CQRS。透過下方指令進行安裝:
$ npm install @nestjs/cqrs
接下來介紹的內容會以 Todo 為中心來撰寫範例程式碼,故需要先為相關依賴撰寫基礎範例程式碼,首先,建立 todo.model.ts
來實作 Todo
,該 class
即 Model:
export interface ITodo {
id: string;
title: string;
description?: string;
completed: boolean;
}
export class Todo implements ITodo {
readonly id: string;
title: string;
description?: string;
completed: boolean;
constructor(todo: ITodo) {
this.id = todo.id;
this.title = todo.title;
this.description = todo.description;
this.completed = todo.completed;
}
}
接著,建立 todo.repository.ts
來模擬實際操作資料庫會使用到的 Repository:
import { Injectable } from '@nestjs/common';
import { ITodo, Todo } from '../models';
export interface CreateTodo {
title: string;
description?: string;
}
@Injectable()
export class TodoRepository {
private readonly todos: Array<ITodo> = [];
async create(params: CreateTodo) {
const todo: ITodo = {
...params,
id: crypto.randomUUID(),
completed: false,
};
this.todos.push(todo);
return new Todo(todo);
}
async findById(id: string) {
const item = this.todos.find((todo) => todo.id === id);
if (!item) {
return null;
}
return new Todo(item);
}
}
注意:記得將
TodoRepository
放入AppModule
的providers
中。
NestJS 處理 Command 的方式是設計 Command Handler 來接收經由 CommandBus
發送的 Command,再根據 Command 要處理的任務來撰寫相關邏輯。
而一個 Command 實際上就是一個實作 ICommand
介面的 class
。下方為範例程式碼,建立了一個用來建立 Todo 的 CreateTodoCommand
:
import { ICommand } from '@nestjs/cqrs';
export class CreateTodoCommand implements ICommand {
constructor(
public readonly title: string,
public readonly description?: string
) {}
}
那 Command Handler 要如何實作呢?它會是一個使用 @CommandHandler
裝飾器、實作 ICommandHandler
介面的 Provider。下方是範例程式碼,在 @CommandHandler
裝飾器帶入 CreateTodoCommand
,表示該 Command Handler 是專門用來處理該 Command 用的,因為實作了 ICommandHandler
,需要實作 execute
方法,該方法會實作建立 Todo 的商業邏輯:
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateTodoCommand } from './create-todo.command';
import { Todo } from '../../models';
import { TodoRepository } from '../../repositories';
@CommandHandler(CreateTodoCommand)
export class CreateTodoHandler
implements ICommandHandler<CreateTodoCommand, Todo>
{
constructor(private readonly todoRepository: TodoRepository) {}
execute(command: CreateTodoCommand): Promise<Todo> {
return this.todoRepository.create({
title: command.title,
description: command.description,
});
}
}
注意:因為 Command Handler 本身也是 Provider,故需要掛在 Module 上,這裡可以把
CreateTodoHandler
放入AppModule
的providers
中。
那麼 CommandBus
的部分要如何使用呢?下方是範例程式碼,修改 AppController
的內容,實作 createTodo
方法來執行 CommandBus
的 execute
方法,參數即 CreateTodoCommand
實例,並將 execute
方法的回傳值當作該 API 的回傳值,也就是 CreateTodoHandler
執行完產生的 Todo
:
import { Body, Controller, Post } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { CreateTodo } from './repositories';
import { CreateTodoCommand } from './commands/create-todo';
import { Todo } from './models';
@Controller()
export class AppController {
constructor(private readonly commandBus: CommandBus) {}
@Post('todos')
createTodo(@Body() payload: CreateTodo) {
return this.commandBus.execute<CreateTodoCommand, Todo>(
new CreateTodoCommand(payload.title, payload.description)
);
}
使用 Postman 進行測試,透過 POST
方法存取 http://localhost:3000/api/todos 並使用下方內容作為 Payload,會得到建立後的 Todo 資訊:
{
"title": "Test",
"description": "Testing"
}
Query 的實現方式與 Command 大同小異,會由 Query Handler 接收來自 QueryBus
的 Query,再根據 Query 要查詢的內容進行查詢。
Query 實際上是一個實作 IQuery
介面的 class
。下方是範例程式碼,建立一個以 ID 查詢 Todo 的 Query:
import { IQuery } from '@nestjs/cqrs;
export class FindTodoByIdQuery implements IQuery {
constructor(public readonly id: string) {}
}
Query Handler 會是一個使用 @QueryHandler
裝飾器、實作 IQueryHandler
介面的 Provider。下方是範例程式碼,在 @QueryHandler
裝飾器帶入 FindTodoByIdQuery
,表示該 Query Handler 是專門用來處理該 Query 用的,因為實作了 IQueryHandler
,需要實作 execute
方法,該方法會實作以指定 ID 查詢 Todo 的商業邏輯:
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { FindTodoByIdQuery } from './find-todo-by-id.query';
import { Todo } from '../../models';
import { TodoRepository } from '../../repositories';
@QueryHandler(FindTodoByIdQuery)
export class FindTodoByIdHandler
implements IQueryHandler<FindTodoByIdQuery, Todo | null>
{
constructor(private readonly todoRepository: TodoRepository) {}
execute(query: FindTodoByIdQuery): Promise<Todo | null> {
return this.todoRepository.findById(query.id);
}
}
注意:因為 Query Handler 本身也是 Provider,故需要掛在 Module 上,這裡可以把
FindTodoByIdHandler
放入AppModule
的providers
中。
那麼 QueryBus
要如何使用呢?下方是範例程式碼,修改 AppController
的內容,實作 findTodoById
方法來執行 QueryBus
的 execute
方法,參數即 FindTodoByIdQuery
實例,並將 execute
方法的回傳值當作該 API 的回傳值,也就是 FindTodoByIdHandler
執行完找到的 Todo
:
// ...
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { Todo } from './models';
import { FindTodoByIdQuery } from './queries/find-todo-by-id';
@Controller()
export class AppController {
constructor(
// ...
private readonly queryBus: QueryBus
) {}
// ...
@Get('todos/:id')
findTodoById(@Param('id') id: string) {
return this.queryBus.execute<FindTodoByIdQuery, Todo | null>(new FindTodoByIdQuery(id));
}
}
使用 Postman 進行測試,透過 GET
方法存取 http://localhost:3000/api/todos/<TODO_ID> 並將 <TODO_ID>
換成建立後的 Todo ID,如此一來,便可以獲得該 Todo 資訊:
Event 的處理方式也跟 Command 差不多,需要透過 Event Handler 來接收經由 EventBus
發送的 Event,再根據 Event 要處理的任務來撰寫相關邏輯。
Event 實際上是一個實作 IEvent
介面的 class
。下方是範例程式碼,建立一個用來通知 Todo 建立完成的 Event:
import { IEvent } from '@nestjs/cqrs';
import { ITodo } from '../../models';
export class CreatedTodoEvent implements IEvent {
constructor(public readonly todo: ITodo) {}
}
Event Handler 會是一個使用 @EventHandler
裝飾器、實作 IEventHandler
介面的 Provider。下方是範例程式碼,在 @EventHandler
裝飾器帶入 CreatedTodoEvent
,表示該 Event Handler 是專門用來處理該 Event 用的,因為實作了 IEventHandler
,需要實作 handle
方法,該方法會實作 Todo 建立完之後要執行的相關商業邏輯:
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { CreatedTodoEvent } from './created-todo.event';
@EventsHandler(CreatedTodoEvent)
export class CreatedTodoHandler implements IEventHandler<CreatedTodoEvent> {
handle(event: CreatedTodoEvent) {
console.log('Todo', event.todo.title);
}
}
注意:因為 Event Handler 本身也是 Provider,故需要掛在 Module 上,這裡可以把
CreatedTodoHandler
放入AppModule
的providers
中。
那麼 EventBus
要如何使用呢?下方是範例程式碼,修改 CreateTodoHandler
的內容,在建立完 Todo 之後、回傳 Todo
之前,執行 EventBus
的 publish
方法,帶入的參數即 CreatedTodoEvent
實例:
import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';
import { CreateTodoCommand } from './create-todo.command';
import { Todo } from '../../models';
import { TodoRepository } from '../../repositories';
import { CreatedTodoEvent } from '../../events/created-todo';
@CommandHandler(CreateTodoCommand)
export class CreateTodoHandler
implements ICommandHandler<CreateTodoCommand, Todo>
{
constructor(
private readonly todoRepository: TodoRepository,
private readonly eventBus: EventBus
) {}
async execute(command: CreateTodoCommand): Promise<Todo> {
const todo = await this.todoRepository.create({
title: command.title,
description: command.description,
});
this.eventBus.publish(new CreatedTodoEvent(todo));
return todo;
}
}
使用 Postman 進行測試,透過 POST
方法存取 http://localhost:3000/api/todos 並使用下方內容作為 Payload,會在終端機看到該筆 Todo 的 title
:
{
"title": "Test",
"description": "Testing"
}
除了使用 EventBus
來傳送 Event 外,還可以透過 Model 來發送,不過在使用上需要額外處理。NestJS 有提供 AggregateRoot
這個 class
讓 Model 來繼承,透過它可以讓 Model 有處理 Event 的能力,它提供的方法如下:
從上方方法可以看出,要透過 Model 來管理、發送事件,需要手動呼叫 apply
將 Event 暫存起來,再透過 commit
將 Event 發送出去,假如要自動完成這個步驟的話,AggregateRoot
有提供 autoCommit
這個 setter
讓每次 apply
的 Event 都立即執行 commit
。
接下來要實際示範一次如何運用 Model 來處理 Event,會以「完成 Todo」為例來進行實作。下方是範例程式碼,建立 CompletedTodoEvent
供後續發送 Event 使用:
import { IEvent } from '@nestjs/cqrs';
import { ITodo } from '../../models';
export class CompletedTodoEvent implements IEvent {
constructor(public readonly todo: ITodo) {}
}
接著,建立 CompletedTodoHandler
來接收 CompletedTodoEvent
,並在收到後,將 todo
的 title
印出來:
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { CompletedTodoEvent } from './completed-todo.event';
@EventsHandler(CompletedTodoEvent)
export class CompletedTodoHandler implements IEventHandler<CompletedTodoEvent> {
handle(event: CompletedTodoEvent) {
console.log('todo completed.', event.todo.title);
}
}
進入重點部分,修改 Todo
的內容,讓它繼承 AggregateRoot
並實作 complete
方法,在這裡透過 apply
方法將 CompletedTodoEvent
實例放入 Uncommitted Event 列表中:
import { AggregateRoot } from '@nestjs/cqrs';
import { CompletedTodoEvent } from '../events/completed-todo';
// ...
export class Todo extends AggregateRoot implements ITodo {
// ...
complete() {
this.completed = true;
this.apply(new CompletedTodoEvent(this));
}
}
調整 TodoRepository
的內容,新增 completeById
方法,在這裡模擬更新 Todo 的行為,將結果用來建立 Todo
實例,並呼叫其 complete
方法進而將 CompletedTodoEvent
放入 Uncommitted Event 列表中:
// ...
@Injectable()
export class TodoRepository {
// ...
async completeById(id: string) {
const todo = this.todos.find((todo) => todo.id === id);
if (!todo) {
return null;
}
todo.completed = true;
const domain = new Todo(todo);
domain.complete();
return domain;
}
}
接下來要建立「完成 Todo」的 API,首先,定義 CompleteTodoCommand
:
import { ICommand } from '@nestjs/cqrs';
export class CompleteTodoCommand implements ICommand {
constructor(public readonly id: string) {}
}
再來要建立 CompleteTodoHandler
,透過 TodoRepository
的 completeById
方法達成,並呼叫回傳的 Todo
實例的 commit
將 Uncommitted 的 CompletedTodoEvent
送出,不過在送出前、後,可以將 Uncommitted Event 印出來進行觀察:
import { TodoRepository } from './../../repositories';
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';
import { CompleteTodoCommand } from './complete-todo.command';
@CommandHandler(CompleteTodoCommand)
export class CompleteTodoHandler
implements ICommandHandler<CompleteTodoCommand, void>
{
constructor(
private readonly todoRepository: TodoRepository,
private readonly eventPublisher: EventPublisher
) {}
async execute(command: CompleteTodoCommand): Promise<void> {
const todo = await this.todoRepository.completeById(command.id);
this.eventPublisher.mergeObjectContext(todo);
console.log('Uncommit Events1', todo?.getUncommittedEvents());
todo?.commit();
console.log('Uncommit Events2', todo?.getUncommittedEvents());
}
}
最後,修改 AppController
的內容,設計 completeTodoById
方法來發送 CompleteTodoCommand
:
// ...
import { CompleteTodoCommand } from './commands/complete-todo';
@Controller()
export class AppController {
// ...
@HttpCode(HttpStatus.NO_CONTENT)
@Post('todos/:id[:]complete')
completeTodoById(@Param('id') id: string) {
return this.commandBus.execute<CompleteTodoCommand, void>(
new CompleteTodoCommand(id)
);
}
}
注意:要記得將
CompleteTodoHandler
與CompletedTodoHandler
放入AppModule
的providers
中。
使用 Postman 建立一筆 Todo 後,透過 POST
方法存取 http://localhost:3000/api/todos/<TODO_ID>:complete,其中 <TODO_ID>
置換成建立的 Todo ID,會在終端機看到 Uncommitted Event 的變化與完成的 Todo:
Event Handler 發生異常時,無法 被 Exception Filter 捕捉,只能夠手動處理,比如:try...catch
。NestJS 針對這個問題提供了相關解法,當異常發生時,EventBus
會將錯誤整理成下方格式的訊息,並將該訊息發送至 UnhandledExceptionBus
:
export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
/**
* The exception that was thrown.
*/
exception: Exception;
/**
* The cause of the exception (event or command reference).
*/
cause: Cause;
}
如果要接收錯誤訊息並針對錯誤進行處理,可以注入 UnhandledExceptionBus
,它是一個 Observable
,只需要透過 subscribe
進行訂閱即可。下方是範例程式碼,修改 CompletedTodoHandler
,在 handle
直接拋出 CompletedTodoHandleException
,並在 constructor
直接訂閱 UnhandledExceptionBus
:
import {
EventsHandler,
IEventHandler,
UnhandledExceptionBus,
} from '@nestjs/cqrs';
import { OnModuleDestroy } from '@nestjs/common';
import { Subject, takeUntil } from 'rxjs';
// ...
class CompletedTodoHandleException extends Error {
constructor(message: string) {
super(message);
}
}
@EventsHandler(CompletedTodoEvent)
export class CompletedTodoHandler
implements IEventHandler<CompletedTodoEvent>, OnModuleDestroy
{
private readonly destroy$ = new Subject<void>();
constructor(private readonly unhandledExceptionBus: UnhandledExceptionBus) {
this.unhandledExceptionBus
.pipe(takeUntil(this.destroy$))
.subscribe((info) => {
console.log('Unhandled exception!', info);
});
}
handle(event: CompletedTodoEvent) {
throw new CompletedTodoHandleException('Testing...');
console.log('todo completed.', event.todo.title);
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
使用 Postman 建立一筆 Todo 後,透過 POST
方法存取 http://localhost:3000/api/todos/<TODO_ID>:complete,其中 <TODO_ID>
置換成建立的 Todo ID,會在終端機看到未處理的 Exception:
但這樣的處理方式是針對 全部 Event Handler 的異常做處理,如果要針對特定異常做處理,可以使用 ofType
來過濾 Exception。下方是範例程式碼,使用 ofType
過濾 Exception 為 CompletedTodoHandleException
的異常,並將 handle
內拋出的錯誤換成 Error
:
import {
// ...
ofType,
} from '@nestjs/cqrs';
// ...
@EventsHandler(CompletedTodoEvent)
export class CompletedTodoHandler
implements IEventHandler<CompletedTodoEvent>, OnModuleDestroy
{
private readonly destroy$ = new Subject<void>();
constructor(private readonly unhandledExceptionBus: UnhandledExceptionBus) {
this.unhandledExceptionBus
.pipe(ofType(CompletedTodoHandleException), takeUntil(this.destroy$))
.subscribe((info) => {
console.log('Unhandled exception!', info);
});
}
handle(event: CompletedTodoEvent) {
throw new Error('Testing...');
console.log('todo completed.', event.todo.title);
}
// ...
}
使用 Postman 建立一筆 Todo 後,透過 POST
方法存取 http://localhost:3000/api/todos/<TODO_ID>:complete,其中 <TODO_ID>
置換成建立的 Todo ID,終端機不會印出相關資訊:
回顧一下本篇的重點,在一開始先提及微服務在查詢上所面臨的挑戰,雖然 API Composition 可以達成跨服務的查詢,但在較複雜的查詢情境可能會面臨成本過於高昂與降低服務可用性的問題,於是 CQRS 會是一個複雜查詢下的選擇。CQRS 是一種軟體設計模式,不僅可以應用於單一服務的設計,也可以將維度上升至整個系統,進一步套用在微服務架構上,將讀寫的資料庫分離,並針對頁面所需的資料額外快照成另一個 Read Model - Materialized View。文章中運用 NestJS 的 CQRS Module 來撰寫基於 CQRS 模式的應用程式,進而讓大家更加理解 CQRS 的概念,以及理解在 NestJS 可以怎麼實作 CQRS。運用 CQRS Module 可以很直觀地撰寫 Command、Query、Event 以及對應的 Handler,將職責劃分清楚,提升整體架構的可維護性。
下一篇將用 CQRS 來實現微服務下的高效查詢,敬請期待!