iT邦幫忙

2024 iThome 鐵人賽

DAY 22
0

前言

我們需要 Create User 的時候 Create 一個 Default Todo List,因為這是跨服務的事件,無法單純使用 MediatR 來完成這個功能,所以我們使用 Message Queue 來傳遞這個 Event,並利用 RabbitMQ 來實作。

https://ithelp.ithome.com.tw/upload/images/20241004/20168953xTGTqrEjwl.png

RabbitMQ 簡介

RabbitMQ 是一個開源的消息代理軟體,屬於消息佇列系統。它實現了 AMQP(Advanced Message Queuing Protocol),用於在分散式系統中傳遞消息,支持高效的消息分發、處理和資料持久化。

核心概念

  • Producer:消息的發送者。
  • Queue:存放消息的佇列,等待消費者處理。
  • Consumer:從佇列接收並處理消息的實體。
  • Exchange:負責將消息路由到適當的佇列。
  • Binding:連接 Exchange 和 Queue,決定消息的路由規則。

主要特性

  • 支持多種消息協議(如 AMQP、STOMP、MQTT)。
  • 支持持久化和保證消息不丟失。
  • 可橫向擴展,適合高並發環境。
  • 提供消息路由、過濾和多種模式的消息傳遞機制。

使用場景

  • 任務排程系統
  • 分散式應用的通信
  • 即時數據處理

簡單工作流程

  1. Producer 發送消息到 Exchange。
  2. Exchange 根據 Binding 將消息路由到適當的 Queue。
  3. Consumer 從 Queue 取出並處理消息。

https://ithelp.ithome.com.tw/upload/images/20241004/20168953Yk9AhewQP2.png

Domain Event 和 Integration Event

我一開始在 Common.Library 內有做一個 Interface 叫做 IDomainEvent,為什麼是 DomainEvent 而不用 Event 就好?

因為這裡還有一個重要的概念需要介紹:Integration Event

什麼是 Domain Event?

Domain Event 是指在領域內發生的、具有業務意義的事件,代表已發生的事實。它通常在應用內部傳遞,解耦不同的業務邏輯,比如 UserCreatedEvent 表示用戶已經創建成功。

什麼是 Integration Event?

Integration Event 是一個會影響其他微服務、邊界上下文或外部系統的事件。這些事件會被推送到消息隊列,通知其他系統,從而觸發跨系統的副作用。例如,UserCreatedIntegrationEvent 會讓其他服務知道用戶已創建。

兩者的區別

  • Domain Event 只在應用內部傳遞,用來解耦應用內部邏輯。
  • Integration Event 用於跨服務通信,通常會引發其他系統的動作。

簡單來說,Domain Event 聚焦在單一領域內的業務事件,而 Integration Event 負責服務之間的集成與協作。

建立 Integration Event

我們先在 Common.Library 的 Seedwork 內建立 IIntegrationEvent

namespace Common.Library.Seedwork;

public interface IIntegrationEvent
{
    DateTime CreatedDateTime { get; }
}

並且新增一個 IntegrationEvents 資料夾,在裡面實作我們今天的目標 UserCreatedIntegrationEvent

using Common.Library.Seedwork;

namespace Common.Library.IntegrationEvents;

public record UserCreatedIntegrationEvent(Guid UserId, DateTime CreatedDateTime) : IIntegrationEvent;

Producer 實作

MediatR

我們這裡還是繼續利用 MediatR 當作服務內部的 Eventbus 來傳遞 Domain Event,之後再來實作 Handler 來實作 MQ 的 Producer。

先做 UserCreatedEvent,這裡的事件很單純且不會被改變,用 record 實作比較快

using Common.Library.Seedwork;

namespace Account.Domain.Events;

public record UserCreatedEvent(Guid UserId) : IDomainEvent;

User Aggregate 註冊後要觸發這個 Event

public static User Create(
    string firstName,
    string lastName,
    string email,
    string password)
{
    var user = new User()
    {
        Id = UserId.Create(),
        FirstName = firstName,
        LastName = lastName,
        Email = email,
        PasswordHash = HashPassword(password), // Hash
        CreatedDateTime = DateTime.UtcNow,
        UpdatedDateTime = DateTime.UtcNow
    };

    user.AddDomainEvent(new UserCreatedEvent(user.Id.Value));

    return user;
}

準備一下 Handler,等等來這裡把 Message 推到 MQ 內

using Account.Domain.Events;
using MediatR;

namespace Account.Application;

public class UserCreatedEventHandler : INotificationHandler<UserCreatedEvent>
{
    public Task Handle(UserCreatedEvent notification, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }
}

把該 DI 的物件都放進來

using System.Reflection;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

