今天來稍微優化一下之前的推送通知系統,為當前不在線上的用戶保存通知消息,等到他們上線時再自動推送給他們。
首先我們要實作保存的機制,當推送沒有成功時,將推送消息 by 用戶 id 儲存到 Redis 中,這個實作在 NotificationServiceImpl
中:
@Override
public void pushToUsers(NotificationTO notification) {
var userIds = notification.getUserIds();
var success = 0;
var total = userIds.size();
var msg = notification.getTitle() + " " + notification.getContent();
for (var userId : userIds) {
var pushSuccess = sessionManager.sendMessage(userId, msg);
if (pushSuccess) {
success++;
} else {
saveOfflineMessage(userId, msg);
}
}
log.info("Push result: {}/{} users received notification", success, total);
}
private void saveOfflineMessage(Integer userId, String msg) {
try {
redisHelper.rightPush(RedisKey.PUSH_OFFLINE.getValue(), "offline: " + msg, userId);
} catch (Exception e) {
log.warn("Failed to save offline message for user {}", userId, e);
}
}
我們會在這個包裝的方法中,存入消息的同時,也為這個資料設定 TTL 自動過期時間,避免記憶體佔用的問題:
public void rightPush(String cacheName, Object value, Object... keys) {
var key = getCacheKey(cacheName, keys);
var ttl = redisCacheConfig.getTTL(cacheName);
redisTemplate.opsForList().rightPush(key, value);
redisTemplate.expire(key, ttl);
}
getTTL
是在這個 Config 類設定的:
@Getter
@Setter
@ConfigurationProperties(prefix = "redis")
@Configuration
public class RedisCacheConfig {
@Getter
@Setter
public static class Wrapper {
private String cacheName;
private Duration ttl;
}
private List<Wrapper> caches;
private final Map<String, Wrapper> cacheMap = new HashMap<>();
public Duration getTTL(String cacheName) {
if (cacheMap.isEmpty()) {
synchronized (cacheMap) {
cacheMap.putAll(caches.stream().collect(Collectors.toMap(Wrapper::getCacheName, Function.identity())));
}
}
var find = cacheMap.get(cacheName);
if (find != null) {
return find.getTtl();
}
return Duration.ZERO;
}
}
redis 的 cache name 跟 TTL 會先在 yaml 文件定義好:
redis:
caches:
- cache-name: rate-limiter
ttl: 5m
- cache-name: push-offline
ttl: 5m
消息存在 Redis,等用戶上線時會重新跟 WebSocket 建立握手協議,這時就是一個補發通知消息給用戶的好時機:
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
var userId = getUserId(session);
if (userId != null) {
sessionManager.addSession(userId, session);
sendOfflineMessage(userId);
} else {
log.warn("UserId doesn't exist, closing connection");
session.close();
}
}
當 sessionManager.addSession(userId, session);
把用戶 session 存到 Redis 裡後,馬上調用 sendOfflineMessage
用 userId 把 Redis 裡的通知給找出來重新推送給用戶:
private void sendOfflineMessage(Integer userId) {
try {
var msg = redisHelper.getList(RedisKey.PUSH_OFFLINE.getValue(), userId);
if (msg != null && !msg.isEmpty()) {
msg.forEach(message -> {
sessionManager.sendMessage(userId, (String) message);
});
redisHelper.delete(RedisKey.PUSH_OFFLINE.getValue(), userId);
}
} catch (Exception e) {
log.warn("Failed to process offline message for user: {}", userId, e);
}
}
測試步驟是先 run 兩個 SpringBoot 實例,然後連上一個用戶後發送站內信,user 2 馬上可以收到,但 user 1 因為沒有上線所以沒有通知。不過這時 Redis 裡已經有 user 1 的通知了,等 user 1 一連線馬上就能收到離線通知:
還有好多能做的優化.. 但想要一點一點慢慢做,不然很容易做過去也沒深入理解,等完賽後繼續做吧。