iT邦幫忙

2023 iThome 鐵人賽

DAY 19
0
Software Development

救救我啊我救我!CRUD 工程師的惡補日記系列 第 19

【RabbitMQ】在 Spring Boot 實作 Simple 與 Worker 模式

  • 分享至 

  • xImage
  •  

昨天的進度是安裝 RabbitMQ,並準備 Spring Boot 專案與之連接。而本文會以最小的規模實作「傳送資料到 MQ」與「從 MQ 拿取資料做處理」的過程。今天要介紹的設計模式,是相對單純的「Simple」與「Worker」模式。

此篇亦轉載到個人部落格


一、名詞解釋

透過 MQ,可以將服務給串起來。為了方便稱呼,我們通常稱傳送資料到 MQ 的 server 為生產者(producer);拿取 MQ 中資料進行處理的 server 為消費者(consumer);而資料為「訊息」(message)。

從架構上來看,透過 MQ 串起 producer 與 consumer 的做法,會有幾種設計模式。接下來的文章,就讓我們逐一去了解並實作。

二、Simple 模式

(一)定義

最單純的模式是「Simple」。在架構中,就只是一個 producer、一個 MQ,跟一個 consumer 而已。如下圖。
https://ithelp.ithome.com.tw/upload/images/20230923/201311072pM0NCB6Xd.png

在程式實作中,會有 4 個東西要準備,分別是:queue、訊息 model、producer 與 consumer。本節會透過 Simple 模式,讓讀者在完成程式後,體驗 producer 與 consumer 之間傳接資料。

(二)配置 Queue

昨天的文章已經示範過如何配置 queue,在此不贅述。

import org.springframework.amqp.core.Queue;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue commentNotificationQueue() {
        return new Queue("Comment Notification Queue");
    }
}

(三)設計訊息 Model

根據上一篇舉例的「留言通知」情境,下面簡單設計了一個叫做「CommentMsg」的類別,做為要傳送到 MQ 的訊息。

public class CommentMsg implements Serializable {
    private int postId; // 文章 id
    private int creatorId; // 留言建立者 id
    private String content; // 留言內容

    // getter, setter ...
}

RabbitMQ 只能傳送 Stringbyte[]Serializable 型態的資料,故此處實作 Serializable 介面。

(四)實作 Producer

接著實作 producer,它會是一個元件。其職責是在業務邏輯完成後被呼叫,將訊息傳送至 MQ。

@Component
public class MessageProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendNewCommentNotification(CommentMsg comment) {
        rabbitTemplate.convertAndSend("Comment Notification Queue", comment);
        LOGGER.info("{} sends message successfully.", getClass().getSimpleName());
    }
}

此處呼叫了 RabbitTemplateconvertAndSend 方法,將 CommentMsg 訊息傳送到名為「Comment Notification Queue」的 queue 裡面。

(五)實作 Consumer

最後實作 consumer 元件。其職責是接收 producer 送來的訊息,進行需要的處理。

@Component
public class NotificationConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationConsumer.class);

    @RabbitListener(queues = "Comment Notification Queue")
    public void processMsg(@Payload CommentMsg comment) {
        LOGGER.info("{} receives message successfully.", getClass().getSimpleName());

        // 模擬從 DB 取得文章標題和留言者名字
        String userName = getUserName(comment.getCreatorId());
        String postTitle = getPostTitle(comment.getPostId());

        // 模擬發送通知
        LOGGER.info("站內通知:{}在文章「{}」上留言", userName, postTitle);
        LOGGER.info("信件主旨:文章「{}」有新留言", postTitle);
        LOGGER.info("信件內容:{}在「{}」上留言:{}", userName, postTitle, comment.getContent());
    }
    
    // 以 Map 代替真實 DB
    private final Map<Integer, String> userNameMap = Map.of(1, "Vincent", 2, "Dora");
    private final Map<Integer, String> postNameMap = Map.of(1, "2023 週年慶", 2, "iPhone 15 週邊上架");

    private String getUserName(int userId) {
        return userNameMap.get(userId);
    }

    private String getPostTitle(int itemId) {
        return postNameMap.get(itemId);
    }
}