namespace Account.Application;

public static class AccountApplicationRegister
{
    public static IServiceCollection AddAccountApplication(this IServiceCollection services)
    {
        services.AddScoped<IAccountService, AccountService>();
        services.AddMediatR(Assembly.GetExecutingAssembly());

        return services;
    }
}

還有 DomainEventsInterceptor 也要記得

services.AddScoped<DomainEventsInterceptor>();

AccountContext

optionsBuilder.AddInterceptors(_domainEventsInterceptor);

實作 RabbitMQService

接著,我打算在 Common.Library 中做一個簡易的 RabbitMQService 然讓所有微服務可以利用,這個工具可以拿來發送和接收 Messages。

先做一個 RabbitMQSettings 來拿到 RabbitMQ 的連線資訊

namespace Common.Library.Models;

internal class RabbitMQSettings
{
    public static string SectionName { get; } = "RabbitMQSettings";
    public string HostName { get; init; } = null!;
    public int Port { get; init; }
}

接著做一個 RabbitMQService 來 Publish Message,記得安裝一下 RabbitMQ.Client 套件。

這裡我把 Queue 的名稱設定為 Event 的名稱,Message 則是序列化 integrationEvent 的 JSON 文字。

using System.Text;
using System.Text.Json;
using Common.Library.Models;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;

namespace Common.Library.Services;

public class RabbitMQService : IDisposable
{
    private readonly IConfiguration _configuration;
    private readonly RabbitMQSettings _rabbitMQSettings;
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public RabbitMQService(IConfiguration configuration)
    {        
        _configuration = configuration;
        _rabbitMQSettings = _configuration.GetSection(RabbitMQSettings.SectionName).Get<RabbitMQSettings>() ?? throw new NullReferenceException();

        var factory = new ConnectionFactory()
        {
            HostName = _rabbitMQSettings.HostName,
            Port = _rabbitMQSettings.Port
        };
        
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }

    public void SendMessage<TIntegrationEvent>(TIntegrationEvent integrationEvent, string? queueName = null) where TIntegrationEvent : IIntegrationEvent
    {
        // Initialize queueName based on the event type if not provided
        queueName ??= typeof(TIntegrationEvent).Name;

        // Declare a queue
        _channel.QueueDeclare(queue: queueName,
                              durable: false,
                              exclusive: false,
                              autoDelete: false,
                              arguments: null);

        // Serialize 
        var message = JsonSerializer.Serialize(integrationEvent);
        var body = Encoding.UTF8.GetBytes(message);

        // Publish message
        _channel.BasicPublish(exchange: "",
                              routingKey: queueName,
                              basicProperties: null,
                              body: body);
    }

    public void Dispose()
    {
        _channel.Close();
        _connection.Close();
    }
}

東西都做好了就拿到一開始做好的 UserCreatedEventHandler 使用看看,這邊我把物件序列化成 JSON 格式當作 Message

public Task Handle(UserCreatedEvent notification, CancellationToken cancellationToken)
{        
    var integrationEvent = new UserCreatedIntegrationEvent(notification.UserId, DateTime.UtcNow);

    _rabbitMQService.SendMessage(integrationEvent);

    return Task.CompletedTask;
}

記得加入 RabbitMQ 連線資訊到 appsettings

{
  "RabbitMQSettings": {
    "HostName": "localhost",
    "Port": 5672
  }
}

在 Account.Application 的 AccountApplicationRegister DI RabbitMQService

services.AddSingleton<RabbitMQService>();

測試

Register 新的 User

建立一個 Emma Watson 的使用者

https://ithelp.ithome.com.tw/upload/images/20241004/201689531xeWyd9nwB.png

RabbitMQ Management

開啟瀏覽器到 http://localhost:15672/ 並輸入 guest / guest 來登入 RabbitMQ Management,到 Queues and Streams 可以看到 UserCreatedIntegrationEvent 有一個 Message

https://ithelp.ithome.com.tw/upload/images/20241004/20168953lgIvLZ30bZ.png

進入這個 Queue 內點選 Get Message(s) 可以看到內容,與我們 Emma Watson 的 ID 一致

https://ithelp.ithome.com.tw/upload/images/20241004/20168953WHt69ZPyNt.png

結語

這章節我們成功的將跨服務的 Events 送到 RabbitMQ 上等待其他服務來 Consume,下一篇我們將要做一個 Listener 來監聽這些事件。


上一篇
Day 21 - Mediator Pattern - MediatR 和 Domain Event 實作
下一篇
Day 23 - Integration Event:RabbitMQ 與 Consumer 開發實作
系列文
DDD? Clean Architecture? Microservices? 帶你用.NET實作打造一個現代化微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言