iT邦幫忙

2025 iThome 鐵人賽

DAY 19
0
Software Development

系統設計一招一式:最基本的功練到爛熟就是殺手鐧,從單體架構到分布式系統的 Lab 實作筆記系列 第 19

Day 19 | Redis Pub/Sub 分布式 WebSocket 實作:從配置到核心功能完整實現

  • 分享至 

  • xImage
  •  

前言

昨天我們完成了 Redis Pub/Sub 的理論前導,理解了為什麼需要引入 Redis Pub/Sub 來解決分布式 WebSocket 的 Session 管理問題。今天我們要將理論轉化為實際的代碼實現,包括基礎配置、分布式 SessionManager 的核心邏輯。

但今天碰到一點技術問題,下班後也沒有時間好好解決,寫得比較敷衍一點,之後會再找時間補上,測試部分則留到明天繼續。

實作:Redis Pub/Sub 基礎設施。

Redis Pub/Sub 配置類

先定義頻道:

public interface WebSocketChannel {

    String PUSH_MESSAGE = "ws:push";
}

再把頻道配置進去 ListnerContainer

@Configuration
@RequiredArgsConstructor
public class RedisPubSubConfig {

    private final WebSocketMessageListner messageListner;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        var container = new RedisMessageListenerContainer();

        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(new MessageListenerAdapter(messageListner, "handlePushMessage"),
                new ChannelTopic(WebSocketChannel.PUSH_MESSAGE));

        return container;
    }
}

Pub

發送 Redis 廣播消息

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;

    public void pubMessage(Integer userId, String message) {
        try {
            redisTemplate.convertAndSend(WebSocketChannel.PUSH_MESSAGE, message);
        } catch (Exception e) {
            log.warn("Failed to pub message", e);
        }
    }
}

Sub

訂閱者:監聽消息

@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketMessageListner {

    private final DistributedSessionManager sessionManager;

    public void handlePushMessage(String message) {
        try {
            var om = new ObjectMapper();
            var pushData = om.readValue(message, PushMessageData.class);
            log.info("Received push message from instance {}", sessionManager.getInstanceId());
            sessionManager.handleCrossInstancePush(pushData.getUserId(), pushData.getMessage());
        } catch (Exception e) {
            log.warn("Failed to handle push message", e);
        }
    }
}

分布式 Session 管理

重點是 sendMessage,本地沒有連接 session 就發 Redis Pub,到 Sub 監聽到再調用 handleCrossInstancePush 發送:

@Slf4j
@Component
@RequiredArgsConstructor
public class DistributedSessionManager {

    private final ConcurrentHashMap<Integer, WebSocketSession> localSessions = new ConcurrentHashMap<>();
    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisPublisher publisher;
    private final String instanceId = generateInstanceId();

    public void addSession(Integer userId, WebSocketSession session) {
        localSessions.put(userId, session);
        var redisKey = RedisKey.WS_USER_SESSION.getValue().concat("::").concat(String.valueOf(userId));
        redisTemplate.opsForValue().set(redisKey, instanceId);
        log.info("User {} connected to instance {}", userId, instanceId);
    }

    public void removeSession(Integer userId) {
        localSessions.remove(userId);
        var redisKey = RedisKey.WS_USER_SESSION.getValue().concat("::").concat(String.valueOf(userId));
        redisTemplate.delete(redisKey);
        log.info("User {} disconnected from instance {}", userId, instanceId);
    }

    public boolean sendMessage(Integer userId, String message) {
        var session = localSessions.get(userId);
        if (session != null && session.isOpen()) {
            return sendLocalMessage(session, message);
        }
        
        var redisKey = RedisKey.WS_USER_SESSION.getValue().concat("::").concat(String.valueOf(userId));
        var instanceId = redisTemplate.opsForValue().get(redisKey);
        if (instanceId != null) {
            publisher.pubMessage(userId, message);
            log.info("User {} message sent to instance {}", userId, instanceId);
            return true;
        }
        return false;
    }

    public void handleCrossInstancePush(Integer userId, String message) {
        var session = localSessions.get(userId);
        if (session != null && session.isOpen()) {
            sendLocalMessage(session, message);
            log.info("User {} message sent to cross instance {}", userId, instanceId);
        }
    }

    private boolean sendLocalMessage(WebSocketSession session, String message) {
        try {
            session.sendMessage(new TextMessage(message));
            return true;
        } catch (Exception e) {
            log.warn("Failed to send message", e);
            return false;
        }
    }

    private String generateInstanceId() {
        return "instance-" + UUID.randomUUID().toString();
    }
    
    public String getInstanceId() {
        return instanceId;
    }
}

總結

今天碰到一點技術問題,沒辦法測試,明天調整一下再接再勵,今天大致把流程需要的配置、工具類都先創建好了,明天把環境問題搞定就能繼續了。


上一篇
Day 18 | Redis Pub/Sub 分布式 WebSocket:理論前導
系列文
系統設計一招一式:最基本的功練到爛熟就是殺手鐧,從單體架構到分布式系統的 Lab 實作筆記19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言