在 Day 15 我們建構了課程模板系統的後端架構,今天我們將實作一個完整的即時協作引擎。這個系統需要支援多用戶同時操作、即時資料同步、衝突檢測與解決,以及高可用性的分散式架構。我們將使用 WebSocket、Redis Streams、以及事件驅動架構來打造一個企業級的即時協作平台。
// packages/kyo-core/src/collaboration/types.ts
// Top-level composition of the collaboration engine: one slot per component.
export interface CollaborationArchitecture {
// WebSocket connection management
connectionManager: ConnectionManager;
// Event processing engine
eventEngine: EventEngine;
// Conflict resolver
conflictResolver: ConflictResolver;
// State synchronization manager
stateSyncManager: StateSyncManager;
// Data persistence layer
persistenceLayer: PersistenceLayer;
}
// A single collaboration event, scoped to one session and attributed to one user.
export interface CollaborationEvent {
id: string;
sessionId: string;
userId: string;
type: CollaborationEventType;
// Event-specific data. Untyped here; consumers in this file cast it according to `type`.
payload: any;
timestamp: Date;
version: number;
// ConnectionManager populates this with Date.now() when it builds the event;
// presumably an ordering hint - TODO confirm how consumers use it.
precedence: number;
causality: string[]; // causality tracking
metadata: EventMetadata;
}
// Bookkeeping attached to every CollaborationEvent.
export interface EventMetadata {
// Where the event originated.
source: 'user' | 'system' | 'sync';
priority: 'high' | 'medium' | 'low';
// Optional retry counter; absent on events that have not been retried.
retryCount?: number;
conflictResolution?: ConflictResolutionStrategy;
// ConnectionManager fills this from calculateChecksum(payload data);
// the algorithm is not visible in this file.
checksum: string;
}
// A collaboration session belonging to one tenant, with its participants,
// shared state, permissions and configuration.
export interface CollaborationSession {
id: string;
tenantId: string;
name: string;
type: 'course_planning' | 'template_editing' | 'schedule_management';
// Participant management
participants: Map<string, Participant>;
activeConnections: Map<string, ConnectionInfo>;
// State management
sharedState: SharedState;
stateVersion: number;
lastActivity: Date;
// Access control
permissions: SessionPermissions;
lockManager: ResourceLockManager;
// Configuration
config: SessionConfiguration;
}
// State shared by all participants of a session.
// NOTE(review): map keys are presumably resource ids (courses, templates, locks)
// and user ids (selections, cursors) - confirm against the code that fills them.
export interface SharedState {
courses: Map<string, Course>;
templates: Map<string, CourseTemplate>;
selections: Map<string, UserSelection>;
cursors: Map<string, CursorPosition>;
locks: Map<string, ResourceLock>;
// Conflicts waiting to be resolved.
conflictQueue: ConflictItem[];
}
// A lock held on a single course, template or time slot.
export interface ResourceLock {
resourceId: string;
resourceType: 'course' | 'template' | 'timeslot';
// Identifier of the holder (presumably a user id - TODO confirm).
lockedBy: string;
lockType: 'exclusive' | 'shared' | 'intention';
acquiredAt: Date;
expiresAt: Date;
// Identifiers this lock depends on; semantics are defined by the lock manager, not visible here.
dependencies: string[];
}
// packages/kyo-core/src/collaboration/ConnectionManager.ts
import { WebSocketServer, WebSocket } from 'ws';
import { IncomingMessage } from 'http';
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';
/**
 * Shape of a JSON message received from a collaboration client.
 *
 * Deliberately not called `IncomingMessage`: in this file that name is the
 * Node `http` request type used during the WebSocket upgrade, which has
 * neither `type` nor `payload`.
 */
interface CollaborationClientMessage {
  type: string;
  // The payload shape depends on `type`; each handler receives it as its own payload type.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  payload?: any;
}

/**
 * Owns the WebSocket server, tracks live connections and their session
 * membership, routes client messages, and runs the ping/pong heartbeat.
 *
 * Emits: `connection_established`, `connection_closed`, `user_joined_session`,
 * `user_left_session`, `collaboration_event`.
 *
 * NOTE(review): several helpers used below (`sendError`, `generateConnectionId`,
 * `generateEventId`, `authenticateConnection`, `getClientIP`, `detectPlatform`,
 * `validateSessionPermission`, `calculateChecksum`, `handleServerError`,
 * `handleConnectionError`, `handleLeaveSession`, `handleCursorUpdate`,
 * `handleSelectionUpdate`, `handleSyncRequest`) are not defined in this excerpt.
 */
export class ConnectionManager extends EventEmitter {
  // Assigned in initializeWebSocketServer(), which the constructor always calls.
  private wss!: WebSocketServer;
  private connections: Map<string, ConnectionInfo> = new Map();
  // sessionId -> ids of the connections currently joined to that session.
  private sessionConnections: Map<string, Set<string>> = new Map();
  // Assigned in startHeartbeat(), which the constructor always calls.
  private heartbeatInterval!: NodeJS.Timeout;
  private redis: Redis;
  // Creation time, used to report uptime in getConnectionStats().
  private readonly startedAt: number = Date.now();

  constructor(
    private config: ConnectionManagerConfig,
    private authService: AuthenticationService,
    redis: Redis
  ) {
    super();
    this.redis = redis;
    this.initializeWebSocketServer();
    this.startHeartbeat();
  }

  /** Turns any thrown value into a printable message (thrown values are not always `Error`s). */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /** Runtime check that a parsed JSON value is an object with a string `type`. */
  private static isClientMessage(value: unknown): value is CollaborationClientMessage {
    return (
      typeof value === 'object' &&
      value !== null &&
      typeof (value as { type?: unknown }).type === 'string'
    );
  }

  /**
   * Creates the WebSocket server on `config.port` under `/collaboration`
   * and wires the server-level event handlers.
   */
  private initializeWebSocketServer(): void {
    this.wss = new WebSocketServer({
      port: this.config.port,
      path: '/collaboration',
      perMessageDeflate: {
        zlibDeflateOptions: {
          level: 3,
        },
        // `threshold` and `concurrencyLimit` are permessage-deflate options,
        // not zlib options, so they belong at this level.
        threshold: 1024,
        concurrencyLimit: 10,
      },
    });
    this.wss.on('connection', this.handleNewConnection.bind(this));
    this.wss.on('error', this.handleServerError.bind(this));
    console.log(`WebSocket server listening on port ${this.config.port}`);
  }

  /**
   * Authenticates a new socket, registers it, and sends the
   * `connection_established` acknowledgement.
   *
   * Closes the socket with 1008 when authentication fails and with 1011 on
   * any unexpected error.
   */
  private async handleNewConnection(ws: WebSocket, request: IncomingMessage): Promise<void> {
    const connectionId = this.generateConnectionId();
    try {
      // Authentication check
      const authResult = await this.authenticateConnection(request);
      if (!authResult.success) {
        ws.close(1008, 'Authentication failed');
        return;
      }

      // Build the connection record
      const connectionInfo: ConnectionInfo = {
        id: connectionId,
        userId: authResult.userId,
        tenantId: authResult.tenantId,
        socket: ws,
        sessionId: null,
        connectedAt: new Date(),
        lastActivity: new Date(),
        isAlive: true,
        metadata: {
          userAgent: request.headers['user-agent'],
          ip: this.getClientIP(request),
          platform: this.detectPlatform(request),
        }
      };
      this.connections.set(connectionId, connectionInfo);

      // Wire the per-socket event handlers
      this.setupWebSocketHandlers(ws, connectionInfo);

      // Acknowledge the connection (sendMessage never rejects)
      void this.sendMessage(connectionId, {
        type: 'connection_established',
        payload: {
          connectionId,
          serverTime: new Date().toISOString(),
          supportedFeatures: this.config.supportedFeatures,
        }
      });
      this.emit('connection_established', connectionInfo);
    } catch (error: unknown) {
      console.error('Failed to handle new connection:', error);
      ws.close(1011, 'Internal server error');
    }
  }

