2025 iThome Ironman Contest

Software Development

Building a Studio SaaS Product in 30 Days (Backend) series, part 17

Day 17: Building a SaaS Product Backend in 30 Days - WebSocket Microservice Architecture and a High-Availability Real-Time Communication Engine

Recap

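On Day 16 we built the real-time collaboration engine. Today we make a significant architectural refactor: extracting the WebSocket collaboration system into its own dedicated microservice. This change improves scalability, fault isolation, and maintainability. We will implement the WebSocket microservice, inter-service communication, load balancing, and a high-availability message-handling system.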

Microservice Architecture Design

Overview of the New Architecture

// packages/kyo-core/src/microservices/types.ts
export interface MicroserviceArchitecture {
  // Core business service
  otpService: {
    port: 3000;
    responsibilities: ['authentication', 'user-management', 'basic-crud'];
    dependencies: ['database', 'redis'];
  };

  // WebSocket collaboration service (new)
  collaborationService: {
    port: 3001;
    responsibilities: ['real-time-collaboration', 'session-management', 'conflict-resolution'];
    dependencies: ['redis', 'message-queue'];
  };

  // Analytics and reporting service (planned)
  analyticsService: {
    port: 3002;
    responsibilities: ['data-analysis', 'reporting', 'metrics'];
    dependencies: ['database', 'time-series-db'];
  };

  // Notification service
  notificationService: {
    port: 3003;
    responsibilities: ['push-notifications', 'email', 'sms'];
    dependencies: ['redis', 'external-apis'];
  };
}

export interface ServiceConfig {
  serviceName: string;
  port: number;
  host: string;
  environment: 'development' | 'staging' | 'production';

  // Database, cache, and message queue configuration
  database: DatabaseConfig;
  redis: RedisConfig;
  messageQueue: MessageQueueConfig;

  // Service discovery
  serviceDiscovery: ServiceDiscoveryConfig;

  // Monitoring configuration
  monitoring: MonitoringConfig;

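  // Security configuration
  security: SecurityConfig;

  // Collaboration engine options, read as `config.collaboration` by the
  // collaboration service (their shape is not shown in this article)
  collaboration?: Record<string, unknown>;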
}

export interface ServiceDiscoveryConfig {
  enabled: boolean;
  registry: 'consul' | 'etcd' | 'redis';
  healthCheckInterval: number;
  heartbeatInterval: number;
}

export interface MonitoringConfig {
  metrics: {
    enabled: boolean;
    endpoint: string;
    prometheus: boolean;
  };
  logging: {
    level: 'debug' | 'info' | 'warn' | 'error';
    format: 'json' | 'text';
    outputs: string[];
  };
  tracing: {
    enabled: boolean;
    jaeger?: JaegerConfig;
  };
}
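
To make the port layout above concrete, here is a minimal sketch (not part of the original project) that checks the four services for port collisions and derives a local base URL for each. The names `servicePorts`, `findPortConflicts`, and `serviceUrl` are hypothetical and introduced only for illustration; the port numbers are the ones declared in `MicroserviceArchitecture`.

```typescript
// Hypothetical port table mirroring the ports declared in MicroserviceArchitecture.
const servicePorts = {
  otpService: 3000,
  collaborationService: 3001,
  analyticsService: 3002,
  notificationService: 3003,
} as const;

type ServiceName = keyof typeof servicePorts;

// Returns every group of service names that share a port (empty when all ports are unique).
function findPortConflicts(ports: Record<string, number>): string[][] {
  const byPort = new Map<number, string[]>();
  for (const [name, port] of Object.entries(ports)) {
    byPort.set(port, [...(byPort.get(port) ?? []), name]);
  }
  return Array.from(byPort.values()).filter((names) => names.length > 1);
}

// Builds a base URL for a service; the default host is an assumption for local development.
function serviceUrl(name: ServiceName, host = 'localhost'): string {
  return `http://${host}:${servicePorts[name]}`;
}

console.log(findPortConflicts(servicePorts));
console.log(serviceUrl('collaborationService'));
```

Running a check like this at startup is one cheap way to catch two services being configured onto the same port before either of them tries to listen.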

WebSocket Microservice Implementation

Collaboration Service Main Application

// apps/kyo-collaboration-service/src/index.ts
import fastify, { FastifyInstance } from 'fastify';
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { Redis } from 'ioredis';
import { CollaborationEngine } from './engines/CollaborationEngine';
import { ServiceRegistry } from './services/ServiceRegistry';
import { HealthMonitor } from './monitoring/HealthMonitor';
import { MetricsCollector } from './monitoring/MetricsCollector';
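import { SecurityManager } from './security/SecurityManager';
// ServiceConfig is the interface defined in packages/kyo-core/src/microservices/types.ts;
// import it from wherever your workspace exposes that package.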

export class CollaborationServiceApp {
  private app: FastifyInstance;
  private io: Server;
  private redis: Redis;
  private redisAdapter: Redis;
  private collaborationEngine: CollaborationEngine;
  private serviceRegistry: ServiceRegistry;
  private healthMonitor: HealthMonitor;
  private metricsCollector: MetricsCollector;
  private securityManager: SecurityManager;

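  constructor(private config: ServiceConfig) {
    // NOTE: initializeApp() is async and a constructor cannot await it, so
    // start() may run before initialization has finished. Prefer an async
    // factory (or awaiting this promise) before calling start().
    this.initializeApp().catch((error) => {
      console.error('💥 Failed to initialize collaboration service:', error);
      process.exit(1);
    });
  }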

  /**
   * Initialize the application
   */
  private async initializeApp(): Promise<void> {
    // Create the Fastify instance
    this.app = fastify({
      logger: {
        level: this.config.monitoring.logging.level,
        serializers: {
          req: (req) => ({
            method: req.method,
            url: req.url,
            headers: req.headers,
            remoteAddress: req.ip,
            remotePort: req.socket?.remotePort
          }),
          res: (res) => ({
            statusCode: res.statusCode
          })
        }
      },
      trustProxy: true,
      requestIdHeader: 'x-request-id',
      requestIdLogLabel: 'requestId'
    });

    // Initialize Redis connections
    await this.initializeRedis();

    // Initialize Socket.IO
    await this.initializeSocketIO();

    // Initialize core components
    await this.initializeCoreComponents();

    // Register middleware plugins first so their hooks apply to the routes below
    await this.registerMiddleware();

    // Register routes
    await this.registerRoutes();

    // Set up error handling
    this.setupErrorHandling();
  }

  /**
   * Initialize Redis connections
   */
  private async initializeRedis(): Promise<void> {
    // Primary Redis connection
    this.redis = new Redis({
      host: this.config.redis.host,
      port: this.config.redis.port,
      password: this.config.redis.password,
      db: this.config.redis.database,
      retryDelayOnFailover: 100,
      maxRetriesPerRequest: 3,
      lazyConnect: true,
      keepAlive: 30000,
      connectTimeout: 10000,
      commandTimeout: 5000
    });

    // Dedicated Redis connection for the Socket.IO adapter
    this.redisAdapter = new Redis({
      host: this.config.redis.host,
      port: this.config.redis.port,
      password: this.config.redis.password,
      db: this.config.redis.database + 1, // Separate database (keys only; Redis Pub/Sub channels are not scoped per database)
      retryDelayOnFailover: 100,
      maxRetriesPerRequest: 3,
      lazyConnect: true
    });

    // Verify both connections
    await this.redis.ping();
    await this.redisAdapter.ping();

    console.log('✅ Redis connections established');
  }

  /**
   * Initialize the Socket.IO server
   */
  private async initializeSocketIO(): Promise<void> {
    // Register the Socket.IO plugin (it must decorate the instance with `app.io`)
    await this.app.register(require('@fastify/socket.io'), {
      cors: {
        origin: this.getAllowedOrigins(),
        methods: ['GET', 'POST'],
        credentials: true
      },
      allowEIO3: true,
      transports: ['websocket', 'polling'],
      pingTimeout: 60000,
      pingInterval: 25000
    });

    this.io = this.app.io;

    // Use the Redis adapter so multiple instances can exchange messages
    const pubClient = this.redisAdapter;
    const subClient = this.redisAdapter.duplicate();

    this.io.adapter(createAdapter(pubClient, subClient));

    // Set up Socket.IO middleware
    this.setupSocketMiddleware();

    // Set up Socket.IO event handlers
    this.setupSocketEventHandlers();

    console.log('✅ Socket.IO server initialized');
  }

  /**
   * Initialize core components
   */
  private async initializeCoreComponents(): Promise<void> {
    // Security manager
    this.securityManager = new SecurityManager(this.config.security);

    // Collaboration engine
    this.collaborationEngine = new CollaborationEngine({
      redis: this.redis,
      socketIO: this.io,
      securityManager: this.securityManager,
      config: this.config.collaboration
    });

    // Service registry
    this.serviceRegistry = new ServiceRegistry({
      redis: this.redis,
      serviceName: this.config.serviceName,
      serviceConfig: this.config.serviceDiscovery,
      healthEndpoint: '/health'
    });

    // Health monitor
    this.healthMonitor = new HealthMonitor({
      redis: this.redis,
      socketIO: this.io,
      collaborationEngine: this.collaborationEngine
    });

    // Metrics collector
    this.metricsCollector = new MetricsCollector({
      redis: this.redis,
      socketIO: this.io,
      collaborationEngine: this.collaborationEngine,
      prometheusEnabled: this.config.monitoring.metrics.prometheus
    });

    await this.collaborationEngine.initialize();
    await this.serviceRegistry.register();
    await this.healthMonitor.start();
    await this.metricsCollector.start();

    console.log('✅ Core components initialized');
  }

  /**
   * Set up Socket.IO middleware
   */
  private setupSocketMiddleware(): void {
    // Authentication middleware
    this.io.use(async (socket, next) => {
      try {
        const token = socket.handshake.auth.token;
        if (!token) {
          throw new Error('Authentication token required');
        }

        const user = await this.securityManager.validateToken(token);
        socket.data.user = user;
        socket.data.userId = user.id;
        socket.data.tenantId = user.tenantId;

        next();
      } catch (error) {
        next(new Error('Authentication failed'));
      }
    });

    // Rate-limiting middleware
    this.io.use(async (socket, next) => {
      const rateLimitResult = await this.securityManager.checkRateLimit(
        socket.data.userId,
        'websocket_connection'
      );

      if (!rateLimitResult.allowed) {
        next(new Error('Rate limit exceeded'));
        return;
      }

      next();
    });

    // Connection-tracking middleware
    this.io.use((socket, next) => {
      socket.data.connectedAt = new Date();
      socket.data.connectionId = this.generateConnectionId();

      this.metricsCollector.recordConnection(socket.data);

      next();
    });
  }

  /**
   * Set up Socket.IO event handlers
   */
  private setupSocketEventHandlers(): void {
    this.io.on('connection', (socket) => {
      console.log(`🔗 Client connected: ${socket.data.connectionId} (User: ${socket.data.userId})`);

      // Join the per-user and per-tenant rooms (used for broadcasts)
      socket.join(`user:${socket.data.userId}`);
      socket.join(`tenant:${socket.data.tenantId}`);

      // Connection acknowledgement
      socket.emit('connection_established', {
        connectionId: socket.data.connectionId,
        serverTime: new Date().toISOString(),
        capabilities: ['real-time-collaboration', 'cursor-tracking', 'selection-sync']
      });

      // Session management events
      socket.on('join_session', async (data, callback) => {
        try {
          const result = await this.collaborationEngine.handleJoinSession(
            socket,
            data.sessionId,
            data.sessionInfo
          );
          callback({ success: true, data: result });
        } catch (error) {
          callback({ success: false, error: error.message });
        }
      });

      socket.on('leave_session', async (data, callback) => {
        try {
          await this.collaborationEngine.handleLeaveSession(
            socket,
            data.sessionId
          );
          callback({ success: true });
        } catch (error) {
          callback({ success: false, error: error.message });
        }
      });

      // Collaboration events
      socket.on('collaboration_event', async (event) => {
        try {
          await this.collaborationEngine.handleCollaborationEvent(socket, event);
        } catch (error) {
          socket.emit('operation_rejected', {
            eventId: event.id,
            reason: error.message
          });
        }
      });

      // Cursor and selection events
      socket.on('cursor_move', (data) => {
        this.collaborationEngine.handleCursorMove(socket, data);
      });

      socket.on('selection_change', (data) => {
        this.collaborationEngine.handleSelectionChange(socket, data);
      });

      // Heartbeat
      socket.on('ping', () => {
        socket.emit('pong');
      });

      // Disconnect handling
      socket.on('disconnect', (reason) => {
        console.log(`🔌 Client disconnected: ${socket.data.connectionId} (${reason})`);

        this.collaborationEngine.handleDisconnection(socket);
        this.metricsCollector.recordDisconnection(socket.data, reason);
      });

      // Error handling
      socket.on('error', (error) => {
        console.error(`❌ Socket error for ${socket.data.connectionId}:`, error);
        this.metricsCollector.recordError(socket.data, error);
      });
    });
  }

  /**
   * Register HTTP routes
   */
  private async registerRoutes(): Promise<void> {
    // Health check endpoint
    this.app.get('/health', async (request, reply) => {
      const health = await this.healthMonitor.getHealthStatus();

      if (health.status === 'healthy') {
        reply.code(200).send(health);
      } else {
        reply.code(503).send(health);
      }
    });

    // Metrics endpoint
    this.app.get('/metrics', async (request, reply) => {
      if (this.config.monitoring.metrics.prometheus) {
        const metrics = await this.metricsCollector.getPrometheusMetrics();
        reply.type('text/plain').send(metrics);
      } else {
        const metrics = await this.metricsCollector.getMetrics();
        reply.send(metrics);
      }
    });

    // Session management API
    this.app.post('/sessions/create', async (request, reply) => {
      try {
        const { userInfo, capabilities } = request.body as any;
        const sessionToken = await this.collaborationEngine.createSessionToken(
          userInfo,
          capabilities
        );
        reply.send(sessionToken);
      } catch (error) {
        reply.code(400).send({ error: error.message });
      }
    });

    this.app.get('/sessions/:sessionId', async (request, reply) => {
      try {
        const { sessionId } = request.params as any;
        const sessionInfo = await this.collaborationEngine.getSessionInfo(sessionId);
        reply.send(sessionInfo);
      } catch (error) {
        reply.code(404).send({ error: 'Session not found' });
      }
    });

    this.app.delete('/sessions/:sessionId', async (request, reply) => {
      try {
        const { sessionId } = request.params as any;
        await this.collaborationEngine.destroySession(sessionId);
        reply.code(204).send();
      } catch (error) {
        reply.code(400).send({ error: error.message });
      }
    });

    // Statistics API
    this.app.get('/stats/connections', async (request, reply) => {
      const stats = await this.metricsCollector.getConnectionStats();
      reply.send(stats);
    });

    this.app.get('/stats/sessions', async (request, reply) => {
      const stats = await this.collaborationEngine.getSessionStats();
      reply.send(stats);
    });
  }

  /**
   * Register middleware
   */
  private async registerMiddleware(): Promise<void> {
    // CORS
    await this.app.register(require('@fastify/cors'), {
      origin: this.getAllowedOrigins(),
      credentials: true
    });

    // Rate limiting
    await this.app.register(require('@fastify/rate-limit'), {
      max: 100,
      timeWindow: '1 minute',
      redis: this.redis
    });

    // Security headers
    await this.app.register(require('@fastify/helmet'), {
      contentSecurityPolicy: false
    });

    // Request context (used for request logging)
    await this.app.register(require('@fastify/request-context'));
  }

  /**
   * Set up error handling
   */
  private setupErrorHandling(): void {
    // Global error handler
    this.app.setErrorHandler(async (error, request, reply) => {
      const errorId = this.generateErrorId();

      request.log.error({
        errorId,
        error: {
          message: error.message,
          stack: error.stack,
          statusCode: error.statusCode
        }
      }, 'Request error');

      // Record the error metric
      this.metricsCollector.recordHttpError(error, request);

      const statusCode = error.statusCode || 500;
      const message = statusCode >= 500 ? 'Internal Server Error' : error.message;

      reply.code(statusCode).send({
        error: {
          message,
          errorId,
          timestamp: new Date().toISOString()
        }
      });
    });

    // Uncaught exception handling (the label passed on is what the shutdown log reports)
    process.on('uncaughtException', (error) => {
      console.error('💥 Uncaught Exception:', error);
      this.gracefulShutdown('uncaughtException');
    });

    process.on('unhandledRejection', (reason, promise) => {
      console.error('💥 Unhandled Rejection at:', promise, 'reason:', reason);
      this.gracefulShutdown('unhandledRejection');
    });

    // Graceful shutdown on termination signals
    ['SIGTERM', 'SIGINT'].forEach((signal) => {
      process.on(signal, () => {
        this.gracefulShutdown(signal);
      });
    });
  }

  /**
   * Start the service
   */
  async start(): Promise<void> {
    try {
      await this.app.listen({
        port: this.config.port,
        host: this.config.host
      });

      console.log(`🚀 Collaboration service started on ${this.config.host}:${this.config.port}`);
      console.log(`📊 Health check: http://${this.config.host}:${this.config.port}/health`);
      console.log(`📈 Metrics: http://${this.config.host}:${this.config.port}/metrics`);

    } catch (error) {
      console.error('💥 Failed to start collaboration service:', error);
      process.exit(1);
    }
  }

  /**
   * Graceful shutdown
   */
  private async gracefulShutdown(signal: string): Promise<void> {
    console.log(`📴 Received ${signal}, starting graceful shutdown...`);

    try {
      // Stop accepting new connections
      console.log('⏹️  Stopping HTTP server...');
      await this.app.close();

      // Close Socket.IO connections
      console.log('🔌 Closing WebSocket connections...');
      this.io.close();

      // Stop core components
      console.log('⚙️  Shutting down core components...');
      await this.collaborationEngine.shutdown();
      await this.serviceRegistry.unregister();
      await this.healthMonitor.stop();
      await this.metricsCollector.stop();

      // Close Redis connections
      console.log('🔴 Closing Redis connections...');
      await this.redis.quit();
      await this.redisAdapter.quit();

      console.log('✅ Graceful shutdown completed');
      process.exit(0);

    } catch (error) {
      console.error('💥 Error during shutdown:', error);
      process.exit(1);
    }
  }

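  /**
   * Get the allowed origins
   */
  private getAllowedOrigins(): Array<string | RegExp> {
    // Plain strings are matched exactly, so wildcard patterns are written as RegExps
    const origins: Array<string | RegExp> = [
      'http://localhost:3000',
      'http://localhost:5173',
      'https://kyong-saas.com',
      /^https:\/\/([a-z0-9-]+\.)+kyong-saas\.com$/
    ];

    if (this.config.environment === 'development') {
      // Any localhost port during development
      origins.push(/^http:\/\/localhost(:\d+)?$/);
    }

    return origins;
  }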

  /**
   * Generate a connection ID
   */
  private generateConnectionId(): string {
    return `conn_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Generate an error ID
   */
  private generateErrorId(): string {
    return `err_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}

// Start the application
async function bootstrap() {
  const config: ServiceConfig = {
    serviceName: 'collaboration-service',
    port: parseInt(process.env.PORT || '3001'),
    host: process.env.HOST || '0.0.0.0',
    environment: (process.env.NODE_ENV as any) || 'development',

    database: {
      url: process.env.DATABASE_URL || 'postgresql://localhost:5432/kyong_saas',
    },

    redis: {
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379'),
      password: process.env.REDIS_PASSWORD,
      database: parseInt(process.env.REDIS_DB || '0')
    },

    messageQueue: {
      type: 'redis',
      redis: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
        database: parseInt(process.env.REDIS_MQ_DB || '1')
      }
    },

    serviceDiscovery: {
      enabled: process.env.SERVICE_DISCOVERY_ENABLED === 'true',
      registry: 'redis',
      healthCheckInterval: 30000,
      heartbeatInterval: 10000
    },

    monitoring: {
      metrics: {
        enabled: true,
        endpoint: '/metrics',
        prometheus: process.env.PROMETHEUS_ENABLED === 'true'
      },
      logging: {
        level: (process.env.LOG_LEVEL as any) || 'info',
        format: 'json',
        outputs: ['stdout']
      },
      tracing: {
        enabled: process.env.TRACING_ENABLED === 'true'
      }
    },

    security: {
      // Placeholder fallback for local development only; set JWT_SECRET in real deployments
      jwtSecret: process.env.JWT_SECRET || 'your-secret-key',
      rateLimiting: {
        enabled: true,
        windowMs: 60000,
        maxRequests: 100
      }
    }
  };

  const app = new CollaborationServiceApp(config);
  await app.start();
}

// Start the application when this file is executed directly
if (require.main === module) {
  bootstrap().catch((error) => {
    console.error('💥 Failed to start application:', error);
    process.exit(1);
  });
}

// CollaborationServiceApp is already exported at its class declaration above,
// so it is not exported a second time here.
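
Because a constructor cannot await, one common way to guarantee that `start()` only runs after all asynchronous setup has finished is an async factory. The following is a minimal sketch of that pattern, not the article's implementation: `ExampleService`, `create`, and the simulated setup steps are hypothetical stand-ins for `CollaborationServiceApp` and its Redis, Socket.IO, and route setup.

```typescript
// Hypothetical stand-in for CollaborationServiceApp; all names are illustrative.
class ExampleService {
  private readonly log: string[] = [];
  private initialized = false;

  // The constructor stays synchronous and performs no I/O.
  private constructor() {}

  // Async factory: callers only receive an instance once initialization has completed.
  static async create(): Promise<ExampleService> {
    const service = new ExampleService();
    await service.initialize();
    return service;
  }

  private async initialize(): Promise<void> {
    await this.step('redis');
    await this.step('socket.io');
    await this.step('routes');
    this.initialized = true;
  }

  // Simulates one asynchronous setup step (for example, connecting to Redis).
  private async step(name: string): Promise<void> {
    await Promise.resolve();
    this.log.push(name);
  }

  // Refuses to start if initialization has not finished.
  async start(): Promise<string[]> {
    if (!this.initialized) {
      throw new Error('start() called before initialization finished');
    }
    return [...this.log, 'listening'];
  }
}

ExampleService.create()
  .then((service) => service.start())
  .then((steps) => console.log(steps.join(' -> ')));
```

With this shape, a bootstrap function would call `await ExampleService.create()` and then `start()`, so the listen call can never race the setup steps.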

Collaboration Engine Core

// apps/kyo-collaboration-service/src/engines/CollaborationEngine.ts
import { Server, Socket } from 'socket.io';
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';
import { SecurityManager } from '../security/SecurityManager';
import { ConflictResolver } from './ConflictResolver';
import { SessionManager } from './SessionManager';
import { OperationalTransform } from './OperationalTransform';

export class CollaborationEngine extends EventEmitter {
  private sessionManager: SessionManager;
  private conflictResolver: ConflictResolver;
  private operationalTransform: OperationalTransform;
  private activeConnections: Map<string, Socket> = new Map();
  private sessionConnections: Map<string, Set<string>> = new Map();
  private operationQueue: Map<string, OperationQueue> = new Map();

  constructor(
    private config: {
      redis: Redis;
      socketIO: Server;
      securityManager: SecurityManager;
      config: any;
    }
  ) {
    super();
    this.initializeComponents();
  }

  /**
   * Initialize components
   */
  private initializeComponents(): void {
    this.sessionManager = new SessionManager(this.config.redis);
    this.conflictResolver = new ConflictResolver(this.config.redis);
    this.operationalTransform = new OperationalTransform();

    // Wire up event listeners
    this.sessionManager.on('session_created', this.handleSessionCreated.bind(this));
    this.sessionManager.on('session_destroyed', this.handleSessionDestroyed.bind(this));
    this.conflictResolver.on('conflict_resolved', this.handleConflictResolved.bind(this));
  }

  /**
   * Initialize the engine
   */
  async initialize(): Promise<void> {
    await this.sessionManager.initialize();
    await this.conflictResolver.initialize();

    // Subscribe to cross-instance events
    await this.subscribeToClusterEvents();

    console.log('✅ Collaboration engine initialized');
  }

  /**
   * Handle a join-session request
   */
  async handleJoinSession(
    socket: Socket,
    sessionId: string,
    sessionInfo: SessionJoinInfo
  ): Promise<JoinSessionResult> {
    const userId = socket.data.userId;
    const connectionId = socket.data.connectionId;

    try {
      // Check session permission
      const hasPermission = await this.sessionManager.validateSessionPermission(
        userId,
        sessionId,
        'join'
      );

      if (!hasPermission) {
        throw new Error('No permission to join session');
      }

      // Join the session
      await this.sessionManager.joinSession(sessionId, {
        userId,
        connectionId,
        socketId: socket.id,
        joinedAt: new Date(),
        userInfo: sessionInfo.userInfo
      });

      // Add the socket to the session room
      socket.join(sessionId);

      // Track the connection
      this.activeConnections.set(connectionId, socket);

      if (!this.sessionConnections.has(sessionId)) {
        this.sessionConnections.set(sessionId, new Set());
      }
      this.sessionConnections.get(sessionId)!.add(connectionId);

      // Initialize the operation queue
      if (!this.operationQueue.has(sessionId)) {
        this.operationQueue.set(sessionId, new OperationQueue(sessionId, this.config.redis));
      }

      // Fetch the session state
      const sessionState = await this.sessionManager.getSessionState(sessionId);

      // Notify the other participants
      socket.to(sessionId).emit('user_joined', {
        userId,
        userInfo: sessionInfo.userInfo,
        timestamp: new Date().toISOString()
      });

      return {
        sessionId,
        participantId: userId,
        sessionState,
        existingParticipants: await this.sessionManager.getSessionParticipants(sessionId)
      };

    } catch (error) {
      throw new Error(`Failed to join session: ${error.message}`);
    }
  }

  /**
   * Handle a leave-session request
   */
  async handleLeaveSession(socket: Socket, sessionId: string): Promise<void> {
    const userId = socket.data.userId;
    const connectionId = socket.data.connectionId;

    try {
      // Leave the session
      await this.sessionManager.leaveSession(sessionId, userId);

      // Leave the socket room
      socket.leave(sessionId);

      // Clean up connection records
      this.activeConnections.delete(connectionId);

      const sessionConnections = this.sessionConnections.get(sessionId);
      if (sessionConnections) {
        sessionConnections.delete(connectionId);

        if (sessionConnections.size === 0) {
          this.sessionConnections.delete(sessionId);
          // Clean up the operation queue
          const queue = this.operationQueue.get(sessionId);
          if (queue) {
            await queue.destroy();
            this.operationQueue.delete(sessionId);
          }
        }
      }

      // Notify the other participants
      socket.to(sessionId).emit('user_left', {
        userId,
        timestamp: new Date().toISOString()
      });

    } catch (error) {
      throw new Error(`Failed to leave session: ${error.message}`);
    }
  }

  /**
   * Handle a collaboration event
   */
  async handleCollaborationEvent(
    socket: Socket,
    event: CollaborationEvent
  ): Promise<void> {
    const sessionId = event.sessionId;
    const userId = socket.data.userId;

    try {
      // Validate the event
      await this.validateCollaborationEvent(event, userId);

      // Look up the operation queue
      const queue = this.operationQueue.get(sessionId);
      if (!queue) {
        throw new Error('Session not found or not joined');
      }

      // Detect conflicts
      const conflicts = await this.conflictResolver.detectConflicts(event, sessionId);

      if (conflicts.length > 0) {
        // Conflicts found: run the conflict-resolution flow
        const resolution = await this.conflictResolver.resolveConflicts(
          event,
          conflicts,
          sessionId
        );

        if (resolution.success) {
          // Conflicts resolved: apply the transformed operations
          for (const resolvedEvent of resolution.resolvedEvents) {
            await this.applyCollaborationEvent(resolvedEvent, socket);
          }
        } else {
          // Conflicts cannot be resolved automatically: notify the client
          socket.emit('conflict_detected', {
            originalEvent: event,
            conflicts,
            requiresManualResolution: true
          });
          return;
        }
      } else {
        // No conflicts: apply the operation directly
        await this.applyCollaborationEvent(event, socket);
      }

    } catch (error) {
      socket.emit('operation_rejected', {
        eventId: event.id,
        reason: error.message,
        timestamp: new Date().toISOString()
      });
    }
  }

  /**
   * Apply a collaboration event
   */
  private async applyCollaborationEvent(
    event: CollaborationEvent,
    sourceSocket: Socket
  ): Promise<void> {
    const sessionId = event.sessionId;

    // Append the event to the queue
    const queue = this.operationQueue.get(sessionId)!;
    await queue.enqueue(event);

    // Update the session state
    await this.sessionManager.applyEventToState(sessionId, event);

    // Broadcast the event to the other users in the session
    sourceSocket.to(sessionId).emit('collaboration_event', event);

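    // Cross-instance broadcast.
    // NOTE: with the Redis adapter configured on the Socket.IO server, the
    // socket.to(...).emit above already reaches clients on other instances,
    // so re-emitting this cluster message there can deliver the event twice.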
    await this.broadcastToCluster('collaboration_event', {
      sessionId,
      event,
      excludeInstanceId: process.env.INSTANCE_ID
    });

    // Record metrics
    this.emit('event_processed', {
      sessionId,
      eventType: event.type,
      userId: event.userId,
      timestamp: event.timestamp
    });
  }

  /**
   * Handle cursor movement
   */
  handleCursorMove(socket: Socket, data: CursorMoveData): void {
    const { sessionId, position } = data;
    const userId = socket.data.userId;

    // Throttle to avoid overly frequent updates
    this.throttle(`cursor_${userId}`, () => {
      // Broadcast the cursor position
      socket.to(sessionId).emit('cursor_moved', {
        userId,
        position,
        timestamp: new Date().toISOString()
      });

      // Update the cursor position stored in Redis
      this.sessionManager.updateCursorPosition(sessionId, userId, position);
    }, 50);
  }

  /**
   * Handle selection changes
   */
  handleSelectionChange(socket: Socket, data: SelectionChangeData): void {
    const { sessionId, selection } = data;
    const userId = socket.data.userId;

    // Broadcast the selection change
    socket.to(sessionId).emit('selection_changed', {
      userId,
      selection,
      timestamp: new Date().toISOString()
    });

    // Update the session state
    this.sessionManager.updateUserSelection(sessionId, userId, selection);
  }

  /**
   * Handle disconnection
   */
  async handleDisconnection(socket: Socket): Promise<void> {
    const connectionId = socket.data.connectionId;
    const userId = socket.data.userId;

    // Remove the active connection
    this.activeConnections.delete(connectionId);

    // Remove the connection from every session
    for (const [sessionId, connections] of this.sessionConnections) {
      if (connections.has(connectionId)) {
        connections.delete(connectionId);

        // Notify the other users in the session
        socket.to(sessionId).emit('user_left', {
          userId,
          reason: 'disconnection',
          timestamp: new Date().toISOString()
        });

        // Remove the user from the session manager
        await this.sessionManager.leaveSession(sessionId, userId);

        // If no users remain in the session, release its resources
        if (connections.size === 0) {
          this.sessionConnections.delete(sessionId);

          const queue = this.operationQueue.get(sessionId);
          if (queue) {
            await queue.destroy();
            this.operationQueue.delete(sessionId);
          }
        }
      }
    }
  }

  /**
   * Create a session token
   */
  async createSessionToken(
    userInfo: any,
    capabilities: string[]
  ): Promise<string> {
    return this.config.securityManager.generateSessionToken(userInfo, capabilities);
  }

  /**
   * Get session information
   */
  async getSessionInfo(sessionId: string): Promise<any> {
    return this.sessionManager.getSessionInfo(sessionId);
  }

  /**
   * Destroy a session
   */
  async destroySession(sessionId: string): Promise<void> {
    // Notify all participants
    this.config.socketIO.to(sessionId).emit('session_destroyed', {
      sessionId,
      timestamp: new Date().toISOString()
    });

    // Clean up resources
    await this.sessionManager.destroySession(sessionId);

    const queue = this.operationQueue.get(sessionId);
    if (queue) {
      await queue.destroy();
      this.operationQueue.delete(sessionId);
    }

    this.sessionConnections.delete(sessionId);
  }

  /**
   * Get session statistics
   */
  async getSessionStats(): Promise<SessionStats> {
    return {
      activeSessions: this.sessionConnections.size,
      totalConnections: this.activeConnections.size,
      sessionsDetails: await Promise.all(
        Array.from(this.sessionConnections.keys()).map(async sessionId => ({
          sessionId,
          participantCount: this.sessionConnections.get(sessionId)?.size || 0,
          sessionInfo: await this.sessionManager.getSessionInfo(sessionId)
        }))
      )
    };
  }

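  // A connection in subscriber mode cannot issue regular commands, so the
  // subscription uses a dedicated duplicate of the main connection.
  private clusterSubscriber?: Redis;

  /**
   * Subscribe to cluster events
   */
  private async subscribeToClusterEvents(): Promise<void> {
    this.clusterSubscriber = this.config.redis.duplicate();
    await this.clusterSubscriber.subscribe('collaboration:cluster:events');

    this.clusterSubscriber.on('message', (channel, message) => {
      if (channel !== 'collaboration:cluster:events') {
        return;
      }

      try {
        this.handleClusterEvent(JSON.parse(message));
      } catch (error) {
        console.error('❌ Failed to handle cluster event:', error);
      }
    });
  }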

  /**
   * Broadcast to the cluster
   */
  private async broadcastToCluster(eventType: string, data: any): Promise<void> {
    await this.config.redis.publish('collaboration:cluster:events', JSON.stringify({
      type: eventType,
      data,
      instanceId: process.env.INSTANCE_ID,
      timestamp: new Date().toISOString()
    }));
  }

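  /**
   * Handle a cluster event
   */
  private handleClusterEvent(clusterEvent: any): void {
    // Ignore events published by this instance
    if (clusterEvent.instanceId === process.env.INSTANCE_ID) {
      return;
    }

    switch (clusterEvent.type) {
      case 'collaboration_event': {
        const { sessionId, event } = clusterEvent.data;
        // `local` restricts the emit to sockets on this instance, so the
        // Redis adapter does not fan the event out to every node again
        this.config.socketIO.local.to(sessionId).emit('collaboration_event', event);
        break;
      }

      // Other cluster event handling...
    }
  }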

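  /**
   * Throttle helper: runs at most one call per key every `delay` ms,
   * always using the most recent callback. (Resetting the timer on every
   * call would be a debounce and would never fire during continuous input.)
   */
  private throttleTimers: Map<string, NodeJS.Timeout> = new Map();
  private throttlePending: Map<string, Function> = new Map();

  private throttle(key: string, fn: Function, delay: number): void {
    // Remember the latest call so the scheduled run uses fresh data
    this.throttlePending.set(key, fn);

    // A timer is already running for this key; it will pick up the latest call
    if (this.throttleTimers.has(key)) {
      return;
    }

    const timer = setTimeout(() => {
      const latest = this.throttlePending.get(key);
      this.throttlePending.delete(key);
      this.throttleTimers.delete(key);
      latest?.();
    }, delay);

    this.throttleTimers.set(key, timer);
  }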

  /**
   * Validate a collaboration event
   */
  private async validateCollaborationEvent(
    event: CollaborationEvent,
    userId: string
  ): Promise<void> {
    // Basic validation
    if (!event.id || !event.sessionId || !event.type) {
      throw new Error('Invalid event format');
    }

    if (event.userId !== userId) {
      throw new Error('Event userId mismatch');
    }

    // Session permission check
    const hasPermission = await this.sessionManager.validateSessionPermission(
      userId,
      event.sessionId,
      'edit'
    );

    if (!hasPermission) {
      throw new Error('No permission to edit in this session');
    }

    // Rate-limit check
    const rateLimitOk = await this.config.securityManager.checkRateLimit(
      userId,
      'collaboration_event'
    );

    if (!rateLimitOk.allowed) {
      throw new Error('Rate limit exceeded');
    }
  }

  /**
   * Shut down the engine
   */
  async shutdown(): Promise<void> {
    console.log('🛑 Shutting down collaboration engine...');

    // Clear throttle timers
    this.throttleTimers.forEach(timer => clearTimeout(timer));
    this.throttleTimers.clear();

    // Destroy all operation queues
    await Promise.all(
      Array.from(this.operationQueue.values()).map(queue => queue.destroy())
    );
    this.operationQueue.clear();

    // Shut down the session manager and conflict resolver
    await this.sessionManager.shutdown();
    await this.conflictResolver.shutdown();

    // 清理連接記錄
    this.activeConnections.clear();
    this.sessionConnections.clear();

    console.log('✅ Collaboration engine shutdown complete');
  }
}

// Helper classes
class OperationQueue {
  constructor(
    private sessionId: string,
    private redis: Redis
  ) {}

  async enqueue(operation: CollaborationEvent): Promise<void> {
    const key = `session:${this.sessionId}:operations`;
    await this.redis.lpush(key, JSON.stringify(operation));
    await this.redis.expire(key, 3600); // Expire after 1 hour
  }

  async destroy(): Promise<void> {
    const key = `session:${this.sessionId}:operations`;
    await this.redis.del(key);
  }
}

interface SessionStats {
  activeSessions: number;
  totalConnections: number;
  sessionsDetails: Array<{
    sessionId: string;
    participantCount: number;
    sessionInfo: any;
  }>;
}
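
One detail of `OperationQueue` is easy to miss: `enqueue` uses `LPUSH`, so the newest operation always sits at the head of the Redis list. A consumer that wants to replay operations in the order they were enqueued would need to reverse what `LRANGE` returns. The sketch below illustrates this with plain arrays standing in for Redis; `QueuedOperation`, `lpush`, `lrange`, and `toChronological` are hypothetical names for illustration and are not part of the service code.

```typescript
// Hypothetical, trimmed-down event shape; the real CollaborationEvent has more fields.
interface QueuedOperation {
  id: string;
  sessionId: string;
  type: string;
}

// Emulates Redis LPUSH on a plain array: each new value goes to the head.
function lpush(list: string[], value: string): number {
  list.unshift(value);
  return list.length;
}

// Emulates Redis LRANGE for a non-negative start; a negative stop counts from the end.
function lrange(list: string[], start: number, stop: number): string[] {
  const end = stop < 0 ? list.length + stop + 1 : stop + 1;
  return list.slice(start, end);
}

// LRANGE returns head to tail, which is newest first after LPUSH.
// Reversing the parsed entries restores the order in which they were enqueued.
function toChronological(raw: string[]): QueuedOperation[] {
  return raw.map(item => JSON.parse(item) as QueuedOperation).reverse();
}

const list: string[] = [];
for (const id of ['op-1', 'op-2', 'op-3']) {
  lpush(list, JSON.stringify({ id, sessionId: 's1', type: 'edit' }));
}

const newestFirst = lrange(list, 0, -1).map(
  item => (JSON.parse(item) as QueuedOperation).id
);
const oldestFirst = toChronological(lrange(list, 0, -1)).map(op => op.id);

console.log(newestFirst); // [ 'op-3', 'op-2', 'op-1' ]
console.log(oldestFirst); // [ 'op-1', 'op-2', 'op-3' ]
```

An alternative would be to enqueue with `RPUSH`, which keeps the list in chronological order and removes the need to reverse on read.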

Inter-Service Communication and Discovery

Service Registration and Discovery

// apps/kyo-collaboration-service/src/services/ServiceRegistry.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';

export class ServiceRegistry extends EventEmitter {
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private healthCheckInterval: NodeJS.Timeout | null = null;
  private isRegistered = false;

  constructor(
    private config: {
      redis: Redis;
      serviceName: string;
      serviceConfig: any;
      healthEndpoint: string;
    }
  ) {
    super();
  }

  // ID of the currently registered instance; unregister() reuses it to remove the right keys
  private serviceId: string | null = null;

  /**
   * Register the service
   */
  async register(): Promise<void> {
    const serviceInfo = {
      name: this.config.serviceName,
      id: this.generateServiceId(),
      host: process.env.SERVICE_HOST || 'localhost',
      port: parseInt(process.env.PORT || '3001', 10),
      healthEndpoint: this.config.healthEndpoint,
      capabilities: ['websocket', 'real-time-collaboration'],
      metadata: {
        version: process.env.SERVICE_VERSION || '1.0.0',
        environment: process.env.NODE_ENV || 'development',
        instanceId: process.env.INSTANCE_ID,
        startedAt: new Date().toISOString()
      },
      registeredAt: new Date().toISOString(),
      lastHeartbeat: new Date().toISOString()
    };

    // Register in Redis
    const serviceKey = `services:${this.config.serviceName}:${serviceInfo.id}`;
    await this.config.redis.setex(
      serviceKey,
      // TTL is twice the heartbeat interval; heartbeatInterval is in ms, SETEX expects seconds
      Math.ceil((this.config.serviceConfig.heartbeatInterval * 2) / 1000),
      JSON.stringify(serviceInfo)
    );

    // Add to the instance set
    await this.config.redis.sadd(
      `services:${this.config.serviceName}:instances`,
      serviceInfo.id
    );

    this.serviceId = serviceInfo.id;
    this.isRegistered = true;

    // Start sending heartbeats
    this.startHeartbeat(serviceInfo);

    // Start periodic health checks
    this.startHealthCheck();

    console.log(`✅ Service registered: ${this.config.serviceName}:${serviceInfo.id}`);
    this.emit('registered', serviceInfo);
  }

  /**
   * Unregister the service
   */
  async unregister(): Promise<void> {
    if (!this.isRegistered || !this.serviceId) return;

    // Use the ID generated at registration; generating a new one would never match the stored keys
    const serviceId = this.serviceId;

    // Stop heartbeats and health checks
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }

    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }

    // Remove from Redis
    const serviceKey = `services:${this.config.serviceName}:${serviceId}`;
    await this.config.redis.del(serviceKey);
    await this.config.redis.srem(
      `services:${this.config.serviceName}:instances`,
      serviceId
    );

    this.serviceId = null;
    this.isRegistered = false;

    console.log(`✅ Service unregistered: ${this.config.serviceName}:${serviceId}`);
    this.emit('unregistered');
  }

  /**
   * Discover the healthy instances of a service
   */
  async discoverService(serviceName: string): Promise<ServiceInstance[]> {
    const instanceIds = await this.config.redis.smembers(
      `services:${serviceName}:instances`
    );

    const services: ServiceInstance[] = [];

    for (const instanceId of instanceIds) {
      const serviceKey = `services:${serviceName}:${instanceId}`;
      const serviceData = await this.config.redis.get(serviceKey);

      if (serviceData) {
        const serviceInfo = JSON.parse(serviceData);
        services.push(serviceInfo);
      }
    }

    return services.filter(service => this.isServiceHealthy(service));
  }

  /**
   * Get one service instance using the given load-balancing strategy
   */
  async getServiceInstance(serviceName: string, strategy: 'round-robin' | 'random' = 'round-robin'): Promise<ServiceInstance | null> {
    const services = await this.discoverService(serviceName);

    if (services.length === 0) {
      return null;
    }

    switch (strategy) {
      case 'round-robin':
        return this.roundRobinSelect(serviceName, services);
      case 'random':
        return services[Math.floor(Math.random() * services.length)];
      default:
        return services[0];
    }
  }

  /**
   * Start sending heartbeats
   */
  private startHeartbeat(serviceInfo: any): void {
    this.heartbeatInterval = setInterval(async () => {
      try {
        const serviceKey = `services:${this.config.serviceName}:${serviceInfo.id}`;

        // Update the heartbeat timestamp
        serviceInfo.lastHeartbeat = new Date().toISOString();

        await this.config.redis.setex(
          serviceKey,
          // heartbeatInterval is in ms (it is also the setInterval delay); SETEX expects seconds
          Math.ceil((this.config.serviceConfig.heartbeatInterval * 2) / 1000),
          JSON.stringify(serviceInfo)
        );

      } catch (error) {
        console.error('❌ Heartbeat failed:', error);
        this.emit('heartbeat_failed', error);
      }
    }, this.config.serviceConfig.heartbeatInterval);
  }

  /**
   * Start periodic health checks
   */
  private startHealthCheck(): void {
    this.healthCheckInterval = setInterval(async () => {
      try {
        await this.performHealthCheck();
      } catch (error) {
        console.error('❌ Health check failed:', error);
        this.emit('health_check_failed', error);
      }
    }, this.config.serviceConfig.healthCheckInterval);
  }

  /**
   * Run a health check
   */
  private async performHealthCheck(): Promise<void> {
    // Check the Redis connection
    await this.config.redis.ping();

    // Check other dependencies
    // ... additional health check logic

    this.emit('health_check_passed');
  }

  /**
   * Check whether a service instance is healthy
   */
  private isServiceHealthy(service: ServiceInstance): boolean {
    const lastHeartbeat = new Date(service.lastHeartbeat);
    const now = new Date();
    const timeDiff = now.getTime() - lastHeartbeat.getTime();

    // Treat the instance as unhealthy once its last heartbeat is two or more intervals old (ms)
    return timeDiff < (this.config.serviceConfig.heartbeatInterval * 2);
  }

  /**
   * Round-robin selection
   */
  private roundRobinCounters: Map<string, number> = new Map();

  private roundRobinSelect(serviceName: string, services: ServiceInstance[]): ServiceInstance {
    const currentCounter = this.roundRobinCounters.get(serviceName) || 0;
    const selectedIndex = currentCounter % services.length;

    this.roundRobinCounters.set(serviceName, currentCounter + 1);

    return services[selectedIndex];
  }

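  /**
   * Generate a service ID
   */
  private generateServiceId(): string {
    // slice(2, 11) keeps nine random characters; it replaces the deprecated substr(2, 9)
    return `${process.env.INSTANCE_ID || 'local'}_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }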
}

interface ServiceInstance {
  name: string;
  id: string;
  host: string;
  port: number;
  healthEndpoint: string;
  capabilities: string[];
  metadata: any;
  registeredAt: string;
  lastHeartbeat: string;
}
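
The registry relies on a single `heartbeatInterval` value in three places: as the `setInterval` delay, in the staleness check inside `isServiceHealthy` (both in milliseconds), and to derive the Redis key TTL, where `SETEX` expects whole seconds. The following is a minimal sketch of how those pieces fit together with round-robin selection, written as pure functions so it runs without Redis. `InstanceRecord`, `ttlSeconds`, `isFresh`, and `createRoundRobin` are hypothetical names, and the 30-second interval is an assumed value for illustration.

```typescript
// Hypothetical subset of the ServiceInstance fields needed for selection.
interface InstanceRecord {
  id: string;
  lastHeartbeat: string; // ISO 8601 timestamp
}

// Converts "two heartbeat intervals" from milliseconds to the whole seconds SETEX expects.
function ttlSeconds(heartbeatIntervalMs: number): number {
  return Math.ceil((heartbeatIntervalMs * 2) / 1000);
}

// An instance counts as healthy while its last heartbeat is younger than two intervals.
function isFresh(
  instance: InstanceRecord,
  nowMs: number,
  heartbeatIntervalMs: number
): boolean {
  const ageMs = nowMs - new Date(instance.lastHeartbeat).getTime();
  return ageMs < heartbeatIntervalMs * 2;
}

// Returns a picker that cycles through whatever list of healthy instances it is given.
function createRoundRobin(): (candidates: InstanceRecord[]) => InstanceRecord | null {
  let counter = 0;
  return candidates => {
    if (candidates.length === 0) return null;
    const picked = candidates[counter % candidates.length];
    counter += 1;
    return picked;
  };
}

const heartbeatIntervalMs = 30000; // assumed value for illustration
const nowMs = Date.parse('2025-01-01T00:01:30.000Z');
const instances: InstanceRecord[] = [
  { id: 'a', lastHeartbeat: '2025-01-01T00:01:20.000Z' }, // 10 s old
  { id: 'b', lastHeartbeat: '2025-01-01T00:00:00.000Z' }, // 90 s old, stale
  { id: 'c', lastHeartbeat: '2025-01-01T00:01:00.000Z' }, // 30 s old
];

const healthy = instances.filter(i => isFresh(i, nowMs, heartbeatIntervalMs));
const pick = createRoundRobin();
const order = [pick(healthy), pick(healthy), pick(healthy)].map(i => i?.id);

console.log(ttlSeconds(heartbeatIntervalMs)); // 60
console.log(order); // [ 'a', 'c', 'a' ]
```

Because the counter is taken modulo the current list length, the rotation stays valid when instances join or drop out between calls, although the distribution is only approximately even while the list is changing.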

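Summary

Today we completed the architecture upgrade that moves the WebSocket layer into its own microservice:

Core Architecture Improvements

  1. Microservice split

    • The WebSocket collaboration engine now runs as a dedicated service
    • Clear service boundaries and responsibilities
    • Services are decoupled and can be deployed independently
  2. High-availability design

    • Redis Adapter support for multiple instances
    • Service registration and discovery
    • Graceful shutdown and failure recovery
  3. Enterprise-grade features

    • Monitoring and metrics collection
    • Authentication and rate limiting
    • Operation queues and event persistence
  4. Performance optimization

    • Event broadcasting across the cluster
    • Load balancing across instances (round-robin or random)
    • Resource cleanup and memory management

Technical Highlights

  • Scalability: horizontal scaling to support many concurrent connections
  • Reliability: multiple layers of failure recovery
  • Maintainability: modular design that is easy to maintain
  • Observability: monitoring and logging throughout the service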