此處宣告了名為 processMsg 的處理方法,並冠上 @RabbitListener 標記,指定 queue 的名稱。且方法參數用 @Payload 標記接收訊息(這點與在 controller 接收 request body 很像)。

筆者認為,應該是在其他臺 server 跑自己的 Spring Boot,並執行 consumer 的程式才比較合理。而 RabbitMQ 也在獨立的 server 上運行。此處為了方便練習,刻意都在同一個程式專案中實作。

三、測試

實作完 producer 與 consumer 後,讓我們實際測試能否正常傳接訊息。由於 producer 需要有人呼叫,因此這裡建立一個 controller 去執行它。接下來的幾篇文章也都可以用這套做法來測試。

@RestController
public class MessageController {

    @Autowired
    private MessageProducer producer;

    @PostMapping("/comment")
    public void notifyNewComment(@RequestBody CommentMsg comment) {
        producer.sendNewCommentNotification(comment);
    }
}

讀者可自行用 Postman 或 curl 等工具,發送請求到該 API。

POST http://localhost:8080/comment
{
    "postId": 1,
    "creatorId": 2,
    "content": "把錢錢變成喜歡的樣子!"
}

隨後可看見印出的 log。
https://ithelp.ithome.com.tw/upload/images/20230923/201311073aWHfO8d4R.jpg

多發送幾次請求的話,在 RabbitMQ 的管理介面,也能看見有流量通過。
https://ithelp.ithome.com.tw/upload/images/20230923/201311075QUyF3nlNt.jpg

四、Worker 模式

(一)定義

認識 Simple 模式後,要理解 Worker 模式就容易多了。

背景是這樣的,採取 Simple 模式後,若請求真的很多,MQ 將會累積大量排隊中的訊息資料。雖說昨天文章的例子,提到可以接受「通知晚一點到」、「關鍵字晚一點生效」,但如果這些訊息消化不完,「晚一點」會變成不知道等到什麼時候

Worker 模式的目的,是藉由增設一至多個 consumer,來加快消化訊息的速度。官方的架構示意圖如下。
https://ithelp.ithome.com.tw/upload/images/20230925/201311070FfGBjXgpH.png

這些增設的 consumer,原則上也會是在新的 server 上運行。由於 consumer 要執行的程式邏輯都相同,且它們都是接收來自相同 IP 地址的 MQ 訊息,所以我們說 MQ 要做「水平擴展」是容易的。

(二)增設 Consumer

Worker 模式是透過增設 consumer 來加速消化訊息。本文示範上為了方便,就直接在同一個程式專案中,複製第二節寫好的 consumer。

@Component
public class NotificationWorkerConsumer1 {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorkerConsumer1.class);

    @RabbitListener(queues = "Comment Notification Queue")
    public void processMsg(@Payload CommentMsg comment) {
        LOGGER.info("{} receives message successfully.", getClass().getSimpleName());
    }
}

@Component
public class NotificationWorkerConsumer2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationWorkerConsumer2.class);

    @RabbitListener(queues = "Comment Notification Queue")
    public void processMsg(@Payload CommentMsg comment) {
        LOGGER.info("{} receives message successfully.", getClass().getSimpleName());
    }
}

(三)測試

測試的方式與本文第三節相同。呼叫 POST http://localhost:8080/comment 的 API 後,看到印出的 log 如下。
https://ithelp.ithome.com.tw/upload/images/20231024/2013110742g44kFoAf.jpg

可以注意到,consumer 會輪流接收訊息並處理。

Ref:SpringBoot - 第二十六章 | RabbitMQ的集成和使用

本文介紹了簡單的 Simple 與 Worker 模式,明天將繼續解說其他複雜一點的模式。

本文的完成專案:
https://github.com/ntub46010/SpringBootUsingRabbitMQ/tree/simple-and-worker-pattern


今日文章到此結束!
最後推廣一下自己的部落格,我是「新手工程師的程式教室」的作者,請多指教/images/emoticon/emoticon41.gif


上一篇
【RabbitMQ】認識訊息佇列並導入到 Spring Boot
下一篇
【RabbitMQ】在 Spring Boot 實作 Fanout 與 Direct 模式
系列文
救救我啊我救我!CRUD 工程師的惡補日記50
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言