  /**
   * Registers the `message`, `pong`, `close` and `error` handlers for one socket.
   */
  private setupWebSocketHandlers(ws: WebSocket, connectionInfo: ConnectionInfo): void {
    const { id: connectionId } = connectionInfo;

    ws.on('message', async (data: Buffer) => {
      try {
        const parsed: unknown = JSON.parse(data.toString());
        // Client input is untrusted: reject anything that is not `{ type: string, ... }`
        // before it reaches the dispatcher.
        if (!ConnectionManager.isClientMessage(parsed)) {
          this.sendError(connectionId, 'invalid_message', 'Failed to process message');
          return;
        }
        await this.handleIncomingMessage(connectionId, parsed);
      } catch (error: unknown) {
        console.error('Failed to process message:', error);
        this.sendError(connectionId, 'invalid_message', 'Failed to process message');
      }
    });

    ws.on('pong', () => {
      const connection = this.connections.get(connectionId);
      if (connection) {
        connection.isAlive = true;
        connection.lastActivity = new Date();
      }
    });

    ws.on('close', (code: number, reason: Buffer) => {
      this.handleConnectionClose(connectionId, code, reason.toString());
    });

    ws.on('error', (error: Error) => {
      console.error(`WebSocket error for connection ${connectionId}:`, error);
      this.handleConnectionError(connectionId, error);
    });
  }

  /**
   * Dispatches one validated client message to its handler.
   * Handler failures are reported to the client as `message_handling_error`.
   */
  private async handleIncomingMessage(
    connectionId: string,
    message: CollaborationClientMessage
  ): Promise<void> {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    connection.lastActivity = new Date();

    try {
      switch (message.type) {
        case 'join_session':
          await this.handleJoinSession(connectionId, message.payload);
          break;
        case 'leave_session':
          await this.handleLeaveSession(connectionId, message.payload);
          break;
        case 'collaboration_event':
          await this.handleCollaborationEvent(connectionId, message.payload);
          break;
        case 'cursor_update':
          await this.handleCursorUpdate(connectionId, message.payload);
          break;
        case 'selection_update':
          await this.handleSelectionUpdate(connectionId, message.payload);
          break;
        case 'heartbeat':
          void this.sendMessage(connectionId, { type: 'heartbeat_ack' });
          break;
        case 'request_sync':
          await this.handleSyncRequest(connectionId, message.payload);
          break;
        default:
          this.sendError(connectionId, 'unknown_message_type', `Unknown message type: ${message.type}`);
      }
    } catch (error: unknown) {
      console.error('Error handling message:', error);
      this.sendError(connectionId, 'message_handling_error', ConnectionManager.describeError(error));
    }
  }

  /**
   * Removes a connection from a session's member set and drops the set once
   * it is empty, so finished sessions do not accumulate in `sessionConnections`.
   *
   * @returns true when the connection was a member of that session.
   */
  private detachFromSession(connectionId: string, sessionId: string): boolean {
    const members = this.sessionConnections.get(sessionId);
    if (!members) return false;
    const removed = members.delete(connectionId);
    if (members.size === 0) {
      this.sessionConnections.delete(sessionId);
    }
    return removed;
  }

  /**
   * Adds the connection to a session after a permission check, confirms to the
   * joiner, and announces the new participant to everyone else in the session.
   */
  private async handleJoinSession(
    connectionId: string,
    payload: JoinSessionPayload
  ): Promise<void> {
    const connection = this.connections.get(connectionId);
    if (!connection) return;
    const { sessionId, userInfo } = payload;

    try {
      // Permission check
      const hasPermission = await this.validateSessionPermission(
        connection.userId,
        connection.tenantId,
        sessionId,
        'join'
      );
      if (!hasPermission) {
        this.sendError(connectionId, 'permission_denied', 'No permission to join session');
        return;
      }

      // A connection belongs to at most one session. Without this, switching
      // sessions left a stale membership behind that was never cleaned up and
      // kept receiving the old session's broadcasts.
      // NOTE(review): participants of the previous session are not notified here;
      // confirm whether handleLeaveSession should be used for that.
      const previousSessionId = connection.sessionId;
      if (previousSessionId && previousSessionId !== sessionId) {
        this.detachFromSession(connectionId, previousSessionId);
      }

      // Update the connection record
      connection.sessionId = sessionId;

      // Add to the session's member set
      let members = this.sessionConnections.get(sessionId);
      if (!members) {
        members = new Set();
        this.sessionConnections.set(sessionId, members);
      }
      members.add(connectionId);

      // Notify the session manager
      this.emit('user_joined_session', {
        sessionId,
        userId: connection.userId,
        connectionId,
        userInfo
      });

      // Confirm to the joiner
      void this.sendMessage(connectionId, {
        type: 'session_joined',
        payload: {
          sessionId,
          participantId: connection.userId,
          success: true
        }
      });

      // Tell the other participants
      await this.broadcastToSession(sessionId, {
        type: 'participant_joined',
        payload: {
          userId: connection.userId,
          userInfo,
          timestamp: new Date().toISOString()
        }
      }, [connectionId]);
    } catch (error: unknown) {
      console.error('Failed to join session:', error);
      this.sendError(connectionId, 'join_session_failed', ConnectionManager.describeError(error));
    }
  }

  /**
   * Wraps a client payload in a full `CollaborationEvent`, emits it for the
   * event engine, and optimistically relays it to the rest of the session.
   * Ignored when the connection has not joined a session.
   */
  private async handleCollaborationEvent(
    connectionId: string,
    payload: CollaborationEventPayload
  ): Promise<void> {
    const connection = this.connections.get(connectionId);
    if (!connection || !connection.sessionId) return;

    try {
      // Build the complete event object
      const event: CollaborationEvent = {
        id: this.generateEventId(),
        sessionId: connection.sessionId,
        userId: connection.userId,
        type: payload.type,
        payload: payload.data,
        timestamp: new Date(),
        version: payload.version || 1,
        precedence: Date.now(),
        causality: payload.causality || [],
        metadata: {
          source: 'user',
          priority: payload.priority || 'medium',
          checksum: this.calculateChecksum(payload.data)
        }
      };

      // Hand over to the event engine
      this.emit('collaboration_event', event);

      // Relay immediately to the other users in the session (optimistic update)
      await this.broadcastToSession(connection.sessionId, {
        type: 'collaboration_event',
        payload: event
      }, [connectionId]);
    } catch (error: unknown) {
      console.error('Failed to handle collaboration event:', error);
      this.sendError(connectionId, 'event_handling_failed', ConnectionManager.describeError(error));
    }
  }

  /**
   * Sends a message to every connection in a session except the excluded ones.
   * Individual send failures do not abort the broadcast.
   */
  async broadcastToSession(
    sessionId: string,
    message: any,
    excludeConnections: string[] = []
  ): Promise<void> {
    const sessionConnections = this.sessionConnections.get(sessionId);
    if (!sessionConnections) return;

    const broadcastPromises = Array.from(sessionConnections)
      .filter(connId => !excludeConnections.includes(connId))
      .map(connId => this.sendMessage(connId, message));
    await Promise.allSettled(broadcastPromises);
  }

