iT邦幫忙

2024 iThome 鐵人賽

DAY 29
0
Software Development

用 NestJS 闖蕩微服務!系列 第 29

[用NestJS闖蕩微服務!] DAY29 - CQRS (下)

  • 分享至 

  • xImage
  •  

微服務下的高效查詢

前一篇文章有提到,在微服務架構下,可以運用 CQRS 的概念來實現 查詢專用服務,比如:查詢訂單頁面的專用服務。這類型的服務僅提供使用者進行 Query 不會提供 Command 操作,而它的存在,主要是用來解決複雜、跨多個服務的查詢問題,讓查詢本身變得單純,進而加速查詢的處理速度。在這樣的設計下,使用者如果要針對資料做新增、修改、刪除,就會向該領域的服務發送請求,並在該操作實現時,發送相關事件讓訂閱其事件的查詢專用服務可以知道有這筆異動,進而針對自身保有的資料做處理,產生只能讀取的 Read Model,而這種預算的查詢結果又稱 Materialized View。

Microservices CQRS Result

以這種方式進行設計具體來說有以下幾個優點:

  • 高效查詢:由於查詢專用服務會透過事件來預先取得其他服務的資訊,當使用者向它發送查詢請求時,只需要從自身資料庫中取出預先處理好的資料即可,大幅減少跨服務查詢的成本。
  • 實現多樣化查詢:在某些查詢條件下,依靠單一儲存結構的資料是很有挑戰性的,但如果使用 CQRS 可以針對不同種查詢設計不同的 Read Model,從而避免這個限制。
  • 關注點分離:可以將服務之間的 Query、Command 行為拆分得更乾淨,降低維護的困難度。

當然這個設計也有幾個缺點:

  • 開發與維運成本提高:需要額外開發查詢專用服務外,還增加了維運服務、資料庫等成本。
  • 僅保證最終一致性:當更新了某筆資料時,該領域服務需要發送事件給查詢專用服務,該服務再根據情況更新自身持有的資料,如果使用者在還沒更新查詢專用服務的資料時就進行查詢,那麼就會得到過時的資料。當然這個問題是可以透過一些手段讓使用者不會感到困惑,比如:客戶端先針對結果進行快取,確認查詢專用服務的資料已經是最新的時候,才使用該服務提供的資料。

用 NestJS 實現查詢專用服務

接下來會使用 NestJS 來實現極簡版的查詢專用服務,該服務會提供使用者查詢訂單頁面的資訊,所以又稱訂單查詢服務,而它會依賴於以下幾個服務:

  • 訂單服務:建立訂單、更新訂單的領域服務,會在建立、更新訂單時發送事件。
  • 商品服務:處理「減少可銷售庫存」的領域服務,會在減少可銷售庫存時發送事件。
  • 物流服務:建立物流單、更新物流單的領域服務,會在建立、更新物流單時發送事件。

整體情境流程如下:

  1. 使用者向訂單服務發送建立訂單請求。
  2. 訂單服務建立訂單後,會向商品服務請求減少可銷售庫存,同時發送 Topic 為 order.created 的事件讓訂單查詢服務接收並建立 Materialized View。
  3. 商品服務減少可銷售庫存後,會向訂單服務通知減少成功,同時發送 Topic 為 product.decreased 的事件讓訂單查詢服務接收並更新 Materialized View。
  4. 訂單服務將狀態進行調整,並向物流服務發送建立物流單的請求,同時發送 Topic 為 order.updated 的事件讓訂單查詢服務接收並更新 Materialized View。
  5. 物流服務建立物流單後,發送 Topic 為 logistics.created 的事件讓訂單查詢服務接收並更新 Materialized View。
  6. 物流服務更新物流單狀態,並發送 Topic 為 logistics.updated 的事件讓訂單查詢服務接收並更新 Materialized View。

注意:接下來的實作內容會使用 Kafka 作為微服務應用程式溝通的媒介,相關教學可以參考前面介紹的 Kafka Transporter

注意:接下來的內容為了簡化實作邏輯將 不會 使用真正的資料庫,也 不會 把上述的三個服務建置起來,主要是將查詢專用服務的實現邏輯呈現出來,若有需要可以自行實作呦。

實作 Shared Domain Library

為了方便我們取用不同 Domain 的 Type,這裡透過 Nx 建立了名為 domain 的 Library 來保存訂單、商品、物流單以及訂單頁面資訊的 type

