iT邦幫忙

2025 iThome 鐵人賽

DAY 13
1

前言

前兩篇補充了 Kafka 的基礎知識後,今天要來推進一下站內信的進度,事實上 Kafka 在今天的進度中原本是沒有角色的,因為站內信之所以會用到 Kafka 主要還是即時推播的作用,而即時推播目前還沒有這麼快可以實作到,因為那需要整合 WebSocket 進來我們的系統才有辦法實現,可能中間要再加個兩三篇幫助我撐到週末.. 所以今天下面加入的 Kafka 應用算是有點硬要,其實可以直接一個 API Call 就把站內信保存到 DB,但為了幫 Kafka 增加一點存在感所以我還是發了一個消息給 MAIL 的 Kafka Topic,讓 Consumer 監聽到才保存,下面是我部分實作的程式碼,還有很多基本的程式碼但跟業務邏輯關係不大,所以就不貼上來了,這些都是當天才趕緊 Code 出來的,一點一滴也是花不少時間,內容不全還請見諒。

實作:Table 規劃&站內信發送

今天一大重點是要生一張可以保存站內信的資料庫 Table 出來,我是希望可以先一張表搞定,之後真的有需要再慢慢擴展,不然要規劃 Table 需要花不少時間,我是打算先做一個 Send By User Role 的功能,這個 Role 是基於 RBAC 模型定義的,發送時由平台作為 Sender 發出,所以這邊沒有規劃 Sender 欄位,然後選擇要發給哪個類型的用戶,就是這麼簡單,之後再根據用戶類型查詢他的站內信即可,但今天也沒時間寫到查詢未讀信件了,等明後天再說吧。

CREATE TABLE CORE_BASE.MAIL (
    ID BIGINT PRIMARY KEY AUTO_INCREMENT,
    RECEIVER_ROLE VARCHAR(20) NOT NULL,
    CONTENT TEXT NOT NULL,
    STATUS TINYINT NOT NULL COMMENT '0=unread | 1=read',
    VERSION INT DEFAULT 0,
    CREATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UPDATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

業務邏輯:MailService & Kafka Sender

這邊就如剛剛所說,其實在這個業務邏輯層應該是要一直保存站內信到資料庫就好,然後保存完再發一個 Kafka 消息到推播的 Topic,把任務 Pass 給 WebSocket,但這邊就先暫時改成用 Kafka Consumer 監聽去保存資料,這週我們再趕一下進度,希望可以整合進 WebSocket:

@Service
@RequiredArgsConstructor
public class MailServiceImpl implements MailService {

    private final KafkaSender kafkaSender;

    @Override
    public void sendMail(MailSendTO mailSendTO) {
        kafkaSender.send(Topic.MAIL, mailSendTO);
    }

}

這邊因為我們的 Producer 跟 Consumer 都是 String 序列化跟反序列化的,所以在發送前我們要先用 ObjectMapper 把物件轉換成字串:

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, Object body) {
        try {
            var om = new ObjectMapper();
            var message = om.writeValueAsString(body);
            kafkaTemplate.send(topic, message);
        } catch (JsonProcessingException e) {
            throw new BaseException(StatusCode.UNKNOW_ERR, "Json parse failed");
        }
    }
}

Kafka Consumer

Consumer 監聽到後才會把字串再度轉換回物件,這邊要觀察的是 log 有沒有 Receive 到我們的 Kafka Message:

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    private final IMailRepo mailRepo;

    @KafkaListener(topics = Topic.MAIL)
    public void processMailNotification(String content) {
        log.info("Receive topic [{}] and message=[{}]", Topic.MAIL, content);
        try {
            var om = new ObjectMapper();
            var mail = om.readValue(content, MailSendTO.class);
            var model = new Mail();
            model.setReceiverRole(mail.getReceiverRole());
            model.setContent(mail.getContent());
            mailRepo.saveAndFlush(model);
        } catch (Exception e) {
            log.warn("Failed to process mail task", e);
        }
    }
}

日誌:

Receive topic [MAIL] and message=[{"receiverRole":"USER","content":"test01"}]

然後完成後去資料庫看一下是否真的有資料被存入了,這樣我們這一階段的進度就完成囉:

https://ithelp.ithome.com.tw/upload/images/20250903/20161582pxqnXwTsQ8.png

總結

今天其實也是規劃到一半才驚覺 Kafka 今天根本用不到,但想說如果單純 CRUD 好像也蠻沒意義的,如果明天有時間或目前站內信沒什麼一定要加的功能,那就可以先補一下 WebSocket 的基礎知識,幫助我複習一下,今天先到這邊,我們完成了:

  • 最基本的 Table 創建
  • 最基本的站內信 API
  • 用 Kafka Pub/Sub 來保存資料

明天見吧!


上一篇
Day 12 | Kafka 基礎知識補充 | Topic, Partition, Consumer Group
下一篇
Day 14 | 站內信實作 Lab 第 0.1 版 | 站內信基礎微優化
系列文
系統設計一招一式:最基本的功練到爛熟就是殺手鐧,從單體架構到分布式系統的 Lab 實作筆記16
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言