  /**
   * Serializes and sends a message to one connection.
   *
   * @returns false when the connection is unknown, not open, or the send throws;
   *          this method never rejects.
   */
  async sendMessage(connectionId: string, message: any): Promise<boolean> {
    const connection = this.connections.get(connectionId);
    if (!connection || connection.socket.readyState !== WebSocket.OPEN) {
      return false;
    }
    try {
      const serializedMessage = JSON.stringify(message);
      connection.socket.send(serializedMessage);
      return true;
    } catch (error: unknown) {
      console.error(`Failed to send message to ${connectionId}:`, error);
      return false;
    }
  }

  /**
   * Starts the periodic ping sweep. A connection that did not answer the
   * previous ping with a pong is considered dead.
   */
  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(() => {
      this.connections.forEach((connection, connectionId) => {
        if (connection.isAlive === false) {
          // Dead peer: release our bookkeeping, then destroy the socket so it is
          // not leaked. 1006 (abnormal closure) keeps the `participant_left`
          // reason from claiming a normal closure.
          this.handleConnectionClose(connectionId, 1006, 'Heartbeat timeout');
          connection.socket.terminate();
          return;
        }
        // Mark as unanswered until the pong handler flips it back
        connection.isAlive = false;
        if (connection.socket.readyState === WebSocket.OPEN) {
          connection.socket.ping();
        }
      });
    }, this.config.heartbeatInterval || 30000);
  }

  /**
   * Releases everything held for a connection and notifies listeners.
   * Safe to call more than once: later calls for the same id are no-ops.
   */
  private handleConnectionClose(
    connectionId: string,
    code: number,
    reason: string
  ): void {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    // Remove from its session
    if (connection.sessionId && this.sessionConnections.has(connection.sessionId)) {
      this.detachFromSession(connectionId, connection.sessionId);

      // Tell the remaining participants (fire-and-forget; never rejects)
      void this.broadcastToSession(connection.sessionId, {
        type: 'participant_left',
        payload: {
          userId: connection.userId,
          timestamp: new Date().toISOString(),
          reason: code === 1000 ? 'normal_closure' : 'abnormal_closure'
        }
      });

      // Notify the session manager
      this.emit('user_left_session', {
        sessionId: connection.sessionId,
        userId: connection.userId,
        connectionId
      });
    }

    // Drop the connection record
    this.connections.delete(connectionId);
    this.emit('connection_closed', {
      connectionId,
      userId: connection.userId,
      code,
      reason
    });
    console.log(`Connection ${connectionId} closed: ${code} - ${reason}`);
  }

  /**
   * Returns a snapshot of connection counts per session and per user.
   * `averageLatency` is not measured by this class and is always 0.
   */
  getConnectionStats(): ConnectionStats {
    const stats: ConnectionStats = {
      totalConnections: this.connections.size,
      activeConnections: Array.from(this.connections.values())
        .filter(c => c.socket.readyState === WebSocket.OPEN).length,
      sessionDistribution: new Map(),
      userDistribution: new Map(),
      averageLatency: 0,
      uptimeSeconds: Math.floor((Date.now() - this.startedAt) / 1000)
    };

    // Connections per session
    this.sessionConnections.forEach((connections, sessionId) => {
      stats.sessionDistribution.set(sessionId, connections.size);
    });

    // Connections per user
    this.connections.forEach(connection => {
      const count = stats.userDistribution.get(connection.userId) || 0;
      stats.userDistribution.set(connection.userId, count + 1);
    });
    return stats;
  }

  /**
   * Stops the heartbeat, closes every open socket with 1001, waits for the
   * close events, then closes the server.
   *
   * @throws whatever error the server reports while closing.
   */
  async shutdown(): Promise<void> {
    // Stop the heartbeat
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }

    // Close all connections
    const closePromises = Array.from(this.connections.values()).map(connection => {
      return new Promise<void>((resolve) => {
        if (connection.socket.readyState === WebSocket.OPEN) {
          // Listen first so the close event cannot be missed
          connection.socket.once('close', () => resolve());
          connection.socket.close(1001, 'Server shutdown');
        } else {
          resolve();
        }
      });
    });
    await Promise.allSettled(closePromises);

    // Close the server
    await new Promise<void>((resolve, reject) => {
      this.wss.close((error) => {
        if (error) reject(error);
        else resolve();
      });
    });
    console.log('WebSocket server shutdown complete');
  }
}
// packages/kyo-core/src/collaboration/EventEngine.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';
/**
 * Accepts collaboration events, detects and resolves conflicts, appends
 * accepted events to a Redis Stream, and supervises the stream consumers.
 *
 * Emits: `event_accepted`, `event_rejected`, `initialization_failed`.
 *
 * NOTE(review): many helpers used below (for example `validateEvent`,
 * `updateEventHistory`, `getSessionParticipants`, `selectResolutionStrategy`,
 * `transformConcurrentEdit`, `resolveLockConflict`, `getSessionEventHistory`,
 * `saveSessionState`, `generateEventId`, `handleEventProcessed`) are not
 * defined in this excerpt.
 */
export class EventEngine extends EventEmitter {
  // Upper bound on how often one event may be re-submitted after conflict
  // resolution; prevents resolve -> re-process -> conflict loops.
  private static readonly MAX_CONFLICT_RETRIES = 3;

  private eventStream: string = 'collaboration:events';
  private processingQueues: Map<string, EventProcessor> = new Map();
  private eventHistory: Map<string, CollaborationEvent[]> = new Map();
  private conflictDetector: ConflictDetector;
  private persistenceManager: EventPersistenceManager;
  // Handle of the periodic cleanup job, kept so shutdown() can stop it.
  private cleanupTimer: NodeJS.Timeout | null = null;

  constructor(
    private redis: Redis,
    private config: EventEngineConfig
  ) {
    super();
    this.conflictDetector = new ConflictDetector(config.conflictDetection);
    this.persistenceManager = new EventPersistenceManager(redis, config.persistence);
    // Initialisation is asynchronous; surface failures instead of leaving an
    // unhandled promise rejection.
    this.initializeEventProcessing().catch((error: unknown) => {
      console.error('Failed to initialize event processing:', error);
      this.emit('initialization_failed', error);
    });
  }

  /** Accepts a `Date` or an ISO string (dates become strings after a JSON round-trip). */
  private static toIsoString(value: Date | string): string {
    return new Date(value).toISOString();
  }

  /**
   * Creates the consumer group, starts `config.processorCount` processors,
   * and schedules the periodic cleanup.
   */
  private async initializeEventProcessing(): Promise<void> {
    // Create the stream's consumer group
    try {
      await this.redis.xgroup('CREATE', this.eventStream, 'processors', '0', 'MKSTREAM');
    } catch (error: unknown) {
      // Only "group already exists" is expected; anything else (connection
      // failure, wrong key type, ...) must not be hidden.
      const message = error instanceof Error ? error.message : String(error);
      if (!message.includes('BUSYGROUP')) {
        throw error;
      }
    }

    // Start the event processors
    for (let i = 0; i < this.config.processorCount; i++) {
      const processor = new EventProcessor(
        this.redis,
        this.eventStream,
        `processor-${i}`,
        this.config
      );
      processor.on('event_processed', this.handleEventProcessed.bind(this));
      processor.on('conflict_detected', this.handleConflictDetected.bind(this));
      processor.on('processing_error', this.handleProcessingError.bind(this));
      this.processingQueues.set(`processor-${i}`, processor);
      await processor.start();
    }

    // Start the periodic cleanup
    this.startEventCleanup();
  }