注意:針對 Domain Type 要如何透過 Library 進行管理有許多方式,我會建議依照 Domain 性質各自拆分不同的 Library,比如:order-domainlogistics-domainproduct-domainorder-view-domain。不過因為這篇文章的重點是在實現 Materialized View,所以就統一放在 domain Library 中,避免失焦。

下方是範例程式碼,在 domain Library 新增 logistics.ts 並宣告物流單相關的 type

export type LogisticsOrderId = string;
export type LogisticsOrderStatus =
  | 'preparing'
  | 'shipping'
  | 'fulfilled'
  | 'cancelled';

export type LogisticsOrder = {
  id: LogisticsOrderId;
  address: string;
  status: LogisticsOrderStatus;
};

新增 product.ts 並宣告商品的 type

export type ProductId = string;

export type Product = {
  id: Product;
  name: string;
  price: number;
};

新增 order.ts 並宣告訂單相關的 type

export type OrderId = string;
export type OrderStatus = 'pending' | 'ongoing' | 'done' | 'cancelled';

export type Order = {
  id: OrderId;
  owner: string;
  status: OrderStatus;
  details: Array<OrderDetail>;
  logistics: OrderLogisticsDetail | null;
};

export type OrderDetail = {
  productId: string;
  count: number;
};

export type OrderLogisticsDetail = {
  logisticsOrderId: string;
};

新增 order-view.ts 並宣告訂單頁面資訊的 type,這裡需要特別注意,由於建立訂單的流程不會立即完成減少可銷售庫存,對於訂單服務來說並不會持有商品服務的邏輯,所以在訂單建立時bdetails 會是 Array<OrderDetail>,直到商品服務完成減少可銷售庫存,訂單查詢服務才會透過 Message Broker 取得來自商品服務的事件,進而將 details 替換成 Array<OrderViewDetail>

import { LogisticsOrder } from './logistics';
import { OrderDetail, OrderId, OrderStatus } from './order';
import { Product } from './product';

export type OrderView = {
  orderId: OrderId;
  owner: string;
  status: OrderStatus;
  details: Array<OrderViewDetail> | Array<OrderDetail>;
  logistics: LogisticsOrder | null;
};

export type OrderViewDetail = {
  product: Product;
  count: number;
};

新增 message.ts 並宣告非同步事件的 type,這些事件都會以 messageId 作為 RelationID,以便更新對應的商品頁面資訊:

import { LogisticsOrder } from './logistics';
import { Order } from './order';
import { Product } from './product';

export type Message<T> = { data: T; messageId: string };

export type OrderCreatedMessage = Message<Order>;
export type ProductDecreasedMessage = Message<
  Array<{ product: Product; count: number }>
>;
export type OrderUpdatedMessage = Message<Order>;
export type LogisticsOrderCreatedMessage = Message<LogisticsOrder>;
export type LogisticsOrderUpdatedMessage = Message<LogisticsOrder>;

最後,在 index.ts 匯出這些檔案的內容:

export * from './lib/order';
export * from './lib/product';
export * from './lib/logistics';
export * from './lib/order-view';
export * from './lib/message';

實作 Order View Service

透過 Nx 建立一個名為 order-view-service 的 Application。首先,修改 main.ts 的內容,這裡要運用 connectMicroservice 的方式來建立微服務應用程式,原因是這個服務必須同時提供查詢功能的 API:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app/app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const microservice = app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9094'],
      },
    },
  });
  const globalPrefix = 'api';
  app.setGlobalPrefix(globalPrefix);
  const port = process.env.PORT || 3000;
  await microservice.listen();
  await app.listen(port);
  Logger.log(
    `🚀 Application is running on: http://localhost:${port}/${globalPrefix}`
  );
}
bootstrap();

接著,設計一個 OrderViewRepository 來模擬操作資料庫,共需要提供用來建立 OrderViewcreate、取得 OrderViewgetByOrderId、更新 statusupdateStatusByOrderId、更新 detailsupdateDetailsByOrderId 以及更新 logisticsupdateLogisticsByOrderId

import { Injectable } from '@nestjs/common';
import {
  LogisticsOrder,
  OrderDetail,
  OrderId,
  OrderStatus,
  OrderView,
  OrderViewDetail,
} from '@nestjs-microservices/DAY29/domain';

