iT邦幫忙

2025 iThome Ironman Contest

DAY 16
Software Development

Building a Studio SaaS Product in 30 Days (Backend Edition) series, part 16

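Day 16: Building a SaaS Product in 30 Days (Backend Edition): Implementing a Real-Time Collaboration Engine and Distributed Data Synchronization Architecture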

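Recap

On Day 15 we built the backend architecture for the course template system. Today we implement a real-time collaboration engine. The system has to support multiple users working at the same time, real-time data synchronization, conflict detection and resolution, and a highly available distributed architecture. We build it with WebSocket, Redis Streams, and an event-driven architecture.

System Architecture Design

Real-Time Collaboration Architecture Overview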

// packages/kyo-core/src/collaboration/types.ts
export interface CollaborationArchitecture {
  // WebSocket connection management
  connectionManager: ConnectionManager;

  // Event processing engine
  eventEngine: EventEngine;

  // Conflict resolver
  conflictResolver: ConflictResolver;

  // State synchronization manager
  stateSyncManager: StateSyncManager;

  // Data persistence layer
  persistenceLayer: PersistenceLayer;
}

export interface CollaborationEvent {
  id: string;
  sessionId: string;
  userId: string;
  type: CollaborationEventType;
  payload: any;
  timestamp: Date;
  version: number;
  precedence: number;
  causality: string[]; // causality tracking
  metadata: EventMetadata;
}

export interface EventMetadata {
  source: 'user' | 'system' | 'sync';
  priority: 'high' | 'medium' | 'low';
  retryCount?: number;
  conflictResolution?: ConflictResolutionStrategy;
  checksum: string;
}

export interface CollaborationSession {
  id: string;
  tenantId: string;
  name: string;
  type: 'course_planning' | 'template_editing' | 'schedule_management';

  // Participant management
  participants: Map<string, Participant>;
  activeConnections: Map<string, ConnectionInfo>;

  // State management
  sharedState: SharedState;
  stateVersion: number;
  lastActivity: Date;

  // Access control
  permissions: SessionPermissions;
  lockManager: ResourceLockManager;

  // Configuration
  config: SessionConfiguration;
}

export interface SharedState {
  courses: Map<string, Course>;
  templates: Map<string, CourseTemplate>;
  selections: Map<string, UserSelection>;
  cursors: Map<string, CursorPosition>;
  locks: Map<string, ResourceLock>;
  conflictQueue: ConflictItem[];
}

export interface ResourceLock {
  resourceId: string;
  resourceType: 'course' | 'template' | 'timeslot';
  lockedBy: string;
  lockType: 'exclusive' | 'shared' | 'intention';
  acquiredAt: Date;
  expiresAt: Date;
  dependencies: string[];
}
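
The ResourceLock type above carries a lock type and an expiry, but the listing does not show how a lock request is checked against locks already held. The following is a minimal sketch of one possible check. The names LockRequest, HeldLock, isCompatible, and canAcquire are hypothetical, and the compatibility rules (an exclusive lock conflicts with everything, while shared and intention locks coexist) are an assumption, not something the series specifies.

```typescript
// Hypothetical helper types that mirror the ResourceLock fields used here.
type LockType = 'exclusive' | 'shared' | 'intention';

interface LockRequest {
  resourceId: string;
  requestedBy: string;
  lockType: LockType;
}

interface HeldLock {
  resourceId: string;
  lockedBy: string;
  lockType: LockType;
  expiresAt: Date;
}

// Assumed rule: any pairing that involves an exclusive lock conflicts;
// shared and intention locks may coexist.
function isCompatible(held: LockType, requested: LockType): boolean {
  return held !== 'exclusive' && requested !== 'exclusive';
}

// Returns true when the request conflicts with no live lock held by
// another user on the same resource.
function canAcquire(
  request: LockRequest,
  heldLocks: HeldLock[],
  now: Date = new Date()
): boolean {
  return heldLocks
    .filter(lock => lock.resourceId === request.resourceId)
    .filter(lock => lock.expiresAt.getTime() > now.getTime()) // skip expired locks
    .filter(lock => lock.lockedBy !== request.requestedBy)     // own locks never conflict
    .every(lock => isCompatible(lock.lockType, request.lockType));
}
```

Passing the current time as a parameter keeps the check deterministic in tests. Ignoring expired locks at read time also means a crashed client cannot block a resource past expiresAt, even if no cleanup job has run yet.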

WebSocket Connection Manager

// packages/kyo-core/src/collaboration/ConnectionManager.ts
import { WebSocketServer, WebSocket } from 'ws';
import { IncomingMessage } from 'http';
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';

export class ConnectionManager extends EventEmitter {
  private wss: WebSocketServer;
  private connections: Map<string, ConnectionInfo> = new Map();
  private sessionConnections: Map<string, Set<string>> = new Map();
  private heartbeatInterval: NodeJS.Timeout;
  private redis: Redis;

  constructor(
    private config: ConnectionManagerConfig,
    private authService: AuthenticationService,
    redis: Redis
  ) {
    super();
    this.redis = redis;
    this.initializeWebSocketServer();
    this.startHeartbeat();
  }

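  /**
   * Initialize the WebSocket server
   */
  private initializeWebSocketServer(): void {
    this.wss = new WebSocketServer({
      port: this.config.port,
      path: '/collaboration',
      perMessageDeflate: {
        zlibDeflateOptions: {
          level: 3,
        },
        // threshold and concurrencyLimit are perMessageDeflate options in ws,
        // not zlib options, so they belong at this level
        threshold: 1024,
        concurrencyLimit: 10,
      },
    });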

    this.wss.on('connection', this.handleNewConnection.bind(this));
    this.wss.on('error', this.handleServerError.bind(this));

    console.log(`WebSocket server listening on port ${this.config.port}`);
  }

  /**
   * Handle a new connection
   */
  private async handleNewConnection(ws: WebSocket, request: IncomingMessage): Promise<void> {
    const connectionId = this.generateConnectionId();

    try {
      // Authenticate the connection
      const authResult = await this.authenticateConnection(request);
      if (!authResult.success) {
        ws.close(1008, 'Authentication failed');
        return;
      }

      // Build the connection record
      const connectionInfo: ConnectionInfo = {
        id: connectionId,
        userId: authResult.userId,
        tenantId: authResult.tenantId,
        socket: ws,
        sessionId: null,
        connectedAt: new Date(),
        lastActivity: new Date(),
        isAlive: true,
        metadata: {
          userAgent: request.headers['user-agent'],
          ip: this.getClientIP(request),
          platform: this.detectPlatform(request),
        }
      };

      this.connections.set(connectionId, connectionInfo);

      // Register the WebSocket event handlers
      this.setupWebSocketHandlers(ws, connectionInfo);

      // Send the connection acknowledgement
      this.sendMessage(connectionId, {
        type: 'connection_established',
        payload: {
          connectionId,
          serverTime: new Date().toISOString(),
          supportedFeatures: this.config.supportedFeatures,
        }
      });

      this.emit('connection_established', connectionInfo);

    } catch (error) {
      console.error('Failed to handle new connection:', error);
      ws.close(1011, 'Internal server error');
    }
  }

  /**
   * Register the WebSocket event handlers
   */
  private setupWebSocketHandlers(ws: WebSocket, connectionInfo: ConnectionInfo): void {
    const { id: connectionId } = connectionInfo;

    ws.on('message', async (data: Buffer) => {
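      try {
        // Parse into the application-level { type, payload } envelope. The
        // name ClientMessage is an assumption; reusing IncomingMessage would
        // clash with the type imported from 'http'.
        const message = JSON.parse(data.toString()) as ClientMessage;
        await this.handleIncomingMessage(connectionId, message);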
      } catch (error) {
        console.error('Failed to process message:', error);
        this.sendError(connectionId, 'invalid_message', 'Failed to process message');
      }
    });

    ws.on('pong', () => {
      const connection = this.connections.get(connectionId);
      if (connection) {
        connection.isAlive = true;
        connection.lastActivity = new Date();
      }
    });

    ws.on('close', (code: number, reason: Buffer) => {
      this.handleConnectionClose(connectionId, code, reason.toString());
    });

    ws.on('error', (error: Error) => {
      console.error(`WebSocket error for connection ${connectionId}:`, error);
      this.handleConnectionError(connectionId, error);
    });
  }

  /**
   * Handle an incoming message
   */
  private async handleIncomingMessage(
    connectionId: string,
    message: ClientMessage // app-level envelope (assumed name), not http.IncomingMessage
  ): Promise<void> {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    connection.lastActivity = new Date();

    try {
      switch (message.type) {
        case 'join_session':
          await this.handleJoinSession(connectionId, message.payload);
          break;

        case 'leave_session':
          await this.handleLeaveSession(connectionId, message.payload);
          break;

        case 'collaboration_event':
          await this.handleCollaborationEvent(connectionId, message.payload);
          break;

        case 'cursor_update':
          await this.handleCursorUpdate(connectionId, message.payload);
          break;

        case 'selection_update':
          await this.handleSelectionUpdate(connectionId, message.payload);
          break;

        case 'heartbeat':
          this.sendMessage(connectionId, { type: 'heartbeat_ack' });
          break;

        case 'request_sync':
          await this.handleSyncRequest(connectionId, message.payload);
          break;

        default:
          this.sendError(connectionId, 'unknown_message_type', `Unknown message type: ${message.type}`);
      }

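    } catch (error) {
      console.error('Error handling message:', error);
      // A caught value is not guaranteed to be an Error instance
      const reason = error instanceof Error ? error.message : String(error);
      this.sendError(connectionId, 'message_handling_error', reason);
    }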
  }

  /**
   * Handle a join-session request
   */
  private async handleJoinSession(
    connectionId: string,
    payload: JoinSessionPayload
  ): Promise<void> {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    const { sessionId, userInfo } = payload;

    try {
      // Check permissions
      const hasPermission = await this.validateSessionPermission(
        connection.userId,
        connection.tenantId,
        sessionId,
        'join'
      );

      if (!hasPermission) {
        this.sendError(connectionId, 'permission_denied', 'No permission to join session');
        return;
      }

      // Update the connection record
      connection.sessionId = sessionId;

      // Add the connection to the session's connection set
      if (!this.sessionConnections.has(sessionId)) {
        this.sessionConnections.set(sessionId, new Set());
      }
      this.sessionConnections.get(sessionId)!.add(connectionId);

      // Notify the session manager
      this.emit('user_joined_session', {
        sessionId,
        userId: connection.userId,
        connectionId,
        userInfo
      });

      // Send the success response
      this.sendMessage(connectionId, {
        type: 'session_joined',
        payload: {
          sessionId,
          participantId: connection.userId,
          success: true
        }
      });

      // Notify the other participants
      await this.broadcastToSession(sessionId, {
        type: 'participant_joined',
        payload: {
          userId: connection.userId,
          userInfo,
          timestamp: new Date().toISOString()
        }
      }, [connectionId]);

    } catch (error) {
      console.error('Failed to join session:', error);
      const reason = error instanceof Error ? error.message : String(error);
      this.sendError(connectionId, 'join_session_failed', reason);
    }
  }

  /**
   * Handle a collaboration event
   */
  private async handleCollaborationEvent(
    connectionId: string,
    payload: CollaborationEventPayload
  ): Promise<void> {
    const connection = this.connections.get(connectionId);
    if (!connection || !connection.sessionId) return;

    try {
      // Build the full event object
      const event: CollaborationEvent = {
        id: this.generateEventId(),
        sessionId: connection.sessionId,
        userId: connection.userId,
        type: payload.type,
        payload: payload.data,
        timestamp: new Date(),
        version: payload.version || 1,
        precedence: Date.now(),
        causality: payload.causality || [],
        metadata: {
          source: 'user',
          priority: payload.priority || 'medium',
          checksum: this.calculateChecksum(payload.data)
        }
      };

      // Hand the event to the event processing engine
      this.emit('collaboration_event', event);

      // Broadcast immediately to the other users in the session (optimistic update)
      await this.broadcastToSession(connection.sessionId, {
        type: 'collaboration_event',
        payload: event
      }, [connectionId]);

    } catch (error) {
      console.error('Failed to handle collaboration event:', error);
      const reason = error instanceof Error ? error.message : String(error);
      this.sendError(connectionId, 'event_handling_failed', reason);
    }
  }

  /**
   * Broadcast a message to a session
   */
  async broadcastToSession(
    sessionId: string,
    message: any,
    excludeConnections: string[] = []
  ): Promise<void> {
    const sessionConnections = this.sessionConnections.get(sessionId);
    if (!sessionConnections) return;

    const broadcastPromises = Array.from(sessionConnections)
      .filter(connId => !excludeConnections.includes(connId))
      .map(connId => this.sendMessage(connId, message));

    await Promise.allSettled(broadcastPromises);
  }

  /**
   * Send a message to a specific connection
   */
  async sendMessage(connectionId: string, message: any): Promise<boolean> {
    const connection = this.connections.get(connectionId);
    if (!connection || connection.socket.readyState !== WebSocket.OPEN) {
      return false;
    }

    try {
      const serializedMessage = JSON.stringify(message);
      connection.socket.send(serializedMessage);
      return true;
    } catch (error) {
      console.error(`Failed to send message to ${connectionId}:`, error);
      return false;
    }
  }

  /**
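   * Start heartbeat checks
   */
  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(() => {
      this.connections.forEach((connection, connectionId) => {
        if (connection.isAlive === false) {
          // No pong since the last ping: release our bookkeeping, then
          // terminate the socket so the dead connection is actually closed.
          // 1006 marks this as an abnormal closure rather than a normal one.
          this.handleConnectionClose(connectionId, 1006, 'Heartbeat timeout');
          connection.socket.terminate();
          return;
        }

        // Mark as not alive and send a ping; the pong handler resets the flag
        connection.isAlive = false;
        if (connection.socket.readyState === WebSocket.OPEN) {
          connection.socket.ping();
        }
      });
    }, this.config.heartbeatInterval || 30000);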
  }

  /**
   * Handle a closed connection
   */
  private handleConnectionClose(
    connectionId: string,
    code: number,
    reason: string
  ): void {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    // Remove the connection from its session
    if (connection.sessionId) {
      const sessionConnections = this.sessionConnections.get(connection.sessionId);
      if (sessionConnections) {
        sessionConnections.delete(connectionId);

        // Notify the other participants
        this.broadcastToSession(connection.sessionId, {
          type: 'participant_left',
          payload: {
            userId: connection.userId,
            timestamp: new Date().toISOString(),
            reason: code === 1000 ? 'normal_closure' : 'abnormal_closure'
          }
        });

        // Notify the session manager
        this.emit('user_left_session', {
          sessionId: connection.sessionId,
          userId: connection.userId,
          connectionId
        });
      }
    }

    // Remove the connection record
    this.connections.delete(connectionId);

    this.emit('connection_closed', {
      connectionId,
      userId: connection.userId,
      code,
      reason
    });

    console.log(`Connection ${connectionId} closed: ${code} - ${reason}`);
  }

  /**
   * Get connection statistics
   */
  getConnectionStats(): ConnectionStats {
    const stats: ConnectionStats = {
      totalConnections: this.connections.size,
      activeConnections: Array.from(this.connections.values())
        .filter(c => c.socket.readyState === WebSocket.OPEN).length,
      sessionDistribution: new Map(),
      userDistribution: new Map(),
      averageLatency: 0,
      uptimeSeconds: 0
    };

    // Count connections per session
    this.sessionConnections.forEach((connections, sessionId) => {
      stats.sessionDistribution.set(sessionId, connections.size);
    });

    // Count connections per user
    this.connections.forEach(connection => {
      const count = stats.userDistribution.get(connection.userId) || 0;
      stats.userDistribution.set(connection.userId, count + 1);
    });

    return stats;
  }

  /**
   * Release resources
   */
  async shutdown(): Promise<void> {
    // Stop the heartbeat
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }

    // Close all connections
    const closePromises = Array.from(this.connections.values()).map(connection => {
      return new Promise<void>((resolve) => {
        if (connection.socket.readyState === WebSocket.OPEN) {
          connection.socket.close(1001, 'Server shutdown');
          connection.socket.once('close', () => resolve());
        } else {
          resolve();
        }
      });
    });

    await Promise.allSettled(closePromises);

    // Close the server
    await new Promise<void>((resolve, reject) => {
      this.wss.close((error) => {
        if (error) reject(error);
        else resolve();
      });
    });

    console.log('WebSocket server shutdown complete');
  }
}
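
The message handler above casts the result of JSON.parse directly, so a malformed payload is only caught later, once the switch statement runs. A minimal sketch of validating the envelope before dispatch might look like the following. The names ClientMessage, MESSAGE_TYPES, and parseClientMessage are hypothetical; the list of message types is taken from the switch statement in handleIncomingMessage.

```typescript
// Message types taken from the switch statement in handleIncomingMessage.
const MESSAGE_TYPES = [
  'join_session',
  'leave_session',
  'collaboration_event',
  'cursor_update',
  'selection_update',
  'heartbeat',
  'request_sync',
] as const;

type ClientMessageType = typeof MESSAGE_TYPES[number];

// Hypothetical application-level envelope sent by clients.
interface ClientMessage {
  type: ClientMessageType;
  payload?: unknown;
}

// Returns a typed message, or null when the input is not valid JSON,
// is not an object, or carries an unknown type.
function parseClientMessage(raw: string): ClientMessage | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }

  if (typeof parsed !== 'object' || parsed === null) return null;

  const candidate = parsed as { type?: unknown; payload?: unknown };
  if (typeof candidate.type !== 'string') return null;
  if (!(MESSAGE_TYPES as readonly string[]).includes(candidate.type)) return null;

  return { type: candidate.type as ClientMessageType, payload: candidate.payload };
}
```

This sketch folds unknown message types into the null result, whereas the handler above reports them with a separate unknown_message_type error. If that distinction matters to clients, the type check can stay in the switch and the parser can validate only the shape.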

Event Processing Engine

// packages/kyo-core/src/collaboration/EventEngine.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';

export class EventEngine extends EventEmitter {
  private eventStream: string = 'collaboration:events';
  private processingQueues: Map<string, EventProcessor> = new Map();
  private eventHistory: Map<string, CollaborationEvent[]> = new Map();
  private conflictDetector: ConflictDetector;
  private persistenceManager: EventPersistenceManager;

  constructor(
    private redis: Redis,
    private config: EventEngineConfig
  ) {
    super();
    this.conflictDetector = new ConflictDetector(config.conflictDetection);
    this.persistenceManager = new EventPersistenceManager(redis, config.persistence);
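    // initializeEventProcessing is async; catch failures here so they do not
    // surface as unhandled promise rejections
    this.initializeEventProcessing().catch(error => {
      console.error('Failed to initialize event processing:', error);
    });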
  }

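  /**
   * Initialize event processing
   */
  private async initializeEventProcessing(): Promise<void> {
    // Create the consumer group for the event stream
    try {
      await this.redis.xgroup('CREATE', this.eventStream, 'processors', '0', 'MKSTREAM');
    } catch (error) {
      // Redis replies with BUSYGROUP when the group already exists; ignore
      // only that case and rethrow anything else
      if (!(error instanceof Error) || !error.message.includes('BUSYGROUP')) {
        throw error;
      }
    }

    // Start the event processors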
    for (let i = 0; i < this.config.processorCount; i++) {
      const processor = new EventProcessor(
        this.redis,
        this.eventStream,
        `processor-${i}`,
        this.config
      );

      processor.on('event_processed', this.handleEventProcessed.bind(this));
      processor.on('conflict_detected', this.handleConflictDetected.bind(this));
      processor.on('processing_error', this.handleProcessingError.bind(this));

      this.processingQueues.set(`processor-${i}`, processor);
      await processor.start();
    }

    // Start the periodic event cleanup
    this.startEventCleanup();
  }

  /**
   * Process a collaboration event
   */
  async processCollaborationEvent(event: CollaborationEvent): Promise<void> {
    try {
      // 1. Validate the event
      await this.validateEvent(event);

      // 2. Detect conflicts
      const conflicts = await this.conflictDetector.detectConflicts(event);

      if (conflicts.length > 0) {
        // Conflicts found: hand over to the conflict resolution flow
        await this.handleConflicts(event, conflicts);
        return;
      }

      // 3. Append to the event stream
      await this.addEventToStream(event);

      // 4. Update the in-memory event history
      this.updateEventHistory(event);

      // 5. Persist the event
      await this.persistenceManager.persistEvent(event);

      // 6. Trigger downstream handling
      this.emit('event_accepted', event);

    } catch (error) {
      console.error('Failed to process collaboration event:', error);
      this.emit('event_rejected', { event, error });
    }
  }

  /**
   * Append an event to the Redis Stream
   */
  private async addEventToStream(event: CollaborationEvent): Promise<void> {
    const eventData = {
      id: event.id,
      sessionId: event.sessionId,
      userId: event.userId,
      type: event.type,
      payload: JSON.stringify(event.payload),
      timestamp: event.timestamp.toISOString(),
      version: event.version.toString(),
      precedence: event.precedence.toString(),
      causality: JSON.stringify(event.causality),
      metadata: JSON.stringify(event.metadata)
    };

    await this.redis.xadd(this.eventStream, '*', ...Object.entries(eventData).flat());
  }

  /**
   * Conflict detection and handling
   */
  private async handleConflicts(
    event: CollaborationEvent,
    conflicts: DetectedConflict[]
  ): Promise<void> {
    const conflictContext: ConflictContext = {
      originalEvent: event,
      conflicts,
      sessionId: event.sessionId,
      timestamp: new Date(),
      participants: await this.getSessionParticipants(event.sessionId)
    };

    // Choose a resolution strategy based on the conflict types
    const resolutionStrategy = this.selectResolutionStrategy(conflicts);

    switch (resolutionStrategy) {
      case 'automatic':
        await this.resolveConflictsAutomatically(conflictContext);
        break;

      case 'user_intervention':
        await this.requestUserIntervention(conflictContext);
        break;

      case 'last_writer_wins':
        await this.applyLastWriterWins(conflictContext);
        break;

      case 'operational_transform':
        await this.applyOperationalTransform(conflictContext);
        break;

      default:
        // Default: defer resolution
        await this.deferConflictResolution(conflictContext);
    }
  }

  /**
   * Resolve conflicts automatically
   */
  private async resolveConflictsAutomatically(context: ConflictContext): Promise<void> {
    const { originalEvent, conflicts } = context;
    const resolvedEvents: CollaborationEvent[] = [];

    for (const conflict of conflicts) {
      switch (conflict.type) {
        case 'concurrent_edit':
          // Apply operational transformation
          const transformedEvent = await this.transformConcurrentEdit(
            originalEvent,
            conflict.conflictingEvents
          );
          resolvedEvents.push(transformedEvent);
          break;

        case 'resource_lock':
          // Check the lock state; execution may have to be deferred
          const lockResolution = await this.resolveLockConflict(originalEvent, conflict);
          if (lockResolution.canProceed) {
            resolvedEvents.push(lockResolution.transformedEvent);
          } else {
            await this.deferEvent(originalEvent, lockResolution.deferUntil);
          }
          break;

        case 'state_inconsistency':
          // Sync the state, then retry
          await this.syncSessionState(originalEvent.sessionId);
          const retriedEvent = { ...originalEvent, version: originalEvent.version + 1 };
          resolvedEvents.push(retriedEvent);
          break;
      }
    }

    // Process the resolved events
    for (const resolvedEvent of resolvedEvents) {
      await this.processCollaborationEvent(resolvedEvent);
    }
  }

  /**
   * Operational Transformation
   */
  private async applyOperationalTransform(context: ConflictContext): Promise<void> {
    const { originalEvent, conflicts } = context;

    // Collect the conflicting operations
    const conflictingOperations = await this.extractConflictingOperations(conflicts);

    // Build the transform matrix
    const transformMatrix = this.buildTransformMatrix(originalEvent, conflictingOperations);

    // Apply the transformations
    const transformedOperations = this.applyTransformations(transformMatrix);

    // Emit one event per transformed operation
    for (const transformedOp of transformedOperations) {
      const transformedEvent: CollaborationEvent = {
        ...originalEvent,
        id: this.generateEventId(),
        payload: transformedOp.payload,
        version: transformedOp.version,
        causality: [...originalEvent.causality, ...transformedOp.dependencies]
      };

      await this.processCollaborationEvent(transformedEvent);
    }
  }

  /**
   * Build the transform matrix
   */
  private buildTransformMatrix(
    targetOperation: CollaborationEvent,
    conflictingOperations: Operation[]
  ): TransformMatrix {
    const matrix: TransformMatrix = {
      target: this.extractOperation(targetOperation),
      conflicts: conflictingOperations,
      transformations: new Map()
    };

    // Compute a transformation for each conflicting operation
    conflictingOperations.forEach(conflictOp => {
      const transform = this.calculateTransformation(matrix.target, conflictOp);
      matrix.transformations.set(conflictOp.id, transform);
    });

    return matrix;
  }

  /**
   * Compute the transformation for a pair of operations
   */
  private calculateTransformation(
    targetOp: Operation,
    conflictOp: Operation
  ): OperationTransformation {
    // Pick the transformation strategy from the operation types
    const opType1 = targetOp.type;
    const opType2 = conflictOp.type;

    if (opType1 === 'course_move' && opType2 === 'course_move') {
      return this.transformCourseMoveOperations(targetOp, conflictOp);
    }

    if (opType1 === 'course_update' && opType2 === 'course_update') {
      return this.transformCourseUpdateOperations(targetOp, conflictOp);
    }

    if (opType1 === 'course_create' && opType2 === 'course_create') {
      return this.transformCourseCreateOperations(targetOp, conflictOp);
    }

    // Default transformation: keep the original operation
    return {
      transformedOperation: targetOp,
      dependencies: [conflictOp.id],
      confidence: 0.5
    };
  }

  /**
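   * Transform concurrent course-move operations
   */
  private transformCourseMoveOperations(
    op1: Operation,
    op2: Operation
  ): OperationTransformation {
    const move1 = op1.payload as CourseMovePayload;
    const move2 = op2.payload as CourseMovePayload;

    // Moves of different courses need no transformation
    if (move1.courseId !== move2.courseId) {
      return {
        transformedOperation: op1,
        dependencies: [],
        confidence: 1.0
      };
    }

    // Concurrent moves of the same course: the later timestamp wins. Equal
    // timestamps fall back to comparing ids so that both sides agree on a
    // single winner.
    const priority1 = op1.timestamp.getTime();
    const priority2 = op2.timestamp.getTime();
    const op1Wins =
      priority1 > priority2 || (priority1 === priority2 && op1.id > op2.id);

    if (op1Wins) {
      // op1 wins: keep the original operation
      return {
        transformedOperation: op1,
        dependencies: [op2.id],
        confidence: 0.8
      };
    } else {
      // op2 wins: op1 becomes a no-op or gets a recomputed target position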
      return {
        transformedOperation: {
          ...op1,
          type: 'course_move_conflict_resolved',
          payload: {
            ...move1,
            conflictWith: op2.id,
            adjustedTarget: this.calculateAdjustedTarget(move1, move2)
          }
        },
        dependencies: [op2.id],
        confidence: 0.6
      };
    }
  }

  /**
   * Synchronize session state
   */
  async syncSessionState(sessionId: string): Promise<SessionState> {
    const stateKey = `session:state:${sessionId}`;

    try {
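      // Fetch the latest state from Redis
      const serializedState = await this.redis.get(stateKey);

      if (!serializedState) {
        // Nothing stored: rebuild the state
        return await this.rebuildSessionState(sessionId);
      }

      // Note: JSON.parse returns plain objects, so any Map fields in
      // SessionState have to be revived explicitly before use
      const state: SessionState = JSON.parse(serializedState);

      // Validate the integrity of the state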
      const isValid = await this.validateSessionState(state);

      if (!isValid) {
        console.warn(`Session state ${sessionId} is invalid, rebuilding...`);
        return await this.rebuildSessionState(sessionId);
      }

      return state;

    } catch (error) {
      console.error('Failed to sync session state:', error);
      return await this.rebuildSessionState(sessionId);
    }
  }

  /**
   * Rebuild session state
   */
  private async rebuildSessionState(sessionId: string): Promise<SessionState> {
    // Rebuild the state from the event history
    const events = await this.getSessionEventHistory(sessionId);
    const state: SessionState = this.createEmptySessionState();

    // Apply all events in chronological order
    const sortedEvents = events.sort((a, b) =>
      a.timestamp.getTime() - b.timestamp.getTime()
    );

    for (const event of sortedEvents) {
      await this.applyEventToState(state, event);
    }

    // Save the rebuilt state
    await this.saveSessionState(sessionId, state);

    return state;
  }

  /**
   * Apply an event to the state
   */
  private async applyEventToState(
    state: SessionState,
    event: CollaborationEvent
  ): Promise<void> {
    switch (event.type) {
      case 'course_create':
        const course = event.payload as Course;
        state.courses.set(course.id, course);
        break;

      case 'course_update':
        const updateData = event.payload as CourseUpdatePayload;
        const existingCourse = state.courses.get(updateData.courseId);
        if (existingCourse) {
          state.courses.set(updateData.courseId, {
            ...existingCourse,
            ...updateData.updates
          });
        }
        break;

      case 'course_delete':
        state.courses.delete(event.payload.courseId);
        break;

      case 'course_move':
        const moveData = event.payload as CourseMovePayload;
        const courseToMove = state.courses.get(moveData.courseId);
        if (courseToMove) {
          state.courses.set(moveData.courseId, {
            ...courseToMove,
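            // Payloads that passed through JSON may carry ISO strings instead
            // of Date objects; new Date() accepts both
            startTime: new Date(moveData.newTimeSlot.start).toISOString(),
            endTime: new Date(moveData.newTimeSlot.end).toISOString()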
          });
        }
        break;

      case 'template_create':
        const template = event.payload as CourseTemplate;
        state.templates.set(template.id, template);
        break;

      case 'cursor_update':
        state.cursors.set(event.userId, event.payload as CursorPosition);
        break;

      case 'selection_update':
        state.selections.set(event.userId, event.payload as UserSelection);
        break;
    }

    // Bump the state version
    state.version++;
    state.lastUpdated = event.timestamp;
  }

  /**
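   * Start the periodic event cleanup
   */
  private startEventCleanup(): void {
    const timer = setInterval(async () => {
      try {
        await this.cleanupExpiredEvents();
        await this.archiveOldEvents();
        await this.optimizeEventStorage();
      } catch (error) {
        console.error('Event cleanup failed:', error);
      }
    }, this.config.cleanupInterval || 3600000); // runs once per hour by default

    // The handle is not stored for shutdown(), so unref it to keep this
    // timer from holding the process open on its own
    timer.unref();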
  }

  /**
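   * Remove expired events
   */
  private async cleanupExpiredEvents(): Promise<void> {
    const cutoffTime = Date.now() - (this.config.eventRetentionMs || 86400000); // 24 hours

    // Trim old entries from the Redis Stream. MINID needs Redis 6.2 or later
    // and works here because auto-generated entry IDs begin with a
    // millisecond timestamp.
    await this.redis.xtrim(this.eventStream, 'MINID', cutoffTime);

    // Trim the in-memory event history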
    this.eventHistory.forEach((events, sessionId) => {
      const filteredEvents = events.filter(
        event => event.timestamp.getTime() > cutoffTime
      );

      if (filteredEvents.length === 0) {
        this.eventHistory.delete(sessionId);
      } else {
        this.eventHistory.set(sessionId, filteredEvents);
      }
    });
  }

  /**
   * Get engine statistics
   */
  getEngineStats(): EventEngineStats {
    const stats: EventEngineStats = {
      totalEventsProcessed: 0,
      eventsPerSecond: 0,
      conflictRate: 0,
      averageProcessingTime: 0,
      activeProcessors: this.processingQueues.size,
      queueDepth: 0,
      errorRate: 0,
      memoryUsage: {
        eventHistory: this.calculateEventHistorySize(),
        processingQueues: this.calculateQueueSize(),
        total: process.memoryUsage().heapUsed
      }
    };

    return stats;
  }

  /**
   * Shut down the event engine
   */
  async shutdown(): Promise<void> {
    console.log('Shutting down event engine...');

    // Stop all processors
    const shutdownPromises = Array.from(this.processingQueues.values())
      .map(processor => processor.stop());

    await Promise.allSettled(shutdownPromises);

    // Persist any remaining events
    await this.persistenceManager.flushPendingEvents();

    // Release resources
    this.eventHistory.clear();
    this.processingQueues.clear();

    console.log('Event engine shutdown complete');
  }
}
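
One detail in syncSessionState deserves a closer look: the session state holds Map fields, and JSON.stringify turns a Map into an empty object, so a state read back with a plain JSON.parse no longer has working Map methods. The sketch below shows one common way around this, using a replacer and reviver pair. The names SessionStateSketch, mapReplacer, mapReviver, serializeState, and deserializeState are hypothetical, as is the `__type` tag; this is not necessarily how saveSessionState is implemented in the series.

```typescript
// Simplified stand-in for SessionState; the real type has more fields.
interface SessionStateSketch {
  courses: Map<string, { id: string; title: string }>;
  version: number;
}

// Encode every Map as a tagged plain object that JSON can represent.
function mapReplacer(_key: string, value: unknown): unknown {
  if (value instanceof Map) {
    return { __type: 'Map', entries: Array.from(value.entries()) };
  }
  return value;
}

// Turn tagged objects back into Map instances while parsing.
function mapReviver(_key: string, value: unknown): unknown {
  if (
    typeof value === 'object' &&
    value !== null &&
    (value as { __type?: unknown }).__type === 'Map'
  ) {
    return new Map((value as { entries: [unknown, unknown][] }).entries);
  }
  return value;
}

function serializeState(state: SessionStateSketch): string {
  return JSON.stringify(state, mapReplacer);
}

function deserializeState(raw: string): SessionStateSketch {
  return JSON.parse(raw, mapReviver) as SessionStateSketch;
}
```

The string returned by serializeState can be stored under the session state key and passed to deserializeState after reading it back. Date fields are not covered by this sketch: they come back as ISO strings and need their own conversion.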

Redis Streams Event Processor

// packages/kyo-core/src/collaboration/EventProcessor.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';

export class EventProcessor extends EventEmitter {
  private isRunning: boolean = false;
  private processingLoop: NodeJS.Timeout | null = null;
  private processedCount: number = 0;
  private errorCount: number = 0;
  private lastProcessedId: string = '0';

  constructor(
    private redis: Redis,
    private streamName: string,
    private consumerName: string,
    private config: EventProcessorConfig
  ) {
    super();
  }

  /**
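   * Start the processor
   */
  async start(): Promise<void> {
    if (this.isRunning) return;

    this.isRunning = true;
    console.log(`Starting event processor: ${this.consumerName}`);

    // Handle messages that were delivered earlier but never acknowledged
    await this.processPendingMessages();

    // Schedule each read only after the previous one finishes. A fixed
    // setInterval would start a new blocking XREADGROUP every pollInterval
    // while earlier reads are still waiting, so calls would pile up.
    const tick = async (): Promise<void> => {
      if (!this.isRunning) return;
      await this.processEvents();
      if (this.isRunning) {
        this.processingLoop = setTimeout(tick, this.config.pollInterval || 100);
      }
    };
    this.processingLoop = setTimeout(tick, 0);
  }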

  /**
   * Read and process events
   */
  private async processEvents(): Promise<void> {
    if (!this.isRunning) return;

    try {
      // Read events from the stream. BLOCK holds the Redis connection for up
      // to blockTime ms, so this client should not be shared with
      // latency-sensitive commands.
      const results = await this.redis.xreadgroup(
        'GROUP', 'processors', this.consumerName,
        'COUNT', this.config.batchSize || 10,
        'BLOCK', this.config.blockTime || 1000,
        'STREAMS', this.streamName, '>'
      );

      if (!results || results.length === 0) return;

      const [, messages] = results[0] as [string, [string, string[]][]];

      // Process the messages as a batch
      await this.processBatch(messages);

    } catch (error) {
      this.errorCount++;
      this.emit('processing_error', error);

      // Back off briefly after an error
      await this.delay(this.config.errorBackoffMs || 1000);
    }
  }

  /**
   * Process a batch of messages
   */
  private async processBatch(messages: [string, string[]][]): Promise<void> {
    const processPromises = messages.map(async ([messageId, fields]) => {
      try {
        // Parse the event data
        const eventData = this.parseEventData(fields);
        const event = this.reconstructEvent(eventData);

        // Process the event
        await this.processEvent(event);

        // Acknowledge the message once it has been processed
        await this.redis.xack(this.streamName, 'processors', messageId);

        this.processedCount++;
        this.lastProcessedId = messageId;

        this.emit('event_processed', { event, messageId });

      } catch (error) {
        console.error(`Failed to process message ${messageId}:`, error);

        // Record the processing failure
        await this.handleProcessingFailure(messageId, error);
      }
    });

    await Promise.allSettled(processPromises);
  }

  /**
   * Process a single event
   */
  private async processEvent(event: CollaborationEvent): Promise<void> {
    const startTime = Date.now();

    try {
      // Validate the event
      this.validateEvent(event);

      // Dispatch on the event type
      switch (event.type) {
        case 'course_create':
          await this.handleCourseCreate(event);
          break;

        case 'course_update':
          await this.handleCourseUpdate(event);
          break;

        case 'course_delete':
          await this.handleCourseDelete(event);
          break;

        case 'course_move':
          await this.handleCourseMove(event);
          break;

        case 'template_create':
          await this.handleTemplateCreate(event);
          break;

        default:
          console.warn(`Unknown event type: ${event.type}`);
      }

      const processingTime = Date.now() - startTime;
      this.recordProcessingMetrics(event.type, processingTime, true);

    } catch (error) {
      const processingTime = Date.now() - startTime;
      this.recordProcessingMetrics(event.type, processingTime, false);
      throw error;
    }
  }

  /**
   * Handle a course-creation event
   */
  private async handleCourseCreate(event: CollaborationEvent): Promise<void> {
    const course = event.payload as Course;

    // Validate the course data
    await this.validateCourseData(course);

    // Check for time conflicts
    const conflicts = await this.checkTimeConflicts(course, event.sessionId);
    if (conflicts.length > 0) {
      this.emit('conflict_detected', {
        event,
        conflicts,
        type: 'time_conflict'
      });
      return;
    }

    // Save the course
    await this.saveCourse(course);

    // Update the session state
    await this.updateSessionState(event.sessionId, {
      type: 'add_course',
      courseId: course.id,
      course
    });

    // Notify downstream listeners
    this.emit('course_created', { event, course });
  }

  /**
   * Handle a course-move event
   */
  private async handleCourseMove(event: CollaborationEvent): Promise<void> {
    const moveData = event.payload as CourseMovePayload;

    // Fetch the existing course
    const existingCourse = await this.getCourse(moveData.courseId);
    if (!existingCourse) {
      throw new Error(`Course ${moveData.courseId} not found`);
    }

    // Validate the new time slot
    const isValidMove = await this.validateCourseMove(
      existingCourse,
      moveData.newTimeSlot,
      event.sessionId
    );

    if (!isValidMove.valid) {
      this.emit('conflict_detected', {
        event,
        conflicts: isValidMove.conflicts,
        type: 'move_conflict'
      });
      return;
    }

    // Apply the move
    const updatedCourse = {
      ...existingCourse,
      startTime: moveData.newTimeSlot.start.toISOString(),
      endTime: moveData.newTimeSlot.end.toISOString(),
      updatedAt: event.timestamp.toISOString()
    };

    await this.saveCourse(updatedCourse);

    // Update the session state
    await this.updateSessionState(event.sessionId, {
      type: 'move_course',
      courseId: moveData.courseId,
      oldTimeSlot: {
        start: new Date(existingCourse.startTime),
        end: new Date(existingCourse.endTime)
      },
      newTimeSlot: moveData.newTimeSlot
    });

    this.emit('course_moved', { event, course: updatedCourse });
  }

  /**
   * Validate a course move
   */
  private async validateCourseMove(
    course: Course,
    newTimeSlot: TimeSlot,
    sessionId: string
  ): Promise<MoveValidationResult> {
    const conflicts: ConflictInfo[] = [];

    // Check for time conflicts
    const timeConflicts = await this.checkTimeConflicts({
      ...course,
      startTime: newTimeSlot.start.toISOString(),
      endTime: newTimeSlot.end.toISOString()
    }, sessionId);

    conflicts.push(...timeConflicts);

    // Check trainer availability
    if (course.trainerId) {
      const trainerConflicts = await this.checkTrainerAvailability(
        course.trainerId,
        newTimeSlot,
        sessionId,
        course.id
      );
      conflicts.push(...trainerConflicts);
    }

    // Check room availability
    if (course.roomId) {
      const roomConflicts = await this.checkRoomAvailability(
        course.roomId,
        newTimeSlot,
        sessionId,
        course.id
      );
      conflicts.push(...roomConflicts);
    }

    return {
      valid: conflicts.length === 0,
      conflicts
    };
  }

  /**
   * Check for time conflicts
   */
  private async checkTimeConflicts(
    course: Course,
    sessionId: string
  ): Promise<ConflictInfo[]> {
    const conflicts: ConflictInfo[] = [];

    // Load the existing courses from the session state
    const sessionState = await this.getSessionState(sessionId);
    const existingCourses = Array.from(sessionState.courses.values());

    const courseStart = new Date(course.startTime);
    const courseEnd = new Date(course.endTime);

    for (const existing of existingCourses) {
      if (existing.id === course.id) continue; // Skip the course itself

      const existingStart = new Date(existing.startTime);
      const existingEnd = new Date(existing.endTime);

      // Two time ranges overlap when each one starts before the other ends
      if (courseStart < existingEnd && courseEnd > existingStart) {
        conflicts.push({
          type: 'time_overlap',
          description: `Time overlaps with course "${existing.name}"`,
          conflictingResourceId: existing.id,
          severity: 'error'
        });
      }
    }

    return conflicts;
  }

  /**
   * Get processor statistics
   */
  getStats(): ProcessorStats {
    return {
      consumerName: this.consumerName,
      isRunning: this.isRunning,
      processedCount: this.processedCount,
      errorCount: this.errorCount,
      lastProcessedId: this.lastProcessedId,
      uptime: this.isRunning ? Date.now() - this.startTime : 0,
      processingRate: this.calculateProcessingRate()
    };
  }

  /**
   * Stop the processor
   */
  async stop(): Promise<void> {
    this.isRunning = false;

    if (this.processingLoop) {
      clearInterval(this.processingLoop);
      this.processingLoop = null;
    }

    // Allow a fixed grace period for the in-flight batch (this does not await actual completion)
    await this.delay(this.config.gracefulShutdownMs || 5000);

    console.log(`Event processor ${this.consumerName} stopped`);
  }
}
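
The processor above calls `parseEventData(fields)` and `reconstructEvent(eventData)` without showing them. Redis returns the fields of each stream entry as a flat array of alternating names and values, so a minimal sketch might look like the following. The names `parseStreamFields` and `reviveEvent` are hypothetical, and storing `payload` as a JSON string and `timestamp` as an ISO 8601 string is an assumption, not something taken from the code above.

```typescript
// Hypothetical helpers: illustrative stand-ins for parseEventData / reconstructEvent.
type RawEventData = Record<string, string>;

// Turn ['k1', 'v1', 'k2', 'v2', ...] into { k1: 'v1', k2: 'v2', ... }.
function parseStreamFields(fields: string[]): RawEventData {
  if (fields.length % 2 !== 0) {
    throw new Error(`Malformed stream entry: odd field count ${fields.length}`);
  }
  const data: RawEventData = {};
  for (let i = 0; i < fields.length; i += 2) {
    data[fields[i]] = fields[i + 1];
  }
  return data;
}

// Assumption: payload was written as JSON and timestamp as an ISO 8601 string.
function reviveEvent(data: RawEventData) {
  return {
    id: data.id,
    sessionId: data.sessionId,
    userId: data.userId,
    type: data.type,
    payload: JSON.parse(data.payload ?? 'null'),
    timestamp: new Date(data.timestamp),
    version: Number(data.version ?? 0),
  };
}

// Usage with a hand-written entry in the shape Redis would return.
const sample = [
  'id', 'evt-1', 'sessionId', 's-1', 'userId', 'u-1', 'type', 'course_move',
  'payload', '{"courseId":"c-1"}', 'timestamp', '2025-01-01T09:00:00.000Z', 'version', '3',
];
const revived = reviveEvent(parseStreamFields(sample));
```

Reviving `timestamp` into a `Date` matters here because `handleCourseMove` calls `event.timestamp.toISOString()`, which would fail on a plain string.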

Distributed Data Synchronization

State Sync Manager

// packages/kyo-core/src/collaboration/StateSyncManager.ts
export class StateSyncManager {
  private syncIntervals: Map<string, NodeJS.Timeout> = new Map();
  private stateCache: Map<string, CachedSessionState> = new Map();
  private syncLocks: Map<string, SyncLock> = new Map();
  private conflictResolver: StateConflictResolver;

  constructor(
    private redis: Redis,
    private database: DatabasePool,
    private config: StateSyncConfig
  ) {
    this.conflictResolver = new StateConflictResolver(config.conflictResolution);
    this.startSyncManager();
  }

  /**
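   * Start the sync manager
   */
  private startSyncManager(): void {
    // Periodically sync all active sessions.
    // The timer handle is stored under a reserved key so shutdown() can clear it.
    this.syncIntervals.set('global_sync', setInterval(
      this.syncAllActiveSessions.bind(this),
      this.config.globalSyncInterval || 30000
    ));

    // Clean up expired sync locks
    this.syncIntervals.set('lock_cleanup', setInterval(
      this.cleanupExpiredLocks.bind(this),
      this.config.lockCleanupInterval || 60000
    ));

    // Listen for database change notifications
    this.setupDatabaseNotifications();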
  }

  /**
   * Synchronize a session's state
   */
  async syncSessionState(sessionId: string, force: boolean = false): Promise<SyncResult> {
    const syncLock = await this.acquireSyncLock(sessionId);
    if (!syncLock && !force) {
      return { success: false, reason: 'sync_in_progress' };
    }

    try {
      const startTime = Date.now();

      // 1. Read the currently cached state
      const cachedState = this.stateCache.get(sessionId);

      // 2. Read the latest state from Redis
      const redisState = await this.getStateFromRedis(sessionId);

      // 3. Read the authoritative state from the database
      const dbState = await this.getStateFromDatabase(sessionId);

      // 4. Detect differences between the three copies
      const conflicts = await this.detectStateConflicts(cachedState, redisState, dbState);

      // Track which state ends up current, so that it is the one persisted and reported
      let finalState = redisState;

      if (conflicts.length > 0) {
        // 5. Resolve the state conflicts
        const resolvedState = await this.conflictResolver.resolveStateConflicts(
          conflicts,
          { cached: cachedState, redis: redisState, database: dbState }
        );

        // 6. Apply the resolved state
        await this.applyResolvedState(sessionId, resolvedState);
        finalState = resolvedState.finalState;
      } else {
        // No conflicts: adopt the latest Redis state
        await this.updateCacheState(sessionId, redisState);
      }

      // 7. Persist to the database
      if (finalState && this.shouldPersistToDatabase(finalState)) {
        await this.persistStateToDatabase(sessionId, finalState);
      }

      const syncDuration = Date.now() - startTime;

      return {
        success: true,
        syncDuration,
        conflictsResolved: conflicts.length,
        stateVersion: finalState?.version || 0
      };

    } catch (error) {
      console.error(`Failed to sync session state ${sessionId}:`, error);
      return {
        success: false,
        reason: 'sync_error',
        error: error instanceof Error ? error.message : String(error)
      };
    } finally {
      if (syncLock) {
        await this.releaseSyncLock(sessionId);
      }
    }
  }

  /**
   * Detect state conflicts
   */
  private async detectStateConflicts(
    cached: CachedSessionState | undefined,
    redis: SessionState | null,
    database: SessionState | null
  ): Promise<StateConflict[]> {
    const conflicts: StateConflict[] = [];

    // Redis vs. database conflict detection
    if (redis && database) {
      const versionConflict = this.detectVersionConflict(redis, database);
      if (versionConflict) conflicts.push(versionConflict);

      const dataConflicts = await this.detectDataConflicts(redis, database);
      conflicts.push(...dataConflicts);
    }

    // Cache vs. Redis conflict detection
    if (cached && redis) {
      const cacheConflicts = this.detectCacheConflicts(cached.state, redis);
      conflicts.push(...cacheConflicts);
    }

    return conflicts;
  }

  /**
   * Detect a version conflict
   */
  private detectVersionConflict(
    state1: SessionState,
    state2: SessionState
  ): StateConflict | null {
    if (state1.version !== state2.version) {
      return {
        type: 'version_mismatch',
        description: `Version mismatch: ${state1.version} vs ${state2.version}`,
        severity: 'high',
        affectedResources: ['session_state'],
        conflictData: {
          version1: state1.version,
          version2: state2.version,
          timestamp1: state1.lastUpdated,
          timestamp2: state2.lastUpdated
        }
      };
    }
    return null;
  }

  /**
   * Detect data conflicts
   */
  private async detectDataConflicts(
    state1: SessionState,
    state2: SessionState
  ): Promise<StateConflict[]> {
    const conflicts: StateConflict[] = [];

    // Course data conflicts
    const courseConflicts = this.detectCourseConflicts(state1.courses, state2.courses);
    conflicts.push(...courseConflicts);

    // Template data conflicts
    const templateConflicts = this.detectTemplateConflicts(state1.templates, state2.templates);
    conflicts.push(...templateConflicts);

    return conflicts;
  }

  /**
   * Detect course conflicts
   */
  private detectCourseConflicts(
    courses1: Map<string, Course>,
    courses2: Map<string, Course>
  ): StateConflict[] {
    const conflicts: StateConflict[] = [];
    const allCourseIds = new Set([...courses1.keys(), ...courses2.keys()]);

    allCourseIds.forEach(courseId => {
      const course1 = courses1.get(courseId);
      const course2 = courses2.get(courseId);

      if (!course1 && course2) {
        // The course is missing from state 1
        conflicts.push({
          type: 'course_missing',
          description: `Course ${courseId} missing in state1`,
          severity: 'medium',
          affectedResources: [courseId],
          conflictData: { missingCourse: course2 }
        });
      } else if (course1 && !course2) {
        // The course is missing from state 2
        conflicts.push({
          type: 'course_missing',
          description: `Course ${courseId} missing in state2`,
          severity: 'medium',
          affectedResources: [courseId],
          conflictData: { missingCourse: course1 }
        });
      } else if (course1 && course2) {
        // Compare the contents of the two course records
        const differences = this.findCourseDifferences(course1, course2);
        if (differences.length > 0) {
          conflicts.push({
            type: 'course_data_conflict',
            description: `Course ${courseId} data differs`,
            severity: 'medium',
            affectedResources: [courseId],
            conflictData: {
              course1,
              course2,
              differences
            }
          });
        }
      }
    });

    return conflicts;
  }

  /**
   * Apply the resolved state
   */
  private async applyResolvedState(
    sessionId: string,
    resolvedState: ResolvedSessionState
  ): Promise<void> {
    const { finalState, resolutionActions } = resolvedState;

    // 1. Update the state in Redis
    await this.saveStateToRedis(sessionId, finalState);

    // 2. Update the local cache
    await this.updateCacheState(sessionId, finalState);

    // 3. Execute the resolution actions
    for (const action of resolutionActions) {
      await this.executeResolutionAction(sessionId, action);
    }

    // 4. Notify the related services
    await this.notifyStateResolution(sessionId, resolvedState);
  }

  /**
   * Execute a resolution action
   */
  private async executeResolutionAction(
    sessionId: string,
    action: ResolutionAction
  ): Promise<void> {
    switch (action.type) {
      case 'merge_courses':
        await this.mergeCourseData(sessionId, action.data);
        break;

      case 'rollback_changes':
        await this.rollbackChanges(sessionId, action.data);
        break;

      case 'sync_from_authoritative':
        await this.syncFromAuthoritativeSource(sessionId, action.data);
        break;

      case 'notify_participants':
        await this.notifyParticipants(sessionId, action.data);
        break;

      case 'create_backup':
        await this.createStateBackup(sessionId, action.data);
        break;

      default:
        console.warn(`Unknown resolution action type: ${action.type}`);
    }
  }

  /**
   * Sync multiple sessions in batches
   */
  async batchSyncSessions(sessionIds: string[]): Promise<BatchSyncResult> {
    const results: Map<string, SyncResult> = new Map();
    const batchStartTime = Date.now();

    // Sync in parallel, at most concurrencyLimit sessions per batch; batches run one after another
    const concurrencyLimit = this.config.maxConcurrentSyncs || 5;
    const batches = this.chunkArray(sessionIds, concurrencyLimit);

    for (const batch of batches) {
      const batchPromises = batch.map(async (sessionId) => {
        const result = await this.syncSessionState(sessionId);
        results.set(sessionId, result);
        return result;
      });

      await Promise.allSettled(batchPromises);
    }

    const batchDuration = Date.now() - batchStartTime;
    const successCount = Array.from(results.values()).filter(r => r.success).length;

    return {
      totalSessions: sessionIds.length,
      successCount,
      failureCount: sessionIds.length - successCount,
      batchDuration,
      results
    };
  }

  /**
   * Report sync performance metrics
   */
  getSyncMetrics(): SyncMetrics {
    const activeSyncs = Array.from(this.syncLocks.values()).filter(
      lock => !lock.expired
    ).length;

    return {
      activeSessions: this.stateCache.size,
      activeSyncs,
      totalSyncsCompleted: this.totalSyncsCompleted,
      averageSyncDuration: this.calculateAverageSyncDuration(),
      syncSuccessRate: this.calculateSyncSuccessRate(),
      memoryUsage: {
        stateCache: this.calculateStateCacheSize(),
        syncLocks: this.syncLocks.size,
        total: process.memoryUsage().heapUsed
      },
      lastSyncTimestamps: this.getLastSyncTimestamps()
    };
  }

  /**
   * Release resources
   */
  async shutdown(): Promise<void> {
    console.log('Shutting down state sync manager...');

    // Stop all sync interval timers
    this.syncIntervals.forEach(interval => clearInterval(interval));
    this.syncIntervals.clear();

    // Wait for in-flight syncs to finish
    const pendingSyncs = Array.from(this.syncLocks.entries())
      .filter(([_, lock]) => !lock.expired)
      .map(([sessionId, _]) =>
        this.waitForSyncCompletion(sessionId, 10000) // wait at most 10 seconds
      );

    await Promise.allSettled(pendingSyncs);

    // Persist the cached state
    const persistPromises = Array.from(this.stateCache.entries())
      .map(([sessionId, cached]) =>
        this.persistStateToDatabase(sessionId, cached.state)
      );

    await Promise.allSettled(persistPromises);

    // Release in-memory resources
    this.stateCache.clear();
    this.syncLocks.clear();

    console.log('State sync manager shutdown complete');
  }
}
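
`batchSyncSessions` relies on a `chunkArray` helper that is not shown. The sketch below is one possible shape for it, together with a generic chunked runner that mirrors the batch loop. `runInChunks` and `SketchSyncResult` are hypothetical names introduced for illustration. Unlike the method above, this version also records a failure entry when a task rejects, which is a design choice of the sketch rather than something the original code does.

```typescript
// Hypothetical stand-in for the chunkArray helper used by batchSyncSessions.
function chunkArray<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Simplified result shape, assumed for this sketch only.
interface SketchSyncResult {
  success: boolean;
  reason?: string;
}

// Run `task` for every id, at most `limit` at a time, one chunk after another.
async function runInChunks(
  ids: string[],
  limit: number,
  task: (id: string) => Promise<SketchSyncResult>
): Promise<Map<string, SketchSyncResult>> {
  const results = new Map<string, SketchSyncResult>();
  for (const chunk of chunkArray(ids, limit)) {
    // allSettled keeps one failing session from aborting the rest of the chunk
    const settled = await Promise.allSettled(chunk.map((id) => task(id)));
    settled.forEach((outcome, i) => {
      results.set(
        chunk[i],
        outcome.status === 'fulfilled'
          ? outcome.value
          : { success: false, reason: String(outcome.reason) }
      );
    });
  }
  return results;
}
```

Chunking is simple, but each chunk waits for its slowest session before the next chunk starts. If sync times vary widely, a worker-pool style limiter would keep all slots busy, at the cost of a little more code.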

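Summary

Today we implemented a real-time collaboration engine and a distributed data synchronization system, including:

Core architecture components

  1. WebSocket connection manager

    • High-concurrency connection handling
    • Automatic heartbeat checks and reconnection
    • Permission checks and session management
  2. Event processing engine

    • Redis Streams event flow
    • Concurrent event processors
    • Conflict detection and resolution
  3. Operational transformation system

    • Conflict resolution for real-time collaboration
    • Causality tracking
    • Automatic state synchronization
  4. Distributed state management

    • Multi-layer state sync (local cache, Redis, database)
    • Version control and conflict resolution
    • Data consistency guarantees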
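
To make the conflict-detection part of the state management layer more concrete: `detectCourseConflicts` calls a `findCourseDifferences` helper that is not shown. The sketch below compares two course maps with a shallow field-by-field check. The `SketchCourse` shape and the `diffCourses` name are assumptions for illustration, and a real `Course` would likely need deeper comparison for nested fields.

```typescript
// Simplified course shape, assumed for this sketch only.
interface SketchCourse {
  id: string;
  name: string;
  startTime: string;
  endTime: string;
}

type CourseDiff =
  | { kind: 'missing'; courseId: string; missingIn: 'left' | 'right' }
  | { kind: 'changed'; courseId: string; fields: (keyof SketchCourse)[] };

// Compare two course maps and report missing or changed entries.
function diffCourses(
  left: Map<string, SketchCourse>,
  right: Map<string, SketchCourse>
): CourseDiff[] {
  const diffs: CourseDiff[] = [];
  const ids = new Set([...Array.from(left.keys()), ...Array.from(right.keys())]);

  for (const courseId of Array.from(ids)) {
    const a = left.get(courseId);
    const b = right.get(courseId);

    if (!a || !b) {
      // Present on one side only
      diffs.push({ kind: 'missing', courseId, missingIn: a ? 'right' : 'left' });
      continue;
    }

    // Shallow comparison: list the fields whose values differ
    const fields = (Object.keys(a) as (keyof SketchCourse)[]).filter((k) => a[k] !== b[k]);
    if (fields.length > 0) {
      diffs.push({ kind: 'changed', courseId, fields });
    }
  }
  return diffs;
}

// Usage: one course was moved in the other copy, and one exists on the left only.
const redisCourses = new Map<string, SketchCourse>([
  ['c1', { id: 'c1', name: 'Yoga', startTime: '09:00', endTime: '10:00' }],
  ['c2', { id: 'c2', name: 'Pilates', startTime: '11:00', endTime: '12:00' }],
]);
const dbCourses = new Map<string, SketchCourse>([
  ['c1', { id: 'c1', name: 'Yoga', startTime: '09:30', endTime: '10:00' }],
]);
const courseDiffs = diffCourses(redisCourses, dbCourses);
```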

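Technical highlights

  • High availability: a distributed architecture that supports high concurrency
  • Real-time: low-latency event processing and state synchronization
  • Consistency: a strongly consistent data synchronization mechanism
  • Scalability: processing capacity that scales horizontally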
