iT邦幫忙

2024 iThome 鐵人賽

DAY 28
0
Software Development

用 NestJS 闖蕩微服務!系列 第 28

[用NestJS闖蕩微服務!] DAY28 - CQRS (上)

  • 分享至 

  • xImage
  •  

微服務查詢的挑戰

在微服務架構下,資料會分散在各個服務的私有資料庫中,假如有個應用會需要從多個服務獲得資料並呈現在畫面上,勢必會面臨 如何查詢 的問題。前面在介紹 API Gateway 時有提到 API Composition 的概念,透過 API Gateway 將多個服務提供的資料進行彙整再一併提供給 Client,這樣的做法確實可以解決跨服務查詢的問題,但它並不是沒有缺點,使用 API Composition 來處理簡單、不複雜的查詢是一個很直觀的作法,但在複雜且跨越多個服務查詢的情況下,它會帶來以下幾個問題:

  • 查詢成本高昂:由於需要針對多個服務進行查詢,勢必會提高網路傳輸成本,還需要考慮執行查詢的先後順序、輸出結果的轉換等,相較於直接對結果做查詢來說成本高出許多。
  • 可用性降低:由於 API Composition 會依賴於多個服務,當依賴的服務越多,API Composition 會因為服務不可用而查詢失敗的可能性就會提高。

補充:雖然說可用性會隨著依賴的服務數量增加,但可以靠一些手段讓該次查詢不會直接失敗,例如:當有服務的資料不可用時,就使用 Cache 的資料來代替,或是直接讓該資料從這筆查詢中消失,如此一來,可以提高 API Composition 的可用性,但代價是 Cache 的資料可能是過時的,或是因為缺少資料導致查詢結果不如預期。

那麼在面對複雜的查詢時該怎麼處理呢?這裡就可以來聊聊 命令查詢職責分離模式(Command Query Responsibility Segregation, CQRS) 的概念了。

什麼是 CQRS?

CQRS 是由 命令查詢分離(Command Query Segregation) 延伸而來,那什麼是 CQS 呢?簡單來說,在撰寫程式的時候,會將改變狀態的行為歸類為 命令(Command)、將不會改變狀態僅做查詢的行為歸類為 查詢(Query)。下方為範例程式碼,可以看到 Order 這個 class 提供了改變 statuscancel 方法,該方法不會回傳任何狀態,因為 Command 專注在狀態改變本身,以及提供查詢 statusgetStatus 方法:

class Order {
  private _status = 'pending';

  cancel(): void {
    this._status = 'cancel';
  }

  getStatus(): string {
    return this._status;
  }
}

補充:雖然說 Command 應該要專注在狀態改變,如果要取得狀態應該從 Query 取得,但實務上還是有可能會讓 Command 回傳相關狀態的。

CQRS 基於這個思考模式將維度上升到 模型(Model)。下方為範例程式碼,可以看到將取消訂單這個會改變狀態的行為拆分出 CancelOrderCommand,以及查詢訂單狀態的 OrderStatusQuery

class CancelOrderCommand {
  execute(orderId: string): Promise<void> {
    // ...
  }
}

class OrderStatusQuery {
  execute(orderId: string): Promise<string> {
    // ...
  }
}

CQRS 的概念還可以將維度上升至整個系統,假如有一個購物系統的應用,它會需要有庫存管理的需求,當用戶下訂單時,需要建立訂單並減少庫存,同時要處理物流相關業務,如果是一個 Monolithic 的架構,套用了 CQRS 的概念我們可以將讀取的資料庫與寫入的資料庫進行分離,再透過一些 事件(Event) 將查詢訂單頁面所需的 Model 建立起來,這樣預先快照訂單頁面所需的 Model 又稱 具體化視觀表(Materialized View)

CQRS and materialized view concept

如果換成微服務架構的話,可以將服務拆分為:訂單服務、物流服務、商品服務以及 訂單頁面服務,訂單頁面服務會接收來自其他服務的事件,藉由這些事件將訂單頁面所需的資訊預先快照起來,由訂單頁面服務來組成 Materialized View,也就是說,訂單頁面服務會是一個專門 Query 的服務,不會提供 Command 的操作:

microservices CQRS and materialized view concept

補充:以上的內容可能有點抽象,所以我們這篇將會使用 NestJS 來實作 CQRS,一方面是理解 CQRS 的設計,一方面是理解在 NestJS 要實現 CQRS 可以怎麼做。

用 NestJS 探索 CQRS

NestJS 官方有推出相關套件讓開發者可以輕易實現 CQRS。透過下方指令進行安裝:

$ npm install @nestjs/cqrs

事前準備

接下來介紹的內容會以 Todo 為中心來撰寫範例程式碼,故需要先為相關依賴撰寫基礎範例程式碼,首先,建立 todo.model.ts 來實作 Todo,該 class 即 Model:

export interface ITodo {
  id: string;
  title: string;
  description?: string;
  completed: boolean;
}

export class Todo implements ITodo {
  readonly id: string;
  title: string;
  description?: string;
  completed: boolean;

  constructor(todo: ITodo) {
    this.id = todo.id;
    this.title = todo.title;
    this.description = todo.description;
    this.completed = todo.completed;
  }
}

接著,建立 todo.repository.ts 來模擬實際操作資料庫會使用到的 Repository:

import { Injectable } from '@nestjs/common';
import { ITodo, Todo } from '../models';

export interface CreateTodo {
  title: string;
  description?: string;
}

@Injectable()
export class TodoRepository {
  private readonly todos: Array<ITodo> = [];

  async create(params: CreateTodo) {
    const todo: ITodo = {
      ...params,
      id: crypto.randomUUID(),
      completed: false,
    };
    this.todos.push(todo);
    return new Todo(todo);
  }

  async findById(id: string) {
    const item = this.todos.find((todo) => todo.id === id);
    if (!item) {
      return null;
    }
    return new Todo(item);
  }
}

注意:記得將 TodoRepository 放入 AppModuleproviders 中。

Command

NestJS 處理 Command 的方式是設計 Command Handler 來接收經由 CommandBus 發送的 Command,再根據 Command 要處理的任務來撰寫相關邏輯。

NestJS CQRS Command Concept

而一個 Command 實際上就是一個實作 ICommand 介面的 class。下方為範例程式碼,建立了一個用來建立 Todo 的 CreateTodoCommand

import { ICommand } from '@nestjs/cqrs';

export class CreateTodoCommand implements ICommand {
  constructor(
    public readonly title: string,
    public readonly description?: string
  ) {}
}

那 Command Handler 要如何實作呢?它會是一個使用 @CommandHandler 裝飾器、實作 ICommandHandler 介面的 Provider。下方是範例程式碼,在 @CommandHandler 裝飾器帶入 CreateTodoCommand,表示該 Command Handler 是專門用來處理該 Command 用的,因為實作了 ICommandHandler,需要實作 execute 方法,該方法會實作建立 Todo 的商業邏輯:

import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateTodoCommand } from './create-todo.command';
import { Todo } from '../../models';
import { TodoRepository } from '../../repositories';

@CommandHandler(CreateTodoCommand)
export class CreateTodoHandler
  implements ICommandHandler<CreateTodoCommand, Todo>
{
  constructor(private readonly todoRepository: TodoRepository) {}

  execute(command: CreateTodoCommand): Promise<Todo> {
    return this.todoRepository.create({
      title: command.title,
      description: command.description,
    });
  }
}

注意:因為 Command Handler 本身也是 Provider,故需要掛在 Module 上,這裡可以把 CreateTodoHandler 放入 AppModuleproviders 中。

那麼 CommandBus 的部分要如何使用呢?下方是範例程式碼,修改 AppController 的內容,實作 createTodo 方法來執行 CommandBusexecute 方法,參數即 CreateTodoCommand 實例,並將 execute 方法的回傳值當作該 API 的回傳值,也就是 CreateTodoHandler 執行完產生的 Todo