  /**
   * Validates an event, routes it to conflict handling when needed, otherwise
   * appends it to the stream, records it in history and persists it.
   *
   * Never rejects: failures are reported through `event_rejected`.
   */
  async processCollaborationEvent(event: CollaborationEvent): Promise<void> {
    try {
      // 1. Validate
      await this.validateEvent(event);

      // 2. Detect conflicts
      const conflicts = await this.conflictDetector.detectConflicts(event);
      if (conflicts.length > 0) {
        // Conflicts found: enter the resolution flow
        await this.handleConflicts(event, conflicts);
        return;
      }

      // 3. Append to the stream
      await this.addEventToStream(event);
      // 4. Update the in-memory history
      this.updateEventHistory(event);
      // 5. Persist
      await this.persistenceManager.persistEvent(event);
      // 6. Trigger follow-up processing
      this.emit('event_accepted', event);
    } catch (error: unknown) {
      console.error('Failed to process collaboration event:', error);
      this.emit('event_rejected', { event, error });
    }
  }

  /**
   * Appends an event to the Redis Stream as flat field/value pairs;
   * structured fields are JSON-encoded.
   */
  private async addEventToStream(event: CollaborationEvent): Promise<void> {
    const eventData = {
      id: event.id,
      sessionId: event.sessionId,
      userId: event.userId,
      type: event.type,
      payload: JSON.stringify(event.payload),
      timestamp: event.timestamp.toISOString(),
      version: event.version.toString(),
      precedence: event.precedence.toString(),
      causality: JSON.stringify(event.causality),
      metadata: JSON.stringify(event.metadata)
    };
    await this.redis.xadd(this.eventStream, '*', ...Object.entries(eventData).flat());
  }

  /**
   * Re-submits an event produced by conflict resolution, counting the attempt
   * in `metadata.retryCount` so `handleConflicts` can enforce the retry budget.
   */
  private async resubmitResolvedEvent(event: CollaborationEvent): Promise<void> {
    const attempts = (event.metadata?.retryCount ?? 0) + 1;
    await this.processCollaborationEvent({
      ...event,
      metadata: { ...event.metadata, retryCount: attempts }
    });
  }

  /**
   * Picks a resolution strategy for the detected conflicts and applies it.
   * Events that already used up their retry budget are rejected instead.
   */
  private async handleConflicts(
    event: CollaborationEvent,
    conflicts: DetectedConflict[]
  ): Promise<void> {
    if ((event.metadata?.retryCount ?? 0) >= EventEngine.MAX_CONFLICT_RETRIES) {
      this.emit('event_rejected', {
        event,
        error: new Error(
          `Conflict resolution retry limit (${EventEngine.MAX_CONFLICT_RETRIES}) exceeded for event ${event.id}`
        )
      });
      return;
    }

    const conflictContext: ConflictContext = {
      originalEvent: event,
      conflicts,
      sessionId: event.sessionId,
      timestamp: new Date(),
      participants: await this.getSessionParticipants(event.sessionId)
    };

    // Choose a strategy based on the kind of conflict
    const resolutionStrategy = this.selectResolutionStrategy(conflicts);
    switch (resolutionStrategy) {
      case 'automatic':
        await this.resolveConflictsAutomatically(conflictContext);
        break;
      case 'user_intervention':
        await this.requestUserIntervention(conflictContext);
        break;
      case 'last_writer_wins':
        await this.applyLastWriterWins(conflictContext);
        break;
      case 'operational_transform':
        await this.applyOperationalTransform(conflictContext);
        break;
      default:
        // Default: defer the resolution
        await this.deferConflictResolution(conflictContext);
    }
  }

  /**
   * Resolves each conflict by its type and re-submits the resulting events.
   */
  private async resolveConflictsAutomatically(context: ConflictContext): Promise<void> {
    const { originalEvent, conflicts } = context;
    const resolvedEvents: CollaborationEvent[] = [];

    for (const conflict of conflicts) {
      switch (conflict.type) {
        case 'concurrent_edit': {
          // Apply an operational transform
          const transformedEvent = await this.transformConcurrentEdit(
            originalEvent,
            conflict.conflictingEvents
          );
          resolvedEvents.push(transformedEvent);
          break;
        }
        case 'resource_lock': {
          // Check the lock; the event may have to wait
          const lockResolution = await this.resolveLockConflict(originalEvent, conflict);
          if (lockResolution.canProceed) {
            resolvedEvents.push(lockResolution.transformedEvent);
          } else {
            await this.deferEvent(originalEvent, lockResolution.deferUntil);
          }
          break;
        }
        case 'state_inconsistency': {
          // Re-sync the state, then retry with a bumped version
          await this.syncSessionState(originalEvent.sessionId);
          const retriedEvent = { ...originalEvent, version: originalEvent.version + 1 };
          resolvedEvents.push(retriedEvent);
          break;
        }
        default:
          // Previously dropped silently; make the gap visible.
          console.warn(`No automatic resolution for conflict type: ${conflict.type}`);
      }
    }

    // Process the resolved events (bounded by the retry budget)
    for (const resolvedEvent of resolvedEvents) {
      await this.resubmitResolvedEvent(resolvedEvent);
    }
  }

  /**
   * Operational Transformation: transforms the original event against the
   * conflicting operations and re-submits one event per transformed operation.
   */
  private async applyOperationalTransform(context: ConflictContext): Promise<void> {
    const { originalEvent, conflicts } = context;

    // Collect the conflicting operations
    const conflictingOperations = await this.extractConflictingOperations(conflicts);
    // Build the transform matrix
    const transformMatrix = this.buildTransformMatrix(originalEvent, conflictingOperations);
    // Apply the transformations
    const transformedOperations = this.applyTransformations(transformMatrix);

    // Produce one event per transformed operation
    for (const transformedOp of transformedOperations) {
      const transformedEvent: CollaborationEvent = {
        ...originalEvent,
        id: this.generateEventId(),
        payload: transformedOp.payload,
        version: transformedOp.version,
        causality: [...originalEvent.causality, ...transformedOp.dependencies]
      };
      await this.resubmitResolvedEvent(transformedEvent);
    }
  }

  /**
   * Builds the transform matrix: the target operation plus one computed
   * transformation per conflicting operation, keyed by that operation's id.
   */
  private buildTransformMatrix(
    targetOperation: CollaborationEvent,
    conflictingOperations: Operation[]
  ): TransformMatrix {
    const matrix: TransformMatrix = {
      target: this.extractOperation(targetOperation),
      conflicts: conflictingOperations,
      transformations: new Map()
    };

    // Compute a transformation for every conflicting operation
    conflictingOperations.forEach(conflictOp => {
      const transform = this.calculateTransformation(matrix.target, conflictOp);
      matrix.transformations.set(conflictOp.id, transform);
    });
    return matrix;
  }

  /**
   * Chooses the transformation for a pair of operations by their types.
   * Pairs without a dedicated rule keep the target unchanged at confidence 0.5.
   */
  private calculateTransformation(
    targetOp: Operation,
    conflictOp: Operation
  ): OperationTransformation {
    const opType1 = targetOp.type;
    const opType2 = conflictOp.type;

    if (opType1 === 'course_move' && opType2 === 'course_move') {
      return this.transformCourseMoveOperations(targetOp, conflictOp);
    }
    if (opType1 === 'course_update' && opType2 === 'course_update') {
      return this.transformCourseUpdateOperations(targetOp, conflictOp);
    }
    if (opType1 === 'course_create' && opType2 === 'course_create') {
      return this.transformCourseCreateOperations(targetOp, conflictOp);
    }

    // Default: keep the original operation
    return {
      transformedOperation: targetOp,
      dependencies: [conflictOp.id],
      confidence: 0.5
    };
  }

