iT邦幫忙

2023 iThome 鐵人賽

DAY 20
0
Software Development

救救我啊我救我!CRUD 工程師的惡補日記系列 第 20

【RabbitMQ】在 Spring Boot 實作 Fanout 與 Direct 模式

  • 分享至 

  • xImage
  •  

昨天的文章實作了簡單的 Simple 與 Worker 模式。而接下來將加入「交換機」(exchange)這項元素,讓 producer 在發送訊息到多個 queue 時更方便。本文會介紹 exchange,並進行 queue 的綁定,再實作出「Fanout」與「Direct」模式。

此篇亦轉載到個人部落格


一、什麼是交換機(exchange)

(一)背景

回顧一下昨天介紹的 Simple 模式。情境是把留言後所發送的「站內通知」與「Email 通知」之功能,切分為一個新服務。其 producer 的程式片段如下。

@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendNewCommentNotification(CommentMsg comment) {
        rabbitTemplate.convertAndSend("Comment Notification Queue", comment);
    }
}

public class CommentMsg implements Serializable {
    private int postId; // 文章 id
    private int creatorId; // 留言建立者 id
    private String content; // 留言內容

    // getter, setter ...
}

我們使用 RabbitTemplate,將訊息(CommentMsg)直接送至特定名稱的 queue。

假設今天產品功能變多了,並非每個使用情境,都要發送站內通知與 Email 通知這兩種。且團隊因此決定將這兩種通知的功能,細分為兩臺 server(一臺送站內,一臺送 Email)。那就代表這兩個服務各自的 queue,都要接收 CommentMsg 的訊息。

以直覺來想,producer 的程式碼會該寫成如下。

public void sendNewCommentNotification(CommentMsg comment) {
    rabbitTemplate.convertAndSend("Comment Internal Notification Queue", comment);
    rabbitTemplate.convertAndSend("Comment Email Notification Queue", comment);
}

如此一來,我們可察覺一個問題。每個服務都有自己的 queue,如果有多個服務想接收同樣的訊息,則 producer 就要為了它們,撰寫出數行 RabbitTemplate.convertAndSend 的程式,以傳送訊息到不同 queue。

(二)交換機的概念

交換機能夠消除 producer 與 queue 之間的依賴。將 exchange 加入 MQ 的架構後,producer 只要傳送訊息到 exchange,它就會幫忙轉發到 queue。而 queue 必須事先與 exchange 綁定好。官方教學的架構圖如下。
https://ithelp.ithome.com.tw/upload/images/20230928/20131107cnC5ZKtife.png

以生活情境來比喻。學校有重要事項傳遞給同學們,若召集所有相關的同學到現場,會很麻煩。於是校方選擇將公告送至各科系的辦公室。而系辦再依據公告內容的適用對象,放到「班級櫃」。班級幹部有空會來領取公告,並協助執行。

從這個情境來看,校方是 producer,而「召集相關的同學」相當於寫出好幾行發送訊息到 queue 的程式。至於系辦則是 exchange,會負責轉發訊息,看是要給特定班級、年級,還是整個科系都要通知。而存放公告的班級櫃是 queue;拿取公告的班級幹部是 consumer。

二、實作前準備

本文範例程式的情境是,系統在發送「文章 / 商品有新留言」的通知給使用者時,可分為「站內通知」與「Email 通知」。且這兩項功能被切分為兩個服務,運行在不同 server 上。

當業務邏輯要發送通知時,會將需要的資料包裝成訊息(message),讓 producer 送往 MQ,由 consumer 接收後處理,達到非同步工作的效果。

(一)配置 Queue

為避免打錯字,以下先準備一些常數,用來代表 Spring bean、queue 與 exchange 的名稱。

public class Constants {
    public static final String BEAN_INTERNAL_NOTIFICATION_QUEUE = "internalNotification";
    public static final String BEAN_EMAIL_NOTIFICATION_QUEUE = "emailNotification";
    public static final String BEAN_COMMENT_NOTIFICATION_FANOUT_EXCHANGE = "commentNotificationFanoutExchange";
    public static final String BEAN_COMMENT_NOTIFICATION_DIRECT_EXCHANGE = "commentNotificationDirectExchange";

    public static final String NAME_INTERNAL_NOTIFICATION_QUEUE = "Comment Internal Notification Queue";
    public static final String NAME_EMAIL_NOTIFICATION_QUEUE = "Comment Email Notification Queue";
    public static final String NAME_COMMENT_NOTIFICATION_FANOUT_EXCHANGE = "Comment Notification Fanout Exchange";
    public static final String NAME_COMMENT_NOTIFICATION_DIRECT_EXCHANGE = "Comment Notification Direct Exchange";
}