export type CreateOrderView = {
  orderId: string;
  status: OrderStatus;
  owner: string;
  details: Array<OrderDetail>;
};

@Injectable()
export class OrderViewRepository {
  private readonly orderViews: Array<OrderView> = [];

  async create(params: CreateOrderView) {
    const orderView: OrderView = {
      orderId: params.orderId,
      status: params.status,
      owner: params.owner,
      details: params.details,
      logistics: null,
    };
    this.orderViews.push(orderView);
    return orderView;
  }

  async getByOrderId(orderId: OrderId) {
    return (
      this.orderViews.find((orderView) => orderView.orderId === orderId) ?? null
    );
  }

  async updateStatusByOrderId(orderId: OrderId, status: OrderStatus) {
    const orderView = this.orderViews.find(
      (orderView) => orderView.orderId === orderId
    );
    if (!orderView) {
      return null;
    }
    orderView.status = status;
    return orderView;
  }

  async updateDetailsByOrderId(
    orderId: OrderId,
    details: Array<OrderViewDetail>
  ) {
    const orderView = this.orderViews.find(
      (orderView) => orderView.orderId === orderId
    );
    if (!orderView) {
      return null;
    }
    orderView.details = details;
    return orderView;
  }

  async updateLogisticsByOrderId(orderId: OrderId, logistics: LogisticsOrder) {
    const orderView = this.orderViews.find(
      (orderView) => orderView.orderId === orderId
    );
    if (!orderView) {
      return null;
    }
    orderView.logistics = logistics;
    return orderView;
  }
}

OrderViewRepository 放入 OrderViewRepositoryModuleproviders 內並匯出:

import { Module } from '@nestjs/common';
import { OrderViewRepository } from './order-view.repository';

@Module({
  providers: [OrderViewRepository],
  exports: [OrderViewRepository],
})
export class OrderViewRepositoryModule {}

接著,設計 EventReceiverController 來接收訂單服務、商品服務與物流服務的事件,進而建立、修改訂單資訊頁面的資料:

import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import {
  LogisticsOrderCreatedMessage,
  LogisticsOrderUpdatedMessage,
  OrderCreatedMessage,
  OrderUpdatedMessage,
  ProductDecreasedMessage,
} from '@nestjs-microservices/DAY29/domain';
import { OrderViewRepository } from '../repositories';

@Controller()
export class EventReceiverController {
  constructor(private readonly orderViewRepository: OrderViewRepository) {}

  @EventPattern('order.created')
  async onOrderCreated(@Payload() message: OrderCreatedMessage) {
    await this.orderViewRepository.create({
      orderId: message.data.id,
      status: message.data.status,
      owner: message.data.owner,
      details: message.data.details,
    });
    Logger.log('Order View Created!');
  }

  @EventPattern('product.decreased')
  async onProductDecreased(@Payload() message: ProductDecreasedMessage) {
    await this.orderViewRepository.updateDetailsByOrderId(
      message.messageId,
      message.data
    );
    Logger.log('Order View Details Updated!');
  }

  @EventPattern('order.updated')
  async onOrderUpdated(@Payload() message: OrderUpdatedMessage) {
    const status = message.data.status;
    await this.orderViewRepository.updateStatusByOrderId(
      message.data.id,
      status
    );
    Logger.log('Order View Status Updated!');
  }

  @EventPattern('logistics.created')
  async onLogisticsCreated(@Payload() message: LogisticsOrderCreatedMessage) {
    await this.orderViewRepository.updateLogisticsByOrderId(
      message.messageId,
      message.data
    );
    Logger.log('Order View Logistics Created!');
  }

  @EventPattern('logistics.updated')
  async onLogisticsUpdated(@Payload() message: LogisticsOrderUpdatedMessage) {
    await this.orderViewRepository.updateLogisticsByOrderId(
      message.messageId,
      message.data
    );
    Logger.log('Order View Logistics Updated!');
  }
}

EventReceiverController 放入 EventReceiverModulecontrollers 內,並匯入 OrderViewRepositoryModule 以便取用 OrderViewRepository

import { Module } from '@nestjs/common';
import { EventReceiverController } from './event-receiver.controller';
import { OrderViewRepositoryModule } from '../repositories';

@Module({
  imports: [OrderViewRepositoryModule],
  controllers: [EventReceiverController],
})
export class EventReceiverModule {}

