iT邦幫忙

2024 iThome 鐵人賽

DAY 23
0
Software Development

一個好的系統之好維護基本篇 ( 馬克版 )系列 第 23

Day-23: Domain Event 之 Transactional OutBox 與 EventBus

  • 分享至 

  • xImage
  •  

https://ithelp.ithome.com.tw/upload/images/20241008/2008935859RlsKxUtS.png


在上一篇文章我們大概看完整體的架構後,接下來我們接來說一下細節的部份,其中 domain event 這裡是重中只重,因為它影響到不同的 Bounded Context 的溝通,如果這裡沒處理好,很容易就會出問題。

然後這裡我們要來說實作很常碰到的主題:

要如何發送 Domain Event

然後這裡很容易找到幾個設計模式名詞,然後今天我們就是來講這 2 個 :

  • Transactional OutBox
  • EventBus

如何發送 Domain Event 呢 ?

整體的架構圖如下,然後幾個元素說明一下:

  • 綠色框框為 Purchase Bounded Context
  • 紫色框框為 Course Bounded Context
  • 藍色的部份為 Transactional Outbox
  • 橘色的部份為 Event Bus

https://ithelp.ithome.com.tw/upload/images/20241008/20089358okpuQUiN8H.png

不過這裡先說明一下,很多的時後有人也會在 Event Bus 中實作 Transactional Outbox 設計模式,或是單獨只使用 Event Bus 的都大有人在,但我這裡比較想分開來說明,因為這兩個目的是不一樣的 ~


Transactional OutBox

Transactional Outbox 它是一種設計模式,它主要是為了解決一致性的問題,然後在 DDD 這一塊我們是想要用來處理 :

  1. Aggregate 的狀態更新了( 資料庫已更新 ),但是 Domain Event 發送失敗怎麼辦呢 ?
  2. Aggregate 的狀態更新失敗了 ( 資料庫 commit 時失敗 ),但是 Domain Event 已發送成功怎辦呢 ?

這個設計模式的處理概念如下:

  1. 就將 domain event 儲放到一張叫 Outbox 的表 (事實上不一定要這樣叫啦)。
  2. 然後和 aggregate 更新綁在同一個 db transaction 中。
  3. 然後會有個地方輪詢,看看 outbox 有那些 domain event 還沒送到 event bus 中。

下面這段就是 1、2。

import { Injectable } from '@nestjs/common';
import { OrderRepository } from './order.repository';
import { OutboxRepository } from './outbox.repository';
import { InjectConnection } from '@nestjs/mongoose';
import { Connection } from 'mongoose';
import { OrderAggregate } from './order.aggregate';

@Injectable()
export class OrderUseCase {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly outboxRepository: OutboxRepository,
    @InjectConnection() private readonly connection: Connection,
  ) {}

  async payOrder(orderId: number) {
    const session = await this.connection.startSession();
    session.startTransaction();

    try {
      const order = await this.orderRepository.findById(orderId);
      const event = orderAggregate.paid();

      await this.orderRepository.save(order)
      await this.outboxRepository.saveEvent(event.eventName, event.payload);

      await session.commitTransaction();
    } catch (error) {
      await session.abortTransaction();
      throw error;
    } finally {
      session.endSession();
    }
  }
}

然後下面這一段就是 3 的範例碼。

import { Injectable } from '@nestjs/common';
import { OutboxRepository } from './outbox.repository';
import { EventBusService } from './event-bus.service';\
import { Cron } from '@nestjs/schedule';

@Injectable()
export class OutboxProcessorService {
  constructor(
    private readonly outboxRepository: OutboxRepository,
    private readonly eventBusService: EventBusService,
  ) {}

  // 每 10 秒輪詢一次 Outbox 表,處理未發送的事件
  @Cron('*/10 * * * * *')  // 這個 CronJob 每 10 秒執行一次
  async processOutboxEvents() {
    try {
      const unprocessedEvents = await this.outboxRepository.findUnprocessedEvents();

      for (const event of unprocessedEvents) {
        await this.eventBusService.publishEvent(event.eventName, event.payload);
        await this.outboxRepository.markEventAsProcessed(event._id);
      }
    } catch (error) {
      console.error('Error processing outbox events:', error);
    }
  }
}