  /**
   * Transforms two concurrent course moves. Moves of different courses are
   * independent; for the same course the operation with the later timestamp
   * wins and the other one is rewritten with an adjusted target.
   */
  private transformCourseMoveOperations(
    op1: Operation,
    op2: Operation
  ): OperationTransformation {
    const move1 = op1.payload as CourseMovePayload;
    const move2 = op2.payload as CourseMovePayload;

    // Different courses: nothing to transform
    if (move1.courseId !== move2.courseId) {
      return {
        transformedOperation: op1,
        dependencies: [],
        confidence: 1.0
      };
    }

    // Same course moved concurrently: timestamps decide (ties go to op2)
    const priority1 = op1.timestamp.getTime();
    const priority2 = op2.timestamp.getTime();
    if (priority1 > priority2) {
      // op1 wins: keep it as is
      return {
        transformedOperation: op1,
        dependencies: [op2.id],
        confidence: 0.8
      };
    } else {
      // op2 wins: op1 is rewritten with a recomputed target
      return {
        transformedOperation: {
          ...op1,
          type: 'course_move_conflict_resolved',
          payload: {
            ...move1,
            conflictWith: op2.id,
            adjustedTarget: this.calculateAdjustedTarget(move1, move2)
          }
        },
        dependencies: [op2.id],
        confidence: 0.6
      };
    }
  }

  /**
   * Returns the session state stored in Redis, rebuilding it from event
   * history when it is missing, invalid, or cannot be read.
   *
   * NOTE(review): the state is restored with a plain `JSON.parse`, while
   * `applyEventToState` uses `Map` methods on `state.courses` etc. - confirm
   * that `saveSessionState`/`validateSessionState` account for that.
   */
  async syncSessionState(sessionId: string): Promise<SessionState> {
    const stateKey = `session:state:${sessionId}`;
    try {
      // Read the latest state from Redis
      const serializedState = await this.redis.get(stateKey);
      if (!serializedState) {
        // Nothing stored: rebuild
        return await this.rebuildSessionState(sessionId);
      }
      const state: SessionState = JSON.parse(serializedState);

      // Verify integrity
      const isValid = await this.validateSessionState(state);
      if (!isValid) {
        console.warn(`Session state ${sessionId} is invalid, rebuilding...`);
        return await this.rebuildSessionState(sessionId);
      }
      return state;
    } catch (error: unknown) {
      console.error('Failed to sync session state:', error);
      return await this.rebuildSessionState(sessionId);
    }
  }

  /**
   * Rebuilds a session's state by replaying its event history in timestamp
   * order onto an empty state, then saves the result.
   */
  private async rebuildSessionState(sessionId: string): Promise<SessionState> {
    const events = await this.getSessionEventHistory(sessionId);
    const state: SessionState = this.createEmptySessionState();

    // Sort a copy: the history array may be shared and must not be reordered in place.
    const sortedEvents = [...events].sort((a, b) =>
      a.timestamp.getTime() - b.timestamp.getTime()
    );
    for (const event of sortedEvents) {
      await this.applyEventToState(state, event);
    }

    // Save the rebuilt state
    await this.saveSessionState(sessionId, state);
    return state;
  }

  /**
   * Applies one event to the state in place, then bumps `state.version` and
   * `state.lastUpdated` (also for event types that change nothing).
   */
  private async applyEventToState(
    state: SessionState,
    event: CollaborationEvent
  ): Promise<void> {
    switch (event.type) {
      case 'course_create': {
        const course = event.payload as Course;
        state.courses.set(course.id, course);
        break;
      }
      case 'course_update': {
        const updateData = event.payload as CourseUpdatePayload;
        const existingCourse = state.courses.get(updateData.courseId);
        if (existingCourse) {
          state.courses.set(updateData.courseId, {
            ...existingCourse,
            ...updateData.updates
          });
        }
        break;
      }
      case 'course_delete':
        state.courses.delete(event.payload.courseId);
        break;
      case 'course_move': {
        const moveData = event.payload as CourseMovePayload;
        const courseToMove = state.courses.get(moveData.courseId);
        if (courseToMove) {
          state.courses.set(moveData.courseId, {
            ...courseToMove,
            // Slot boundaries may be ISO strings when the event came back from JSON.
            startTime: EventEngine.toIsoString(moveData.newTimeSlot.start),
            endTime: EventEngine.toIsoString(moveData.newTimeSlot.end)
          });
        }
        break;
      }
      case 'template_create': {
        const template = event.payload as CourseTemplate;
        state.templates.set(template.id, template);
        break;
      }
      case 'cursor_update':
        state.cursors.set(event.userId, event.payload as CursorPosition);
        break;
      case 'selection_update':
        state.selections.set(event.userId, event.payload as UserSelection);
        break;
    }

    // Bump the state version
    state.version++;
    state.lastUpdated = event.timestamp;
  }

  /**
   * Schedules the periodic maintenance job (expire, archive, optimize).
   * Runs every `config.cleanupInterval` ms, one hour by default.
   */
  private startEventCleanup(): void {
    this.cleanupTimer = setInterval(async () => {
      try {
        await this.cleanupExpiredEvents();
        await this.archiveOldEvents();
        await this.optimizeEventStorage();
      } catch (error: unknown) {
        console.error('Event cleanup failed:', error);
      }
    }, this.config.cleanupInterval || 3600000);
  }

  /**
   * Drops events older than `config.eventRetentionMs` (24 hours by default)
   * from the Redis Stream and from the in-memory history.
   */
  private async cleanupExpiredEvents(): Promise<void> {
    const cutoffTime = Date.now() - (this.config.eventRetentionMs || 86400000);

    // Trim old entries from the Redis Stream
    await this.redis.xtrim(this.eventStream, 'MINID', cutoffTime);

    // Trim the in-memory history
    this.eventHistory.forEach((events, sessionId) => {
      const filteredEvents = events.filter(
        event => event.timestamp.getTime() > cutoffTime
      );
      if (filteredEvents.length === 0) {
        this.eventHistory.delete(sessionId);
      } else {
        this.eventHistory.set(sessionId, filteredEvents);
      }
    });
  }

  /**
   * Returns engine statistics. Only `activeProcessors` and `memoryUsage` are
   * computed here; the remaining counters are reported as 0.
   */
  getEngineStats(): EventEngineStats {
    const stats: EventEngineStats = {
      totalEventsProcessed: 0,
      eventsPerSecond: 0,
      conflictRate: 0,
      averageProcessingTime: 0,
      activeProcessors: this.processingQueues.size,
      queueDepth: 0,
      errorRate: 0,
      memoryUsage: {
        eventHistory: this.calculateEventHistorySize(),
        processingQueues: this.calculateQueueSize(),
        total: process.memoryUsage().heapUsed
      }
    };
    return stats;
  }

  /**
   * Stops the cleanup job and all processors, flushes pending events, and
   * clears the in-memory collections.
   */
  async shutdown(): Promise<void> {
    console.log('Shutting down event engine...');

    // Stop the periodic cleanup so it neither fires after shutdown nor keeps the process alive
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }

    // Stop all processors
    const shutdownPromises = Array.from(this.processingQueues.values())
      .map(processor => processor.stop());
    await Promise.allSettled(shutdownPromises);

    // Persist whatever is still pending
    await this.persistenceManager.flushPendingEvents();