以下是在程式中進行 queue 的配置。

@Configuration
public class RabbitConfig {

    @Bean(name = BEAN_INTERNAL_NOTIFICATION_QUEUE)
    public Queue internalNotificationQueue() {
        return new Queue(NAME_INTERNAL_NOTIFICATION_QUEUE);
    }

    @Bean(name = BEAN_EMAIL_NOTIFICATION_QUEUE)
    public Queue emailNotificationQueue() {
        return new Queue(NAME_EMAIL_NOTIFICATION_QUEUE);
    }
}

(二)訊息 Model

由於本文加入了「exchange」與「routingKey」的概念,因此請先在訊息的 model 類別添加對應的欄位。還可攜帶其他想傳給 consumer 的資料。

public class CommentMsg implements Serializable {
    private String content;
    private String mode; // fanout | direct
    private String routingKey;

    // getter, setter ...
}

其中 mode 欄位代表 producer 要以何種模式執行(Fanout 或 Direct),用來判斷要將訊息送給哪一個 exchange。而 rountingKey 欄位則是讓 exchange 判斷要送到哪些 queue。

(三)準備 Consumer

關於 consumer 的邏輯,筆者只取用 CommentMsg 訊息的其中一個欄位,印出簡單的 log。

@Component
public class InternalNotificationConsumer {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(InternalNotificationConsumer.class);

    @RabbitListener(queues = NAME_INTERNAL_NOTIFICATION_QUEUE)
    public void processMsg(@Payload CommentMsg comment) {
        LOGGER.info("站內通知:{}", comment.getContent());
    }
}

@Component
public class EmailNotificationConsumer {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(EmailNotificationConsumer.class);

    @RabbitListener(queues = NAME_EMAIL_NOTIFICATION_QUEUE)
    public void processMsg(@Payload CommentMsg comment) {
        LOGGER.info("Email 通知:{}", comment.getContent());
    }
}

(四)準備 Producer

在 producer 呼叫 RabbitTemplate.convertAndSend 方法時,請選用正確的 overloading 方法。第一個參數是指定 exchange 名稱;第二個參數叫做 routingKey,在 Direct 模式中會用來指定 queue,而 Fanout 模式不會用到。

@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendNewCommentNotification(CommentMsg comment) {
        String exchange;
        String routingKey;
        if ("direct".equalsIgnoreCase(comment.getMode())) {
            exchange = NAME_COMMENT_NOTIFICATION_DIRECT_EXCHANGE;
            routingKey = comment.getRoutingKey();
        } else {
            exchange = NAME_COMMENT_NOTIFICATION_FANOUT_EXCHANGE;
            routingKey = "";
        }

        rabbitTemplate.convertAndSend(exchange, routingKey, comment);
    }
}

(五)準備 Controller

為了進行測試,請在 controller 準備 API,藉此呼叫 producer 發送訊息。

@RestController
public class MessageController {

    @Autowired
    private MessageProducer producer;

    @PostMapping("/comment")
    public void notifyNewComment(@RequestBody CommentMsg comment) {
        producer.sendNewCommentNotification(comment);
    }
}

以上就是練習本文兩個模式的前置作業了。

三、實作 Fanout 模式

(一)定義

了解 exchange 的概念後,讓我們繼續以「發送通知」的情境為例子,認識「Fanout」模式。該模式是指將訊息發送給所有與該 exchange 綁定的 queue,也就是「廣播」。

Fanout 這個單字,有「發散」的意思。

(二)配置 Exchange 並綁定 Queue

請建立 exchange,只要在 FanoutExchange 的建構子傳入名稱即可。此名稱將會在 producer 傳送訊息時被指定。

以下是配置程式,綁定時需要建立 Binding 元件。

@Configuration
public class RabbitConfig {
    // ...

    @Bean(name = BEAN_COMMENT_NOTIFICATION_FANOUT_EXCHANGE)
    public FanoutExchange commentNotificationFanoutExchange() {
        return new FanoutExchange(NAME_COMMENT_NOTIFICATION_FANOUT_EXCHANGE);
    }

