前兩篇補充了 Kafka 的基礎知識後,今天要來推進一下站內信的進度,事實上 Kafka 在今天的進度中原本是沒有角色的,因為站內信之所以會用到 Kafka 主要還是即時推播的作用,而即時推播目前還沒有這麼快可以實作到,因為那需要整合 WebSocket 進來我們的系統才有辦法實現,可能中間要再加個兩三篇幫助我撐到週末.. 所以今天下面加入的 Kafka 應用算是有點硬要,其實可以直接一個 API Call 就把站內信保存到 DB,但為了幫 Kafka 增加一點存在感所以我還是發了一個消息給 MAIL 的 Kafka Topic,讓 Consumer 監聽到才保存,下面是我部分實作的程式碼,還有很多基本的程式碼但跟業務邏輯關係不大,所以就不貼上來了,這些都是當天才趕緊 Code 出來的,一點一滴也是花不少時間,內容不全還請見諒。
今天一大重點是要生一張可以保存站內信的資料庫 Table 出來,我是希望可以先一張表搞定,之後真的有需要再慢慢擴展,不然要規劃 Table 需要花不少時間,我是打算先做一個 Send By User Role 的功能,這個 Role 是基於 RBAC 模型定義的,發送時由平台作為 Sender 發出,所以這邊沒有規劃 Sender 欄位,然後選擇要發給哪個類型的用戶,就是這麼簡單,之後再根據用戶類型查詢他的站內信即可,但今天也沒時間寫到查詢未讀信件了,等明後天再說吧。
CREATE TABLE CORE_BASE.MAIL (
ID BIGINT PRIMARY KEY AUTO_INCREMENT,
RECEIVER_ROLE VARCHAR(20) NOT NULL,
CONTENT TEXT NOT NULL,
STATUS TINYINT NOT NULL COMMENT '0=unread | 1=read',
VERSION INT DEFAULT 0,
CREATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UPDATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
這邊就如剛剛所說,其實在這個業務邏輯層應該是要一直保存站內信到資料庫就好,然後保存完再發一個 Kafka 消息到推播的 Topic,把任務 Pass 給 WebSocket,但這邊就先暫時改成用 Kafka Consumer 監聽去保存資料,這週我們再趕一下進度,希望可以整合進 WebSocket:
@Service
@RequiredArgsConstructor
public class MailServiceImpl implements MailService {
private final KafkaSender kafkaSender;
@Override
public void sendMail(MailSendTO mailSendTO) {
kafkaSender.send(Topic.MAIL, mailSendTO);
}
}
這邊因為我們的 Producer 跟 Consumer 都是 String 序列化跟反序列化的,所以在發送前我們要先用 ObjectMapper 把物件轉換成字串:
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaSender {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, Object body) {
try {
var om = new ObjectMapper();
var message = om.writeValueAsString(body);
kafkaTemplate.send(topic, message);
} catch (JsonProcessingException e) {
throw new BaseException(StatusCode.UNKNOW_ERR, "Json parse failed");
}
}
}
Consumer 監聽到後才會把字串再度轉換回物件,這邊要觀察的是 log 有沒有 Receive 到我們的 Kafka Message:
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
private final IMailRepo mailRepo;
@KafkaListener(topics = Topic.MAIL)
public void processMailNotification(String content) {
log.info("Receive topic [{}] and message=[{}]", Topic.MAIL, content);
try {
var om = new ObjectMapper();
var mail = om.readValue(content, MailSendTO.class);
var model = new Mail();
model.setReceiverRole(mail.getReceiverRole());
model.setContent(mail.getContent());
mailRepo.saveAndFlush(model);
} catch (Exception e) {
log.warn("Failed to process mail task", e);
}
}
}
日誌:
Receive topic [MAIL] and message=[{"receiverRole":"USER","content":"test01"}]
然後完成後去資料庫看一下是否真的有資料被存入了,這樣我們這一階段的進度就完成囉:
今天其實也是規劃到一半才驚覺 Kafka 今天根本用不到,但想說如果單純 CRUD 好像也蠻沒意義的,如果明天有時間或目前站內信沒什麼一定要加的功能,那就可以先補一下 WebSocket 的基礎知識,幫助我複習一下,今天先到這邊,我們完成了:
明天見吧!