不過在實作時我一直有在想一件事,那就是 outboxRepository 這個要每一個 bounded context 都要有一個,還是共用一個就好,目前兩者都有好有壞:

  • 如果共用 1 個,那事實上只要建好後,接下來新讓任何的 BC 應該都不用做啥。
  • 如果共用 1 個,那可能 outbox 表很容易變很大。
  • 如果每個 BC 都各自有,比較獨立,但相對的每一次加個 BC,在輪詢那可能要調整。

目前會先傾向所有的 BC 共用 1 個,因為感覺比較好維護,而且如果太大設到期時間就好。


Event Bus

Event Bus 事實上就是 Pub-Sub 模式中有時後很常出現的 Broker,它的目的就是要讓 Publisher 與 Subscriber 不知道對方的存在,你要叫他 Event Broker 應該也是可以,我自已只是比較偏好 Event Bus。

然後它主要的職責在於:

  • 接收 publisher 的事件,然後轉發給 subscriber。
  • 消息過濾。
  • 有些高級的的它,還可以持久化與重放機制,但這也不代表 transactional 就沒用了,因為如果是發送到 event bus 中的網路有問題,那也沒用。

這個地方就是上面 transactional outbox 中輪詢 outbox 資料表後,會來將 domain event 送到 eventBusService 就好。

import { Injectable } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class EventBusService {
  constructor(private readonly rabbitMQService: RabbitMQService) {}

  async publishEvent(eventName: string, payload: any): Promise<void> {
    await this.rabbitMQService.publish(
      'events_exchange',
      eventName,
      payload
    );
  }
}

然後這個就是事件訂閱者的地方,不過說來慚愧,在用 Decorator 實作時 @HandleEvent 時有碰到一些問題,還沒有解出來,所以這裡就先貼一下概念碼,主要就是來表達,它就是用這個方法來處理其它 Bounded Context 的事件。

// src/course/course.usecase.ts
import { Injectable } from '@nestjs/common';
import { HandleEvent } from '../common/decorators/handle-event.decorator';
import { OutboxRepository } from '../event-bus/outbox.repository';

@Injectable()
export class CourseUseCase {

  @HandleEvent('OrderCreated')
  async handleOrderCreated(eventPayload: any) {
    console.log('Handling OrderCreated event:', eventPayload);
  }
}


Q & A

1. 我可以只單獨用一個嗎 ?

以發 domain event 的情境下不太建議,如果沒有用 transactional outbox 機制,你就要有重送 domain event 的機制,雖然不是說做不到,但就我的想法它的工應該是會比做 transactional outbox 還麻煩。

2. Event Bus、Observer Pattern、Pub-Sub Pattern 的關係

在我腦海中事實上這 Observer Patter 與 Pub-Sub Patter 兩個幾乎是一樣的東西沒錯,但仔細查了一下資料,我發現有個比較重要的差別:

  • Observer Pattern: Subject 和 Observers 是彼此知道對方的存在,適用於 BC 內自已的事件處理。
  • Pub-Sub Pattern: Publisher 和 Subscriber 不知道對方的存在,通常中間有 Broker 也可以說是 Event Bus,適用於 BC 外的事件處理。

但他們本質上都是相同的,它們的目的都是要:

用事件機制來解耦

然後 Event Bus 比較就是 Pub-Sub Pattern 中裡面的 Broker。

https://ithelp.ithome.com.tw/upload/images/20241008/20089358XGSTVPJtyB.png

https://ithelp.ithome.com.tw/upload/images/20241008/200893588IKWNcAOOr.png


小結

本篇文章中我們談到了如何將 Domain Event 發送到需要的 Bounded Context 手上,其中我們談到了 Transactional OutBox 與 Event Bus 這兩個東西,它們都是設計模式,或是某個設計模式中的東西。

不過認真的說這裡還是有很多設計細節可以想 ~ 之後在來慢慢補充,寫到 23 天真的有點疲了……


上一篇
Day-22: 好的軟體架構的特點 ( Base Clean Architecture + DDD )
下一篇
Day-24: Domain Driven Design 與 API 設計的難處
系列文
一個好的系統之好維護基本篇 ( 馬克版 )30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言