    @Bean
    public Binding bindInternalNotificationQueueToFanoutExchange(
            @Qualifier(BEAN_INTERNAL_NOTIFICATION_QUEUE) Queue queue,
            @Qualifier(BEAN_COMMENT_NOTIFICATION_FANOUT_EXCHANGE) FanoutExchange exchange
    ) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding bindEmailNotificationQueueToFanoutExchange(
            @Qualifier(BEAN_EMAIL_NOTIFICATION_QUEUE) Queue queue,
            @Qualifier(BEAN_COMMENT_NOTIFICATION_FANOUT_EXCHANGE) FanoutExchange exchange
    ) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}

過程中注入了剛剛建立的 queue 與 exchange 元件,並透過 BindingBuilder 分別完成兩個 queue 的綁定。

這些「元件」並不是要拿來注入到其他地方的,而是會被 RabbitMQ 的 library 讀取,所以在 RabbitMQ 的服務中才能完成 queue 和 exchange 的建立與綁定。

(三)測試

可發送請求到第二節準備的 API。

POST http://localhost:8080/comment
{
    "content": "這是 Fanout 模式,訊息會送給所有 queue!",
    "mode": "fanout",
    "routingKey": null
}

以上面的請求為例,每個綁定在 exchange 的 queue,其對應的 consumer 都能收到訊息,這就是 Fanout 模式。

四、實作 Direct 模式

(一)定義

前面的 Fanout 模式會將訊息傳送給所有綁定的 queue。而本節的「Direct」模式則是選擇其中一個 queue。

在 Direct 模式,producer 會在發送訊息給 exchange 時,指定一個代表 queue 的 key。該 key 會在 queue 一開始綁定到 exchange 時設定好。

以生活情境來比喻,就像向郵局申請的「郵政信箱」。如果自己或家人有服兵役,可能會有點印象,寄信到所屬單位時,收件地址會寫「XX郵政幾號」,例如「桃園龍潭郵政91004號」。而這個郵政信箱的代號,就相當於 queue 的 key。

這麼做的好處是,當 queue 的名稱改變了,producer 的程式並不需要做調整,因為它是對著 key 發送訊息。

(二)配置 Exchange 並綁定 Queue

請建立 exchange,只要在 DirectExchange 的建構子傳入名稱即可。以下是配置程式。

@Configuration
public class RabbitConfig {
    // ...

    @Bean(name = BEAN_COMMENT_NOTIFICATION_DIRECT_EXCHANGE)
    public DirectExchange commentNotificationDirectExchange() {
        return new DirectExchange(NAME_COMMENT_NOTIFICATION_DIRECT_EXCHANGE);
    }

    @Bean
    public Binding bindInternalNotificationQueueToDirectExchange(
            @Qualifier(BEAN_INTERNAL_NOTIFICATION_QUEUE) Queue queue,
            @Qualifier(BEAN_COMMENT_NOTIFICATION_DIRECT_EXCHANGE) DirectExchange exchange
    ) {
        return BindingBuilder.bind(queue).to(exchange).with("Internal");
    }

    @Bean
    public Binding bindEmailNotificationQueueToDirectExchange(
            @Qualifier(BEAN_EMAIL_NOTIFICATION_QUEUE) Queue queue,
            @Qualifier(BEAN_COMMENT_NOTIFICATION_DIRECT_EXCHANGE) DirectExchange exchange
    ) {
        return BindingBuilder.bind(queue).to(exchange).with("Email");
    }
}

由於使用的是 DirectExchange,因此 BindingBuilder 會要求我們提供叫做 routingKey 的參數。於是這裡將 queue 的 key 給傳入。

(三)測試

可發送請求到第二節準備的 API。

POST http://localhost:8080/comment
{
    "content": "這是 Direct 模式,只會發送站內通知!",
    "mode": "direct",
    "routingKey": "Internal"
}

以上面的請求為例,會發送訊息給 key 為「Internal」的 queue。也就是只有「Comment Internal Notification Queue」這個 queue 會收到訊息,並由 InternalNotificationConsumer 印出 log。

Ref:
[DATA] 訊息佇列 02 - RabbitMQ 簡介與 5 種設計模式
SpringBoot RabbitMQ 七種工作模式入門

本文的完成專案:
https://github.com/ntub46010/SpringBootUsingRabbitMQ/tree/fanout-and-direct-pattern


今日文章到此結束!
最後推廣一下自己的部落格,我是「新手工程師的程式教室」的作者,請多指教/images/emoticon/emoticon41.gif


上一篇
【RabbitMQ】在 Spring Boot 實作 Simple 與 Worker 模式
下一篇
【RabbitMQ】在 Spring Boot 實作 Routing 與 Topic 模式
系列文
救救我啊我救我!CRUD 工程師的惡補日記50
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言