    // Release in-memory resources
    this.eventHistory.clear();
    this.processingQueues.clear();
    console.log('Event engine shutdown complete');
  }
}
// packages/kyo-core/src/collaboration/EventProcessor.ts
/**
 * One consumer in the `processors` consumer group: polls the Redis Stream,
 * applies each event, and acknowledges it.
 *
 * Emits: `event_processed`, `processing_error`, `conflict_detected`,
 * `course_created`, `course_moved`.
 *
 * NOTE(review): several helpers used below (`delay`, `processPendingMessages`,
 * `parseEventData`, `reconstructEvent`, `handleProcessingFailure`,
 * `validateEvent`, `recordProcessingMetrics`, `handleCourseUpdate`,
 * `handleCourseDelete`, `handleTemplateCreate`, `validateCourseData`,
 * `saveCourse`, `getCourse`, `updateSessionState`, `getSessionState`,
 * `checkTrainerAvailability`, `checkRoomAvailability`,
 * `calculateProcessingRate`) are not defined in this excerpt.
 */
export class EventProcessor extends EventEmitter {
  private isRunning = false;
  private processingLoop: NodeJS.Timeout | null = null;
  private processedCount = 0;
  private errorCount = 0;
  private lastProcessedId = '0';
  // Epoch milliseconds of the last start(); 0 until started. Read by getStats().
  private startTime = 0;
  // The poll currently in progress, if any. Prevents overlapping polls and
  // lets stop() wait for the running batch.
  private inFlight: Promise<void> | null = null;

  constructor(
    private redis: Redis,
    private streamName: string,
    private consumerName: string,
    private config: EventProcessorConfig
  ) {
    super();
  }

  /** Accepts a `Date` or an ISO string (dates become strings after a JSON round-trip). */
  private static toIsoString(value: Date | string): string {
    return new Date(value).toISOString();
  }

  /**
   * Starts the polling timer, then processes messages that were delivered
   * earlier but never acknowledged. Calling it while running is a no-op.
   */
  async start(): Promise<void> {
    if (this.isRunning) return;
    this.isRunning = true;
    this.startTime = Date.now();
    console.log(`Starting event processor: ${this.consumerName}`);

    // Start the polling loop
    this.processingLoop = setInterval(
      () => { void this.processEvents(); },
      this.config.pollInterval || 100
    );

    // Handle unacknowledged messages
    await this.processPendingMessages();
  }

  /**
   * Timer tick: runs one poll unless the processor is stopped or a poll is
   * still in progress. The default tick (100 ms) is shorter than the default
   * XREADGROUP block time (1000 ms), so without this guard blocking reads
   * would pile up on the shared connection.
   */
  private async processEvents(): Promise<void> {
    if (!this.isRunning || this.inFlight) return;
    const poll = this.pollOnce();
    this.inFlight = poll;
    try {
      await poll;
    } finally {
      this.inFlight = null;
    }
  }

  /**
   * Reads one batch of new messages for this consumer and processes it.
   * On failure: counts the error, emits `processing_error`, and backs off.
   */
  private async pollOnce(): Promise<void> {
    try {
      // Read events from the stream
      const results = await this.redis.xreadgroup(
        'GROUP', 'processors', this.consumerName,
        'COUNT', this.config.batchSize || 10,
        'BLOCK', this.config.blockTime || 1000,
        'STREAMS', this.streamName, '>'
      );
      if (!results || results.length === 0) return;

      const [, messages] = results[0] as [string, [string, string[]][]];
      // Process the batch
      await this.processBatch(messages);
    } catch (error: unknown) {
      this.errorCount++;
      this.emit('processing_error', error);
      // Back off briefly after an error
      await this.delay(this.config.errorBackoffMs || 1000);
    }
  }

  /**
   * Processes and acknowledges every message of a batch. A failing message is
   * handed to `handleProcessingFailure` and does not affect the others.
   *
   * NOTE(review): messages of a batch are processed concurrently, so
   * `lastProcessedId` is "most recently finished", not "highest id", and
   * events for the same resource are not applied in stream order.
   */
  private async processBatch(messages: [string, string[]][]) {
    const processPromises = messages.map(async ([messageId, fields]) => {
      try {
        // Parse the event data
        const eventData = this.parseEventData(fields);
        const event = this.reconstructEvent(eventData);

        // Process the event
        await this.processEvent(event);

        // Acknowledge the message
        await this.redis.xack(this.streamName, 'processors', messageId);
        this.processedCount++;
        this.lastProcessedId = messageId;
        this.emit('event_processed', { event, messageId });
      } catch (error: unknown) {
        console.error(`Failed to process message ${messageId}:`, error);
        // Record the failure
        await this.handleProcessingFailure(messageId, error);
      }
    });
    await Promise.allSettled(processPromises);
  }

  /**
   * Validates one event, dispatches it by type, and records timing metrics.
   * Unknown types are logged and otherwise ignored.
   *
   * @throws rethrows any handler error after recording the failed metric.
   */
  private async processEvent(event: CollaborationEvent): Promise<void> {
    const startTime = Date.now();
    try {
      // Validate the event
      this.validateEvent(event);

      // Dispatch by event type
      switch (event.type) {
        case 'course_create':
          await this.handleCourseCreate(event);
          break;
        case 'course_update':
          await this.handleCourseUpdate(event);
          break;
        case 'course_delete':
          await this.handleCourseDelete(event);
          break;
        case 'course_move':
          await this.handleCourseMove(event);
          break;
        case 'template_create':
          await this.handleTemplateCreate(event);
          break;
        default:
          console.warn(`Unknown event type: ${event.type}`);
      }

      const processingTime = Date.now() - startTime;
      this.recordProcessingMetrics(event.type, processingTime, true);
    } catch (error: unknown) {
      const processingTime = Date.now() - startTime;
      this.recordProcessingMetrics(event.type, processingTime, false);
      throw error;
    }
  }

  /**
   * Handles `course_create`: validates the course, checks for time overlaps,
   * then saves it and updates the session state. Emits `conflict_detected`
   * (and saves nothing) when the course overlaps an existing one.
   */
  private async handleCourseCreate(event: CollaborationEvent): Promise<void> {
    const course = event.payload as Course;

    // Validate the course data
    await this.validateCourseData(course);

    // Check for time conflicts
    const conflicts = await this.checkTimeConflicts(course, event.sessionId);
    if (conflicts.length > 0) {
      this.emit('conflict_detected', {
        event,
        conflicts,
        type: 'time_conflict'
      });
      return;
    }

    // Save the course
    await this.saveCourse(course);

    // Update the session state
    await this.updateSessionState(event.sessionId, {
      type: 'add_course',
      courseId: course.id,
      course
    });

    // Trigger follow-up processing
    this.emit('course_created', { event, course });
  }

  /**
   * Handles `course_move`: validates the new slot, then saves the course with
   * its new times and updates the session state. Emits `conflict_detected`
   * (and saves nothing) when the move is not valid.
   *
   * @throws when the course to move does not exist.
   */
  private async handleCourseMove(event: CollaborationEvent): Promise<void> {
    const moveData = event.payload as CourseMovePayload;

    // Load the existing course
    const existingCourse = await this.getCourse(moveData.courseId);
    if (!existingCourse) {
      throw new Error(`Course ${moveData.courseId} not found`);
    }

    // Validate the new slot
    const isValidMove = await this.validateCourseMove(
      existingCourse,
      moveData.newTimeSlot,
      event.sessionId
    );
    if (!isValidMove.valid) {
      this.emit('conflict_detected', {
        event,
        conflicts: isValidMove.conflicts,
        type: 'move_conflict'
      });
      return;
    }

    // Perform the move. Events read back from the stream were JSON-encoded,
    // so date-like fields may be ISO strings rather than Date objects.
    const updatedCourse = {
      ...existingCourse,
      startTime: EventProcessor.toIsoString(moveData.newTimeSlot.start),
      endTime: EventProcessor.toIsoString(moveData.newTimeSlot.end),
      updatedAt: EventProcessor.toIsoString(event.timestamp)
    };
    await this.saveCourse(updatedCourse);

    // Update the session state
    await this.updateSessionState(event.sessionId, {
      type: 'move_course',
      courseId: moveData.courseId,
      oldTimeSlot: {
        start: new Date(existingCourse.startTime),
        end: new Date(existingCourse.endTime)
      },
      newTimeSlot: moveData.newTimeSlot
    });
    this.emit('course_moved', { event, course: updatedCourse });
  }

