在上一篇文章我們大概看完整體的架構後,接下來我們接來說一下細節的部份,其中 domain event 這裡是重中只重,因為它影響到不同的 Bounded Context 的溝通,如果這裡沒處理好,很容易就會出問題。
然後這裡我們要來說實作很常碰到的主題:
要如何發送 Domain Event
然後這裡很容易找到幾個設計模式名詞,然後今天我們就是來講這 2 個 :
整體的架構圖如下,然後幾個元素說明一下:
不過這裡先說明一下,很多的時後有人也會在 Event Bus 中實作 Transactional Outbox 設計模式,或是單獨只使用 Event Bus 的都大有人在,但我這裡比較想分開來說明,因為這兩個目的是不一樣的 ~
Transactional Outbox 它是一種設計模式,它主要是為了解決一致性的問題,然後在 DDD 這一塊我們是想要用來處理 :
- Aggregate 的狀態更新了( 資料庫已更新 ),但是 Domain Event 發送失敗怎麼辦呢 ?
- Aggregate 的狀態更新失敗了 ( 資料庫 commit 時失敗 ),但是 Domain Event 已發送成功怎辦呢 ?
這個設計模式的處理概念如下:
下面這段就是 1、2。
import { Injectable } from '@nestjs/common';
import { OrderRepository } from './order.repository';
import { OutboxRepository } from './outbox.repository';
import { InjectConnection } from '@nestjs/mongoose';
import { Connection } from 'mongoose';
import { OrderAggregate } from './order.aggregate';
@Injectable()
export class OrderUseCase {
constructor(
private readonly orderRepository: OrderRepository,
private readonly outboxRepository: OutboxRepository,
@InjectConnection() private readonly connection: Connection,
) {}
async payOrder(orderId: number) {
const session = await this.connection.startSession();
session.startTransaction();
try {
const order = await this.orderRepository.findById(orderId);
const event = orderAggregate.paid();
await this.orderRepository.save(order)
await this.outboxRepository.saveEvent(event.eventName, event.payload);
await session.commitTransaction();
} catch (error) {
await session.abortTransaction();
throw error;
} finally {
session.endSession();
}
}
}
然後下面這一段就是 3 的範例碼。
import { Injectable } from '@nestjs/common';
import { OutboxRepository } from './outbox.repository';
import { EventBusService } from './event-bus.service';\
import { Cron } from '@nestjs/schedule';
@Injectable()
export class OutboxProcessorService {
constructor(
private readonly outboxRepository: OutboxRepository,
private readonly eventBusService: EventBusService,
) {}
// 每 10 秒輪詢一次 Outbox 表,處理未發送的事件
@Cron('*/10 * * * * *') // 這個 CronJob 每 10 秒執行一次
async processOutboxEvents() {
try {
const unprocessedEvents = await this.outboxRepository.findUnprocessedEvents();
for (const event of unprocessedEvents) {
await this.eventBusService.publishEvent(event.eventName, event.payload);
await this.outboxRepository.markEventAsProcessed(event._id);
}
} catch (error) {
console.error('Error processing outbox events:', error);
}
}
}
不過在實作時我一直有在想一件事,那就是 outboxRepository 這個要每一個 bounded context 都要有一個,還是共用一個就好,目前兩者都有好有壞:
目前會先傾向所有的 BC 共用 1 個,因為感覺比較好維護,而且如果太大設到期時間就好。
Event Bus 事實上就是 Pub-Sub 模式中有時後很常出現的 Broker,它的目的就是要讓 Publisher 與 Subscriber 不知道對方的存在,你要叫他 Event Broker 應該也是可以,我自已只是比較偏好 Event Bus。
然後它主要的職責在於:
這個地方就是上面 transactional outbox 中輪詢 outbox 資料表後,會來將 domain event 送到 eventBusService 就好。
import { Injectable } from '@golevelup/nestjs-rabbitmq';
@Injectable()
export class EventBusService {
constructor(private readonly rabbitMQService: RabbitMQService) {}
async publishEvent(eventName: string, payload: any): Promise<void> {
await this.rabbitMQService.publish(
'events_exchange',
eventName,
payload
);
}
}
然後這個就是事件訂閱者的地方,不過說來慚愧,在用 Decorator 實作時 @HandleEvent 時有碰到一些問題,還沒有解出來,所以這裡就先貼一下概念碼,主要就是來表達,它就是用這個方法來處理其它 Bounded Context 的事件。
// src/course/course.usecase.ts
import { Injectable } from '@nestjs/common';
import { HandleEvent } from '../common/decorators/handle-event.decorator';
import { OutboxRepository } from '../event-bus/outbox.repository';
@Injectable()
export class CourseUseCase {
@HandleEvent('OrderCreated')
async handleOrderCreated(eventPayload: any) {
console.log('Handling OrderCreated event:', eventPayload);
}
}
以發 domain event 的情境下不太建議,如果沒有用 transactional outbox 機制,你就要有重送 domain event 的機制,雖然不是說做不到,但就我的想法它的工應該是會比做 transactional outbox 還麻煩。
在我腦海中事實上這 Observer Patter 與 Pub-Sub Patter 兩個幾乎是一樣的東西沒錯,但仔細查了一下資料,我發現有個比較重要的差別:
但他們本質上都是相同的,它們的目的都是要:
用事件機制來解耦
然後 Event Bus 比較就是 Pub-Sub Pattern 中裡面的 Broker。
本篇文章中我們談到了如何將 Domain Event 發送到需要的 Bounded Context 手上,其中我們談到了 Transactional OutBox 與 Event Bus 這兩個東西,它們都是設計模式,或是某個設計模式中的東西。
不過認真的說這裡還是有很多設計細節可以想 ~ 之後在來慢慢補充,寫到 23 天真的有點疲了……