調整 AppModule 的內容,將 EventReceiverModuleOrderViewRepositoryModule 同時匯入,待會會需要在 AppController 實作取得訂單頁面資訊的 API:

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { EventReceiverModule } from './event-receiver';
import { OrderViewRepositoryModule } from './repositories';

@Module({
  imports: [EventReceiverModule, OrderViewRepositoryModule],
  controllers: [AppController],
})
export class AppModule {}

修改 AppController 的內容,設計 getOrderViewByOrderId 方法來透過訂單 ID 取得訂單頁面資訊:

import { Controller, Get, Param } from '@nestjs/common';
import { OrderViewRepository } from './repositories';

@Controller()
export class AppController {
  constructor(private readonly orderViewRepository: OrderViewRepository) {}

  @Get('orders/:id')
  getOrderViewByOrderId(@Param('id') id: string) {
    return this.orderViewRepository.getByOrderId(id);
  }
}

注意:在啟動 order-view-service 之前,請先使用 DAY8 文中提到建立 Topic 的指令將訂閱的 Topic 都先建立起來。

整合測試

由於將焦點放在實現 Materialized View 的訂單查詢服務,省去了其他服務的實作,這邊將會直接透過 bitnami/kafka 裡面的 Shell Script 來發送事件。打開終端機輸入下方指令啟動 Producer,並指定 order.created 為要寫入的 Topic:

$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic order.created --bootstrap-server <HOSTNAME>:9092

輸入下方內容來模擬訂單服務在建立訂單後所發送的事件:

{"data":{"id":"1","status":"pending","owner":"HAO","details":[{"productId":"a","count":10},{"productId":"b","count":5}],"logistics":null},"messageId":"1"}

退出當前的 Producer,並改成指定 product.decreased 為要寫入的 Topic:

$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic product.decreased --bootstrap-server <HOSTNAME>:9092

輸入下方內容來模擬商品服務在減少可銷售庫存後所發送的事件:

{"data":[{"product":{"id":"a","name":"NestJS book","price":500},"count":10},{"product":{"id":"b","name":"CD","price":300},"count":5}],"messageId":"1"}

退出當前的 Producer,並改成指定 order.updated 為要寫入的 Topic:

$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic order.updated --bootstrap-server <HOSTNAME>:9092

輸入下方內容來模擬訂單服務在更新訂單狀態後所發送的事件:

{"data":{"id":"1","status":"ongoing","owner":"HAO","details":[{"productId":"a","count":10},{"productId":"b","count":5}],"logistics":null},"messageId":"1"}

退出當前的 Producer,並改成指定 logistics.created 為要寫入的 Topic:

$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic logistics.created --bootstrap-server <HOSTNAME>:9092

輸入下方內容來模擬物流服務建立物流單後所發送的事件:

{"data":{"id":"abc-123","address":"test road","status":"preparing"},"messageId":"1"}

退出當前的 Producer,並改成指定 logistics.updated 為要寫入的 Topic:

$ docker run --network <NETWORK_NAME> --rm -it bitnami/kafka kafka-console-producer.sh --topic logistics.updated --bootstrap-server <HOSTNAME>:9092

輸入下方內容來模擬物流服務更改物流單狀態後所發送的事件:

{"data":{"id":"abc-123","address":"test road","status":"fulfilled"},"messageId":"1"}

最後,使用 Postman 透過 GET 方法存取 http://localhost:3000/api/orders/1,訂單資訊服務會以非常快的速度將事前準備好的 Materialized View 提供出去,因為對該服務而言,只是一個很簡單的查詢:

Materialized View Result

小結

回顧一下今天的重點內容,上篇已經掌握了 CQRS 的基本概念,這篇就將它應用在微服務架構下,並解釋要如何實作以及採用這個設計帶來的優劣。在介紹完之後,使用 NestJS 與 Kafka 實現了查詢專用服務,並將 Materialized View 的概念實作出來,也可以從實作的結果觀察到查詢的效益非常顯著。

本次的系列文將進入尾聲,明天的內容將會著重在本系列文的感言與總結,謝謝各位的支持!


上一篇
[用NestJS闖蕩微服務!] DAY28 - CQRS (上)
下一篇
[用NestJS闖蕩微服務!] DAY30 - 閉幕式
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言