  /**
   * Checks a proposed slot for time overlaps and, when the course has a
   * trainer or a room, for their availability.
   *
   * @returns `valid: true` only when no conflict of any kind was found.
   */
  private async validateCourseMove(
    course: Course,
    newTimeSlot: TimeSlot,
    sessionId: string
  ): Promise<MoveValidationResult> {
    const conflicts: ConflictInfo[] = [];

    // Time conflicts
    const timeConflicts = await this.checkTimeConflicts({
      ...course,
      startTime: EventProcessor.toIsoString(newTimeSlot.start),
      endTime: EventProcessor.toIsoString(newTimeSlot.end)
    }, sessionId);
    conflicts.push(...timeConflicts);

    // Trainer availability
    if (course.trainerId) {
      const trainerConflicts = await this.checkTrainerAvailability(
        course.trainerId,
        newTimeSlot,
        sessionId,
        course.id
      );
      conflicts.push(...trainerConflicts);
    }

    // Room availability
    if (course.roomId) {
      const roomConflicts = await this.checkRoomAvailability(
        course.roomId,
        newTimeSlot,
        sessionId,
        course.id
      );
      conflicts.push(...roomConflicts);
    }

    return {
      valid: conflicts.length === 0,
      conflicts
    };
  }

  /**
   * Lists the courses of the session whose time range overlaps the given
   * course. Ranges that merely touch (one ends when the other starts) do not
   * count as overlapping; the course itself is skipped.
   */
  private async checkTimeConflicts(
    course: Course,
    sessionId: string
  ): Promise<ConflictInfo[]> {
    const conflicts: ConflictInfo[] = [];

    // Existing courses from the session state
    const sessionState = await this.getSessionState(sessionId);
    const existingCourses = Array.from(sessionState.courses.values());
    const courseStart = new Date(course.startTime);
    const courseEnd = new Date(course.endTime);

    for (const existing of existingCourses) {
      if (existing.id === course.id) continue; // skip the course itself
      const existingStart = new Date(existing.startTime);
      const existingEnd = new Date(existing.endTime);

      // Overlap test
      if (courseStart < existingEnd && courseEnd > existingStart) {
        conflicts.push({
          type: 'time_overlap',
          description: `與課程「${existing.name}」時間重疊`,
          conflictingResourceId: existing.id,
          severity: 'error'
        });
      }
    }
    return conflicts;
  }

  /**
   * Returns counters for this processor. `uptime` is in milliseconds since
   * the last start() and 0 while stopped.
   */
  getStats(): ProcessorStats {
    return {
      consumerName: this.consumerName,
      isRunning: this.isRunning,
      processedCount: this.processedCount,
      errorCount: this.errorCount,
      lastProcessedId: this.lastProcessedId,
      uptime: this.isRunning ? Date.now() - this.startTime : 0,
      processingRate: this.calculateProcessingRate()
    };
  }