import { Body, Controller, Post } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { CreateTodo } from './repositories';
import { CreateTodoCommand } from './commands/create-todo';
import { Todo } from './models';

@Controller()
export class AppController {
  constructor(private readonly commandBus: CommandBus) {}

  @Post('todos')
  createTodo(@Body() payload: CreateTodo) {
    return this.commandBus.execute<CreateTodoCommand, Todo>(
      new CreateTodoCommand(payload.title, payload.description)
    );
  }

使用 Postman 進行測試,透過 POST 方法存取 http://localhost:3000/api/todos 並使用下方內容作為 Payload,會得到建立後的 Todo 資訊:

{
  "title": "Test",
  "description": "Testing"
}

Create Todo Result

Query

Query 的實現方式與 Command 大同小異,會由 Query Handler 接收來自 QueryBus 的 Query,再根據 Query 要查詢的內容進行查詢。

NestJS CQRS Query Concept

Query 實際上是一個實作 IQuery 介面的 class。下方是範例程式碼,建立一個以 ID 查詢 Todo 的 Query:

import { IQuery } from '@nestjs/cqrs;

export class FindTodoByIdQuery implements IQuery {
  constructor(public readonly id: string) {}
}

Query Handler 會是一個使用 @QueryHandler 裝飾器、實作 IQueryHandler 介面的 Provider。下方是範例程式碼,在 @QueryHandler 裝飾器帶入 FindTodoByIdQuery,表示該 Query Handler 是專門用來處理該 Query 用的,因為實作了 IQueryHandler,需要實作 execute 方法,該方法會實作以指定 ID 查詢 Todo 的商業邏輯:

import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { FindTodoByIdQuery } from './find-todo-by-id.query';
import { Todo } from '../../models';
import { TodoRepository } from '../../repositories';

@QueryHandler(FindTodoByIdQuery)
export class FindTodoByIdHandler
  implements IQueryHandler<FindTodoByIdQuery, Todo | null>
{
  constructor(private readonly todoRepository: TodoRepository) {}

  execute(query: FindTodoByIdQuery): Promise<Todo | null> {
    return this.todoRepository.findById(query.id);
  }
}

注意:因為 Query Handler 本身也是 Provider,故需要掛在 Module 上,這裡可以把 FindTodoByIdHandler 放入 AppModuleproviders 中。

那麼 QueryBus 要如何使用呢?下方是範例程式碼,修改 AppController 的內容,實作 findTodoById 方法來執行 QueryBusexecute 方法,參數即 FindTodoByIdQuery 實例,並將 execute 方法的回傳值當作該 API 的回傳值,也就是 FindTodoByIdHandler 執行完找到的 Todo

// ...
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { Todo } from './models';
import { FindTodoByIdQuery } from './queries/find-todo-by-id';

@Controller()
export class AppController {
  constructor(
    // ...
    private readonly queryBus: QueryBus
  ) {}
  // ...
  @Get('todos/:id')
  findTodoById(@Param('id') id: string) {
    return this.queryBus.execute<FindTodoByIdQuery, Todo | null>(new FindTodoByIdQuery(id));
  }
}

使用 Postman 進行測試,透過 GET 方法存取 http://localhost:3000/api/todos/<TODO_ID> 並將 <TODO_ID> 換成建立後的 Todo ID,如此一來,便可以獲得該 Todo 資訊:

Get Todo Result

Event

Event 的處理方式也跟 Command 差不多,需要透過 Event Handler 來接收經由 EventBus 發送的 Event,再根據 Event 要處理的任務來撰寫相關邏輯。

NestJS CQRS Event Concept

Event 實際上是一個實作 IEvent 介面的 class。下方是範例程式碼,建立一個用來通知 Todo 建立完成的 Event:

import { IEvent } from '@nestjs/cqrs';
import { ITodo } from '../../models';

export class CreatedTodoEvent implements IEvent {
  constructor(public readonly todo: ITodo) {}
}

Event Handler 會是一個使用 @EventHandler 裝飾器、實作 IEventHandler 介面的 Provider。下方是範例程式碼,在 @EventHandler 裝飾器帶入 CreatedTodoEvent,表示該 Event Handler 是專門用來處理該 Event 用的,因為實作了 IEventHandler,需要實作 handle 方法,該方法會實作 Todo 建立完之後要執行的相關商業邏輯:

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { CreatedTodoEvent } from './created-todo.event';

@EventsHandler(CreatedTodoEvent)
export class CreatedTodoHandler implements IEventHandler<CreatedTodoEvent> {
  handle(event: CreatedTodoEvent) {
    console.log('Todo', event.todo.title);
  }
}

注意:因為 Event Handler 本身也是 Provider,故需要掛在 Module 上,這裡可以把 CreatedTodoHandler 放入 AppModuleproviders 中。

那麼 EventBus 要如何使用呢?下方是範例程式碼,修改 CreateTodoHandler 的內容,在建立完 Todo 之後、回傳 Todo 之前,執行 EventBuspublish 方法,帶入的參數即 CreatedTodoEvent 實例:

import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';
import { CreateTodoCommand } from './create-todo.command';
import { Todo } from '../../models';
import { TodoRepository } from '../../repositories';
import { CreatedTodoEvent } from '../../events/created-todo';

@CommandHandler(CreateTodoCommand)
export class CreateTodoHandler
  implements ICommandHandler<CreateTodoCommand, Todo>
{
  constructor(
    private readonly todoRepository: TodoRepository,
    private readonly eventBus: EventBus
  ) {}

  async execute(command: CreateTodoCommand): Promise<Todo> {
    const todo = await this.todoRepository.create({
      title: command.title,
      description: command.description,
    });

    this.eventBus.publish(new CreatedTodoEvent(todo));

    return todo;
  }
}

使用 Postman 進行測試,透過 POST 方法存取 http://localhost:3000/api/todos 並使用下方內容作為 Payload,會在終端機看到該筆 Todo 的 title

{
  "title": "Test",
  "description": "Testing"
}

Created Todo Result1

Model-Driven Event

除了使用 EventBus 來傳送 Event 外,還可以透過 Model 來發送,不過在使用上需要額外處理。NestJS 有提供 AggregateRoot 這個 class 讓 Model 來繼承,透過它可以讓 Model 有處理 Event 的能力,它提供的方法如下:

  • apply:將欲發送的 Event 暫存起來,這些 Event 為 未提交(Uncommitted) 狀態。
  • commit:將 Uncommitted 的 Event 全部發送。
  • uncommit:取消所有 Event。
  • getUncommittedEvents:取得 Uncommitted 的所有 Event。
  • publish:發送指定的 Event。
  • publishAll:發送指定的複數 Event。

從上方方法可以看出,要透過 Model 來管理、發送事件,需要手動呼叫 apply 將 Event 暫存起來,再透過 commit 將 Event 發送出去,假如要自動完成這個步驟的話,AggregateRoot 有提供 autoCommit 這個 setter 讓每次 apply 的 Event 都立即執行 commit

接下來要實際示範一次如何運用 Model 來處理 Event,會以「完成 Todo」為例來進行實作。下方是範例程式碼,建立 CompletedTodoEvent 供後續發送 Event 使用:

import { IEvent } from '@nestjs/cqrs';
import { ITodo } from '../../models';

export class CompletedTodoEvent implements IEvent {
  constructor(public readonly todo: ITodo) {}
}

接著,建立 CompletedTodoHandler 來接收 CompletedTodoEvent,並在收到後,將 todotitle 印出來:

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { CompletedTodoEvent } from './completed-todo.event';

@EventsHandler(CompletedTodoEvent)
export class CompletedTodoHandler implements IEventHandler<CompletedTodoEvent> {
  handle(event: CompletedTodoEvent) {
    console.log('todo completed.', event.todo.title);
  }
}

進入重點部分,修改 Todo 的內容,讓它繼承 AggregateRoot 並實作 complete 方法,在這裡透過 apply 方法將 CompletedTodoEvent 實例放入 Uncommitted Event 列表中:

import { AggregateRoot } from '@nestjs/cqrs';
import { CompletedTodoEvent } from '../events/completed-todo';
// ...
export class Todo extends AggregateRoot implements ITodo {
  // ...
  complete() {
    this.completed = true;
    this.apply(new CompletedTodoEvent(this));
  }
}

調整 TodoRepository 的內容,新增 completeById 方法,在這裡模擬更新 Todo 的行為,將結果用來建立 Todo 實例,並呼叫其 complete 方法進而將 CompletedTodoEvent 放入 Uncommitted Event 列表中:

// ...
@Injectable()
export class TodoRepository {
  // ...
  async completeById(id: string) {
    const todo = this.todos.find((todo) => todo.id === id);
    if (!todo) {
      return null;
    }
    todo.completed = true;

    const domain = new Todo(todo);
    domain.complete();
    return domain;
  }
}

接下來要建立「完成 Todo」的 API,首先,定義 CompleteTodoCommand

import { ICommand } from '@nestjs/cqrs';

export class CompleteTodoCommand implements ICommand {
  constructor(public readonly id: string) {}
}

再來要建立 CompleteTodoHandler,透過 TodoRepositorycompleteById 方法達成,並呼叫回傳的 Todo 實例的 commit 將 Uncommitted 的 CompletedTodoEvent 送出,不過在送出前、後,可以將 Uncommitted Event 印出來進行觀察:

import { TodoRepository } from './../../repositories';
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';
import { CompleteTodoCommand } from './complete-todo.command';

@CommandHandler(CompleteTodoCommand)
export class CompleteTodoHandler
  implements ICommandHandler<CompleteTodoCommand, void>
{
  constructor(
    private readonly todoRepository: TodoRepository,
    private readonly eventPublisher: EventPublisher
  ) {}

  async execute(command: CompleteTodoCommand): Promise<void> {
    const todo = await this.todoRepository.completeById(command.id);
    this.eventPublisher.mergeObjectContext(todo);
    console.log('Uncommit Events1', todo?.getUncommittedEvents());
    todo?.commit();
    console.log('Uncommit Events2', todo?.getUncommittedEvents());
  }
}

最後,修改 AppController 的內容,設計 completeTodoById 方法來發送 CompleteTodoCommand

// ...
import { CompleteTodoCommand } from './commands/complete-todo';

@Controller()
export class AppController {
  // ...
  @HttpCode(HttpStatus.NO_CONTENT)
  @Post('todos/:id[:]complete')
  completeTodoById(@Param('id') id: string) {
    return this.commandBus.execute<CompleteTodoCommand, void>(
      new CompleteTodoCommand(id)
    );
  }
}

注意:要記得將 CompleteTodoHandlerCompletedTodoHandler 放入 AppModuleproviders 中。

使用 Postman 建立一筆 Todo 後,透過 POST 方法存取 http://localhost:3000/api/todos/<TODO_ID>:complete,其中 <TODO_ID> 置換成建立的 Todo ID,會在終端機看到 Uncommitted Event 的變化與完成的 Todo:

Completed Todo Result

異常處理

Event Handler 發生異常時,無法 被 Exception Filter 捕捉,只能夠手動處理,比如:try...catch。NestJS 針對這個問題提供了相關解法,當異常發生時,EventBus 會將錯誤整理成下方格式的訊息,並將該訊息發送至 UnhandledExceptionBus

export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
  /**
   * The exception that was thrown.
   */
  exception: Exception;
  /**
   * The cause of the exception (event or command reference).
   */
  cause: Cause;
}

