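Yesterday we covered the theory behind Redis Pub/Sub and why it is needed to solve session management for distributed WebSocket. Today we turn that theory into code: the basic configuration and the core logic of a distributed SessionManager.
I ran into a technical problem today and had no time after work to resolve it properly, so this post is rougher than usual. I will come back and fill in the gaps later; testing is left for tomorrow.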
First, define the channel:
public interface WebSocketChannel {
    // Redis Pub/Sub channel shared by every instance for cross-instance pushes
    String PUSH_MESSAGE = "ws:push";
}
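Next, register the channel with the RedisMessageListenerContainer:
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
@RequiredArgsConstructor
public class RedisPubSubConfig {
    private final WebSocketMessageListner messageListner;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        var container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        // Route messages on the channel to WebSocketMessageListner#handlePushMessage.
        var adapter = new MessageListenerAdapter(messageListner, "handlePushMessage");
        // The adapter is created by hand rather than as a Spring bean, so initialize it explicitly.
        adapter.afterPropertiesSet();
        container.addMessageListener(adapter, new ChannelTopic(WebSocketChannel.PUSH_MESSAGE));
        return container;
    }
}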
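The adapter in the configuration finds `handlePushMessage` by its name at runtime, which is why the string must match the method on the listener exactly. Here is a minimal plain-Java sketch of that idea. It is not Spring's actual implementation, and `DemoListener` and `ReflectiveAdapter` are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical delegate with a handler method, standing in for the real listener. */
class DemoListener {
    final List<String> received = new ArrayList<>();

    public void handlePushMessage(String message) {
        received.add(message);
    }
}

/** Hypothetical adapter that looks up a handler by name and invokes it reflectively. */
class ReflectiveAdapter {
    private final Object delegate;
    private final Method handler;

    ReflectiveAdapter(Object delegate, String methodName) throws NoSuchMethodException {
        this.delegate = delegate;
        // Throws NoSuchMethodException if the name is misspelled or the parameter is not a String.
        this.handler = delegate.getClass().getMethod(methodName, String.class);
        this.handler.setAccessible(true);
    }

    /** Passes the message body to the handler that was resolved by name. */
    void onMessage(String body) throws ReflectiveOperationException {
        handler.invoke(delegate, body);
    }
}
```

Creating `new ReflectiveAdapter(listener, "handlePushMessage")` and calling `onMessage("hello")` adds `"hello"` to `listener.received`, while a misspelled method name is rejected in the constructor of this sketch.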
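Publisher: sending the Redis broadcast message
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisPublisher {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final RedisTemplate<String, Object> redisTemplate;

    public void pubMessage(Integer userId, String message) {
        try {
            // The subscriber parses the payload as PushMessageData, so send userId and message together as JSON.
            // Assumption: the template's value serializer writes Strings unchanged (e.g. StringRedisSerializer).
            var payload = OBJECT_MAPPER.writeValueAsString(Map.of("userId", userId, "message", message));
            redisTemplate.convertAndSend(WebSocketChannel.PUSH_MESSAGE, payload);
        } catch (Exception e) {
            log.warn("Failed to publish message for user {}", userId, e);
        }
    }
}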
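Subscriber: listening for messages
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketMessageListner {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final DistributedSessionManager sessionManager;

    // Invoked by MessageListenerAdapter; the name must match the one registered in RedisPubSubConfig.
    public void handlePushMessage(String message) {
        try {
            var pushData = OBJECT_MAPPER.readValue(message, PushMessageData.class);
            log.info("Instance {} received push message", sessionManager.getInstanceId());
            sessionManager.handleCrossInstancePush(pushData.getUserId(), pushData.getMessage());
        } catch (Exception e) {
            log.warn("Failed to handle push message", e);
        }
    }
}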
The key piece is sendMessage: if the user has no open session on the local instance, it publishes to Redis Pub/Sub. Each instance's subscriber then calls handleCrossInstancePush, and only the instance that holds the session delivers the message:
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

@Slf4j
@Component
@RequiredArgsConstructor
public class DistributedSessionManager {
    // Sessions connected to this instance only, keyed by userId
    private final ConcurrentHashMap<Integer, WebSocketSession> localSessions = new ConcurrentHashMap<>();
    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisPublisher publisher;
    private final String instanceId = generateInstanceId();

    public void addSession(Integer userId, WebSocketSession session) {
        localSessions.put(userId, session);
        // Record in Redis which instance currently holds this user's connection.
        redisTemplate.opsForValue().set(sessionKey(userId), instanceId);
        log.info("User {} connected to instance {}", userId, instanceId);
    }

    public void removeSession(Integer userId) {
        localSessions.remove(userId);
        redisTemplate.delete(sessionKey(userId));
        log.info("User {} disconnected from instance {}", userId, instanceId);
    }

    public boolean sendMessage(Integer userId, String message) {
        // 1. Deliver directly if the user is connected to this instance.
        var session = localSessions.get(userId);
        if (session != null && session.isOpen()) {
            return sendLocalMessage(session, message);
        }
        // 2. Otherwise, broadcast if Redis says another instance holds the session.
        var targetInstanceId = redisTemplate.opsForValue().get(sessionKey(userId));
        if (targetInstanceId != null) {
            publisher.pubMessage(userId, message);
            log.info("User {} message published for instance {}", userId, targetInstanceId);
            // true means the message was handed to Redis, not that delivery is confirmed.
            return true;
        }
        return false;
    }

    // Called on every instance that receives the broadcast; instances without the session ignore it.
    public void handleCrossInstancePush(Integer userId, String message) {
        var session = localSessions.get(userId);
        if (session != null && session.isOpen()) {
            sendLocalMessage(session, message);
            log.info("Delivered cross-instance message to user {} on instance {}", userId, instanceId);
        }
    }

    private boolean sendLocalMessage(WebSocketSession session, String message) {
        try {
            session.sendMessage(new TextMessage(message));
            return true;
        } catch (Exception e) {
            log.warn("Failed to send message", e);
            return false;
        }
    }

    private String sessionKey(Integer userId) {
        return RedisKey.WS_USER_SESSION.getValue().concat("::").concat(String.valueOf(userId));
    }

    private String generateInstanceId() {
        return "instance-" + UUID.randomUUID();
    }

    public String getInstanceId() {
        return instanceId;
    }
}
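Since the real flow cannot be tested yet, the routing decision can be checked in isolation. The following is a rough sketch with in-memory stand-ins and no Redis or Spring. `InMemoryBus` plays the role of the Pub/Sub channel, a shared map plays the role of the Redis session keys, and `RoutingNode` is a hypothetical stand-in for one application instance. All of these names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the Redis channel: every subscriber sees every published message. */
class InMemoryBus {
    private final List<RoutingNode> subscribers = new ArrayList<>();
    int published = 0;

    void subscribe(RoutingNode node) {
        subscribers.add(node);
    }

    void publish(int userId, String message) {
        published++;
        for (RoutingNode node : subscribers) {
            node.onBroadcast(userId, message);
        }
    }
}

/** Hypothetical stand-in for one application instance. */
class RoutingNode {
    final String instanceId;
    final List<String> delivered = new ArrayList<>();
    private final Set<Integer> localUsers = ConcurrentHashMap.newKeySet();
    private final Map<Integer, String> registry; // shared userId -> instanceId map, standing in for Redis keys
    private final InMemoryBus bus;

    RoutingNode(String instanceId, Map<Integer, String> registry, InMemoryBus bus) {
        this.instanceId = instanceId;
        this.registry = registry;
        this.bus = bus;
    }

    void connect(int userId) {
        localUsers.add(userId);
        registry.put(userId, instanceId);
    }

    /** Same decision order as sendMessage: local first, then broadcast, else give up. */
    boolean send(int userId, String message) {
        if (localUsers.contains(userId)) {
            delivered.add(userId + ":" + message);
            return true;
        }
        if (registry.containsKey(userId)) {
            bus.publish(userId, message);
            return true;
        }
        return false;
    }

    /** Same role as handleCrossInstancePush: only the node holding the user delivers. */
    void onBroadcast(int userId, String message) {
        if (localUsers.contains(userId)) {
            delivered.add(userId + ":" + message);
        }
    }
}
```

With two nodes subscribed to the same bus and user 42 connected to the second one, calling `send(42, "hi")` on the first node goes through the bus and ends up in the second node's `delivered` list, while a user that appears in no registry entry makes `send` return false without publishing anything.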
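Because of today's technical problem I could not run any tests. I will sort that out tomorrow and keep going. For now, the configuration and utility classes the flow needs are in place; once the environment issue is fixed, we can continue.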