  /**
   * Stops polling and waits for the poll in progress, but no longer than
   * `config.gracefulShutdownMs` (5000 ms by default). Returns immediately
   * when the processor is idle.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    if (this.processingLoop) {
      clearInterval(this.processingLoop);
      this.processingLoop = null;
    }

    // Wait for the current batch, capped by the grace period
    if (this.inFlight) {
      await Promise.race([
        this.inFlight.catch(() => undefined),
        this.delay(this.config.gracefulShutdownMs || 5000)
      ]);
    }
    console.log(`Event processor ${this.consumerName} stopped`);
  }
}
// packages/kyo-core/src/collaboration/StateSyncManager.ts
export class StateSyncManager {
private syncIntervals: Map<string, NodeJS.Timeout> = new Map();
private stateCache: Map<string, CachedSessionState> = new Map();
private syncLocks: Map<string, SyncLock> = new Map();
private conflictResolver: StateConflictResolver;
/**
 * Stores the Redis client, database pool and configuration, creates the
 * state-conflict resolver from `config.conflictResolution`, and immediately
 * starts the background sync manager.
 */
constructor(
  private redis: Redis,
  private database: DatabasePool,
  private config: StateSyncConfig
) {
  const { conflictResolution } = config;
  this.conflictResolver = new StateConflictResolver(conflictResolution);
  this.startSyncManager();
}
/**
 * Starts the background work of the sync manager: a periodic sync of all
 * active sessions, a periodic sweep of expired sync locks, and the database
 * change-notification listener.
 *
 * NOTE(review): the two interval handles are not kept, so these timers cannot
 * be cancelled from the code visible here.
 */
private startSyncManager(): void {
  const globalSyncMs = this.config.globalSyncInterval || 30000;
  const runGlobalSync = this.syncAllActiveSessions.bind(this);
  // Periodically sync every active session
  setInterval(runGlobalSync, globalSyncMs);

  const lockCleanupMs = this.config.lockCleanupInterval || 60000;
  const sweepExpiredLocks = this.cleanupExpiredLocks.bind(this);
  // Periodically remove expired sync locks
  setInterval(sweepExpiredLocks, lockCleanupMs);

  // Listen for database change notifications
  this.setupDatabaseNotifications();
}
/**
 * Reconciles one session's state across the in-memory cache, Redis and the
 * database.
 *
 * @param sessionId Session to synchronize.
 * @param force When true, proceeds even if the sync lock could not be acquired.
 * @returns `success: false` with reason `sync_in_progress` when the lock is
 *          held and `force` is false, or with reason `sync_error` when any
 *          step throws; otherwise the duration, the number of conflicts
 *          resolved and the Redis state version (0 when Redis had no state).
 */
async syncSessionState(sessionId: string, force: boolean = false): Promise<SyncResult> {
  const syncLock = await this.acquireSyncLock(sessionId);
  if (!syncLock && !force) {
    return { success: false, reason: 'sync_in_progress' };
  }

  try {
    const startTime = Date.now();

    // 1. Current cached state
    const cachedState = this.stateCache.get(sessionId);
    // 2. Latest state from Redis
    const redisState = await this.getStateFromRedis(sessionId);
    // 3. Authoritative state from the database
    const dbState = await this.getStateFromDatabase(sessionId);
    // 4. Detect differences between the three
    const conflicts = await this.detectStateConflicts(cachedState, redisState, dbState);

    if (conflicts.length > 0) {
      // 5. Resolve the conflicts
      const resolvedState = await this.conflictResolver.resolveStateConflicts(
        conflicts,
        { cached: cachedState, redis: redisState, database: dbState }
      );
      // 6. Apply the resolution
      await this.applyResolvedState(sessionId, resolvedState);
    } else {
      // No conflicts: adopt the latest Redis state
      await this.updateCacheState(sessionId, redisState);
    }

    // 7. Persist to the database
    // NOTE(review): this persists the Redis state even when step 5 produced a
    // different resolved state - confirm that applyResolvedState covers that case.
    if (redisState && this.shouldPersistToDatabase(redisState)) {
      await this.persistStateToDatabase(sessionId, redisState);
    }

    const syncDuration = Date.now() - startTime;
    return {
      success: true,
      syncDuration,
      conflictsResolved: conflicts.length,
      stateVersion: redisState?.version || 0
    };
  } catch (error: unknown) {
    console.error(`Failed to sync session state ${sessionId}:`, error);
    // Thrown values are not guaranteed to be Error instances.
    const message = error instanceof Error ? error.message : String(error);
    return { success: false, reason: 'sync_error', error: message };
  } finally {
    // Release only a lock this call actually acquired (a forced run may hold none)
    if (syncLock) {
      await this.releaseSyncLock(sessionId);
    }
  }
}
/**
* 檢測狀態衝突
*/
private async detectStateConflicts(
cached: CachedSessionState | undefined,
redis: SessionState | null,
database: SessionState | null
): Promise<StateConflict[]> {
const conflicts: StateConflict[] = [];
// Redis vs Database 衝突檢測
if (redis && database) {
const versionConflict = this.detectVersionConflict(redis, database);
if (versionConflict) conflicts.push(versionConflict);
const dataConflicts = await this.detectDataConflicts(redis, database);
conflicts.push(...dataConflicts);
}
// Cache vs Redis 衝突檢測
if (cached && redis) {
const cacheConflicts = this.detectCacheConflicts(cached.state, redis);
conflicts.push(...cacheConflicts);
}
return conflicts;
}
/**
* 檢測版本衝突
*/
private detectVersionConflict(
state1: SessionState,
state2: SessionState
): StateConflict | null {
if (state1.version !== state2.version) {
return {
type: 'version_mismatch',
description: `Version mismatch: ${state1.version} vs ${state2.version}`,
severity: 'high',
affectedResources: ['session_state'],
conflictData: {
version1: state1.version,
version2: state2.version,
timestamp1: state1.lastUpdated,
timestamp2: state2.lastUpdated
}
};
}
return null;
}
/**
* 檢測資料衝突
*/
private async detectDataConflicts(
state1: SessionState,
state2: SessionState
): Promise<StateConflict[]> {
const conflicts: StateConflict[] = [];
// 課程資料衝突
const courseConflicts = this.detectCourseConflicts(state1.courses, state2.courses);
conflicts.push(...courseConflicts);
// 模板資料衝突
const templateConflicts = this.detectTemplateConflicts(state1.templates, state2.templates);
conflicts.push(...templateConflicts);
return conflicts;
}
/**
* 檢測課程衝突
*/
private detectCourseConflicts(
courses1: Map<string, Course>,
courses2: Map<string, Course>
): StateConflict[] {
const conflicts: StateConflict[] = [];
const allCourseIds = new Set([...courses1.keys(), ...courses2.keys()]);
allCourseIds.forEach(courseId => {
const course1 = courses1.get(courseId);
const course2 = courses2.get(courseId);
if (!course1 && course2) {
// 狀態1中缺少課程
conflicts.push({
type: 'course_missing',
description: `Course ${courseId} missing in state1`,
severity: 'medium',
affectedResources: [courseId],
conflictData: { missingCourse: course2 }
});
} else if (course1 && !course2) {
// 狀態2中缺少課程
conflicts.push({
type: 'course_missing',
description: `Course ${courseId} missing in state2`,
severity: 'medium',
affectedResources: [courseId],
conflictData: { missingCourse: course1 }
});
} else if (course1 && course2) {
// 檢查課程內容差異
const differences = this.findCourseDifferences(course1, course2);
if (differences.length > 0) {
conflicts.push({
type: 'course_data_conflict',
description: `Course ${courseId} data differs`,
severity: 'medium',
affectedResources: [courseId],
conflictData: {
course1,
course2,
differences
}
});
}
}
});
return conflicts;
}
/**
* 應用解決後的狀態
*/
private async applyResolvedState(
sessionId: string,
resolvedState: ResolvedSessionState
): Promise<void> {
const { finalState, resolutionActions } = resolvedState;
// 1. 更新 Redis 狀態
await this.saveStateToRedis(sessionId, finalState);
// 2. 更新本地快取
await this.updateCacheState(sessionId, finalState);
// 3. 執行解決動作
for (const action of resolutionActions) {
await this.executeResolutionAction(sessionId, action);
}
// 4. 通知相關服務
await this.notifyStateResolution(sessionId, resolvedState);
}
/**
* 執行解決動作
*/
private async executeResolutionAction(
sessionId: string,
action: ResolutionAction
): Promise<void> {
switch (action.type) {
case 'merge_courses':
await this.mergeCourseData(sessionId, action.data);
break;
case 'rollback_changes':
await this.rollbackChanges(sessionId, action.data);
break;
case 'sync_from_authoritative':
await this.syncFromAuthoritativeSource(sessionId, action.data);
break;
case 'notify_participants':
await this.notifyParticipants(sessionId, action.data);
break;
case 'create_backup':
await this.createStateBackup(sessionId, action.data);
break;
default:
console.warn(`Unknown resolution action type: ${action.type}`);
}
}
/**
* 批次同步多個會話
*/
async batchSyncSessions(sessionIds: string[]): Promise<BatchSyncResult> {
const results: Map<string, SyncResult> = new Map();
const batchStartTime = Date.now();
// 並行同步(但限制並發數量)
const concurrencyLimit = this.config.maxConcurrentSyncs || 5;
const batches = this.chunkArray(sessionIds, concurrencyLimit);
for (const batch of batches) {
const batchPromises = batch.map(async (sessionId) => {
const result = await this.syncSessionState(sessionId);
results.set(sessionId, result);
return result;
});
await Promise.allSettled(batchPromises);
}
const batchDuration = Date.now() - batchStartTime;
const successCount = Array.from(results.values()).filter(r => r.success).length;
return {
totalSessions: sessionIds.length,
successCount,
failureCount: sessionIds.length - successCount,
batchDuration,
results
};
}
/**
* 監控同步性能
*/
getSyncMetrics(): SyncMetrics {
const activeSyncs = Array.from(this.syncLocks.values()).filter(
lock => !lock.expired
).length;
return {
activeSessions: this.stateCache.size,
activeSyncs,
totalSyncsCompleted: this.totalSyncsCompleted,
averageSyncDuration: this.calculateAverageSyncDuration(),
syncSuccessRate: this.calculateSyncSuccessRate(),
memoryUsage: {
stateCache: this.calculateStateCacheSize(),
syncLocks: this.syncLocks.size,
total: process.memoryUsage().heapUsed
},
lastSyncTimestamps: this.getLastSyncTimestamps()
};
}
/**
* 清理資源
*/
async shutdown(): Promise<void> {
console.log('Shutting down state sync manager...');
// 停止所有同步間隔
this.syncIntervals.forEach(interval => clearInterval(interval));
this.syncIntervals.clear();
// 等待進行中的同步完成
const pendingSyncs = Array.from(this.syncLocks.entries())
.filter(([_, lock]) => !lock.expired)
.map(([sessionId, _]) =>
this.waitForSyncCompletion(sessionId, 10000) // 最多等待10秒
);
await Promise.allSettled(pendingSyncs);
// 持久化快取狀態
const persistPromises = Array.from(this.stateCache.entries())
.map(([sessionId, cached]) =>
this.persistStateToDatabase(sessionId, cached.state)
);
await Promise.allSettled(persistPromises);
// 清理資源
this.stateCache.clear();
this.syncLocks.clear();
console.log('State sync manager shutdown complete');
}
}
今天我們實作了完整的即時協作引擎和分散式資料同步系統,包括:
WebSocket 連線管理器
事件處理引擎
操作變換系統
分散式狀態管理