如果要接收錯誤訊息並針對錯誤進行處理,可以注入 UnhandledExceptionBus,它是一個 Observable,只需要透過 subscribe 進行訂閱即可。下方是範例程式碼,修改 CompletedTodoHandler,在 handle 直接拋出 CompletedTodoHandleException,並在 constructor 直接訂閱 UnhandledExceptionBus

import {
  EventsHandler,
  IEventHandler,
  UnhandledExceptionBus,
} from '@nestjs/cqrs';
import { OnModuleDestroy } from '@nestjs/common';
import { Subject, takeUntil } from 'rxjs';
// ...

class CompletedTodoHandleException extends Error {
  constructor(message: string) {
    super(message);
  }
}

@EventsHandler(CompletedTodoEvent)
export class CompletedTodoHandler
  implements IEventHandler<CompletedTodoEvent>, OnModuleDestroy
{
  private readonly destroy$ = new Subject<void>();

  constructor(private readonly unhandledExceptionBus: UnhandledExceptionBus) {
    this.unhandledExceptionBus
      .pipe(takeUntil(this.destroy$))
      .subscribe((info) => {
        console.log('Unhandled exception!', info);
      });
  }

  handle(event: CompletedTodoEvent) {
    throw new CompletedTodoHandleException('Testing...');
    console.log('todo completed.', event.todo.title);
  }

  onModuleDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

使用 Postman 建立一筆 Todo 後,透過 POST 方法存取 http://localhost:3000/api/todos/<TODO_ID>:complete,其中 <TODO_ID> 置換成建立的 Todo ID,會在終端機看到未處理的 Exception:

Completed Todo Exception Result1

但這樣的處理方式是針對 全部 Event Handler 的異常做處理,如果要針對特定異常做處理,可以使用 ofType 來過濾 Exception。下方是範例程式碼,使用 ofType 過濾 Exception 為 CompletedTodoHandleException 的異常,並將 handle 內拋出的錯誤換成 Error

import {
  // ...
  ofType,
} from '@nestjs/cqrs';
// ...

@EventsHandler(CompletedTodoEvent)
export class CompletedTodoHandler
  implements IEventHandler<CompletedTodoEvent>, OnModuleDestroy
{
  private readonly destroy$ = new Subject<void>();

  constructor(private readonly unhandledExceptionBus: UnhandledExceptionBus) {
    this.unhandledExceptionBus
      .pipe(ofType(CompletedTodoHandleException), takeUntil(this.destroy$))
      .subscribe((info) => {
        console.log('Unhandled exception!', info);
      });
  }

  handle(event: CompletedTodoEvent) {
    throw new Error('Testing...');
    console.log('todo completed.', event.todo.title);
  }

  // ...
}

使用 Postman 建立一筆 Todo 後,透過 POST 方法存取 http://localhost:3000/api/todos/<TODO_ID>:complete,其中 <TODO_ID> 置換成建立的 Todo ID,終端機不會印出相關資訊:

Completed Todo Exception Result2

小結

回顧一下本篇的重點,在一開始先提及微服務在查詢上所面臨的挑戰,雖然 API Composition 可以達成跨服務的查詢,但在較複雜的查詢情境可能會面臨成本過於高昂與降低服務可用性的問題,於是 CQRS 會是一個複雜查詢下的選擇。CQRS 是一種軟體設計模式,不僅可以應用於單一服務的設計,也可以將維度上升至整個系統,進一步套用在微服務架構上,將讀寫的資料庫分離,並針對頁面所需的資料額外快照成另一個 Read Model - Materialized View。文章中運用 NestJS 的 CQRS Module 來撰寫基於 CQRS 模式的應用程式,進而讓大家更加理解 CQRS 的概念,以及理解在 NestJS 可以怎麼實作 CQRS。運用 CQRS Module 可以很直觀地撰寫 Command、Query、Event 以及對應的 Handler,將職責劃分清楚,提升整體架構的可維護性。

下一篇將用 CQRS 來實現微服務下的高效查詢,敬請期待!


上一篇
[用NestJS闖蕩微服務!] DAY27 - Saga (四)
下一篇
[用NestJS闖蕩微服務!] DAY29 - CQRS (下)
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言