在 Day 16 我們實作了即時協作引擎,今天我們將進行重要的架構重構:將 WebSocket 協作系統獨立為專門的微服務。這個架構升級將提供更好的可擴展性、故障隔離和維護性。我們將實作 WebSocket 微服務、服務間通信、負載均衡,以及高可用性的訊息處理系統。
// packages/kyo-core/src/microservices/types.ts
export interface MicroserviceArchitecture {
// 核心業務服務
otpService: {
port: 3000;
responsibilities: ['authentication', 'user-management', 'basic-crud'];
dependencies: ['database', 'redis'];
};
// WebSocket 協作服務 (新增)
collaborationService: {
port: 3001;
responsibilities: ['real-time-collaboration', 'session-management', 'conflict-resolution'];
dependencies: ['redis', 'message-queue'];
};
// 分析報表服務 (預計實作)
analyticsService: {
port: 3002;
responsibilities: ['data-analysis', 'reporting', 'metrics'];
dependencies: ['database', 'time-series-db'];
};
// 通知服務
notificationService: {
port: 3003;
responsibilities: ['push-notifications', 'email', 'sms'];
dependencies: ['redis', 'external-apis'];
};
}
export interface ServiceConfig {
serviceName: string;
port: number;
host: string;
environment: 'development' | 'staging' | 'production';
// 資料庫配置
database: DatabaseConfig;
redis: RedisConfig;
messageQueue: MessageQueueConfig;
// 服務發現
serviceDiscovery: ServiceDiscoveryConfig;
// 監控配置
monitoring: MonitoringConfig;
// 安全配置
security: SecurityConfig;
}
export interface ServiceDiscoveryConfig {
enabled: boolean;
registry: 'consul' | 'etcd' | 'redis';
healthCheckInterval: number;
heartbeatInterval: number;
}
export interface MonitoringConfig {
metrics: {
enabled: boolean;
endpoint: string;
prometheus: boolean;
};
logging: {
level: 'debug' | 'info' | 'warn' | 'error';
format: 'json' | 'text';
outputs: string[];
};
tracing: {
enabled: boolean;
jaeger?: JaegerConfig;
};
}
// apps/kyo-collaboration-service/src/index.ts
import fastify, { FastifyInstance } from 'fastify';
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { Redis } from 'ioredis';
import { CollaborationEngine } from './engines/CollaborationEngine';
import { ServiceRegistry } from './services/ServiceRegistry';
import { HealthMonitor } from './monitoring/HealthMonitor';
import { MetricsCollector } from './monitoring/MetricsCollector';
import { SecurityManager } from './security/SecurityManager';
export class CollaborationServiceApp {
private app: FastifyInstance;
private io: Server;
private redis: Redis;
private redisAdapter: Redis;
private collaborationEngine: CollaborationEngine;
private serviceRegistry: ServiceRegistry;
private healthMonitor: HealthMonitor;
private metricsCollector: MetricsCollector;
private securityManager: SecurityManager;
constructor(private config: ServiceConfig) {
this.initializeApp();
}
/**
* 初始化應用程式
*/
private async initializeApp(): Promise<void> {
// 創建 Fastify 實例
this.app = fastify({
logger: {
level: this.config.monitoring.logging.level,
serializers: {
req: (req) => ({
method: req.method,
url: req.url,
headers: req.headers,
remoteAddress: req.ip,
remotePort: req.socket?.remotePort
}),
res: (res) => ({
statusCode: res.statusCode
})
}
},
trustProxy: true,
requestIdHeader: 'x-request-id',
requestIdLogLabel: 'requestId'
});
// 初始化 Redis 連接
await this.initializeRedis();
// 初始化 Socket.IO
await this.initializeSocketIO();
// 初始化核心組件
await this.initializeCoreComponents();
// 註冊路由
await this.registerRoutes();
// 註冊中間件
await this.registerMiddleware();
// 設置錯誤處理
this.setupErrorHandling();
}
/**
* 初始化 Redis 連接
*/
private async initializeRedis(): Promise<void> {
// 主 Redis 連接
this.redis = new Redis({
host: this.config.redis.host,
port: this.config.redis.port,
password: this.config.redis.password,
db: this.config.redis.database,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3,
lazyConnect: true,
keepAlive: 30000,
connectTimeout: 10000,
commandTimeout: 5000
});
// Socket.IO Adapter 專用 Redis 連接
this.redisAdapter = new Redis({
host: this.config.redis.host,
port: this.config.redis.port,
password: this.config.redis.password,
db: this.config.redis.database + 1, // 使用不同的資料庫
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3,
lazyConnect: true
});
// 測試連接
await this.redis.ping();
await this.redisAdapter.ping();
console.log('✅ Redis connections established');
}
/**
* 初始化 Socket.IO 服務器
*/
private async initializeSocketIO(): Promise<void> {
// 註冊 Socket.IO 插件
await this.app.register(require('@fastify/socket.io'), {
cors: {
origin: this.getAllowedOrigins(),
methods: ['GET', 'POST'],
credentials: true
},
allowEIO3: true,
transports: ['websocket', 'polling'],
pingTimeout: 60000,
pingInterval: 25000
});
this.io = this.app.io;
// 設置 Redis Adapter 用於多實例通信
const pubClient = this.redisAdapter;
const subClient = this.redisAdapter.duplicate();
this.io.adapter(createAdapter(pubClient, subClient));
// 設置 Socket.IO 中間件
this.setupSocketMiddleware();
// 設置 Socket.IO 事件處理
this.setupSocketEventHandlers();
console.log('✅ Socket.IO server initialized');
}
/**
* 初始化核心組件
*/
private async initializeCoreComponents(): Promise<void> {
// 安全管理器
this.securityManager = new SecurityManager(this.config.security);
// 協作引擎
this.collaborationEngine = new CollaborationEngine({
redis: this.redis,
socketIO: this.io,
securityManager: this.securityManager,
config: this.config.collaboration
});
// 服務註冊
this.serviceRegistry = new ServiceRegistry({
redis: this.redis,
serviceName: this.config.serviceName,
serviceConfig: this.config.serviceDiscovery,
healthEndpoint: '/health'
});
// 健康監控
this.healthMonitor = new HealthMonitor({
redis: this.redis,
socketIO: this.io,
collaborationEngine: this.collaborationEngine
});
// 指標收集器
this.metricsCollector = new MetricsCollector({
redis: this.redis,
socketIO: this.io,
collaborationEngine: this.collaborationEngine,
prometheusEnabled: this.config.monitoring.metrics.prometheus
});
await this.collaborationEngine.initialize();
await this.serviceRegistry.register();
await this.healthMonitor.start();
await this.metricsCollector.start();
console.log('✅ Core components initialized');
}
/**
* 設置 Socket 中間件
*/
private setupSocketMiddleware(): void {
// 認證中間件
this.io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token;
if (!token) {
throw new Error('Authentication token required');
}
const user = await this.securityManager.validateToken(token);
socket.data.user = user;
socket.data.userId = user.id;
socket.data.tenantId = user.tenantId;
next();
} catch (error) {
next(new Error('Authentication failed'));
}
});
// 速率限制中間件
this.io.use(async (socket, next) => {
const rateLimitResult = await this.securityManager.checkRateLimit(
socket.data.userId,
'websocket_connection'
);
if (!rateLimitResult.allowed) {
next(new Error('Rate limit exceeded'));
return;
}
next();
});
// 連接追蹤中間件
this.io.use((socket, next) => {
socket.data.connectedAt = new Date();
socket.data.connectionId = this.generateConnectionId();
this.metricsCollector.recordConnection(socket.data);
next();
});
}
/**
* 設置 Socket 事件處理器
*/
private setupSocketEventHandlers(): void {
this.io.on('connection', (socket) => {
console.log(`🔗 Client connected: ${socket.data.connectionId} (User: ${socket.data.userId})`);
// 加入用戶房間(用於廣播)
socket.join(`user:${socket.data.userId}`);
socket.join(`tenant:${socket.data.tenantId}`);
// 連接事件
socket.emit('connection_established', {
connectionId: socket.data.connectionId,
serverTime: new Date().toISOString(),
capabilities: ['real-time-collaboration', 'cursor-tracking', 'selection-sync']
});
// 會話管理事件
socket.on('join_session', async (data, callback) => {
try {
const result = await this.collaborationEngine.handleJoinSession(
socket,
data.sessionId,
data.sessionInfo
);
callback({ success: true, data: result });
} catch (error) {
callback({ success: false, error: error.message });
}
});
socket.on('leave_session', async (data, callback) => {
try {
await this.collaborationEngine.handleLeaveSession(
socket,
data.sessionId
);
callback({ success: true });
} catch (error) {
callback({ success: false, error: error.message });
}
});
// 協作事件
socket.on('collaboration_event', async (event) => {
try {
await this.collaborationEngine.handleCollaborationEvent(socket, event);
} catch (error) {
socket.emit('operation_rejected', {
eventId: event.id,
reason: error.message
});
}
});
// 游標和選擇事件
socket.on('cursor_move', (data) => {
this.collaborationEngine.handleCursorMove(socket, data);
});
socket.on('selection_change', (data) => {
this.collaborationEngine.handleSelectionChange(socket, data);
});
// 心跳
socket.on('ping', () => {
socket.emit('pong');
});
// 斷線處理
socket.on('disconnect', (reason) => {
console.log(`🔌 Client disconnected: ${socket.data.connectionId} (${reason})`);
this.collaborationEngine.handleDisconnection(socket);
this.metricsCollector.recordDisconnection(socket.data, reason);
});
// 錯誤處理
socket.on('error', (error) => {
console.error(`❌ Socket error for ${socket.data.connectionId}:`, error);
this.metricsCollector.recordError(socket.data, error);
});
});
}
/**
* 註冊 HTTP 路由
*/
private async registerRoutes(): Promise<void> {
// 健康檢查端點
this.app.get('/health', async (request, reply) => {
const health = await this.healthMonitor.getHealthStatus();
if (health.status === 'healthy') {
reply.code(200).send(health);
} else {
reply.code(503).send(health);
}
});
// 指標端點
this.app.get('/metrics', async (request, reply) => {
if (this.config.monitoring.metrics.prometheus) {
const metrics = await this.metricsCollector.getPrometheusMetrics();
reply.type('text/plain').send(metrics);
} else {
const metrics = await this.metricsCollector.getMetrics();
reply.send(metrics);
}
});
// 會話管理 API
this.app.post('/sessions/create', async (request, reply) => {
try {
const { userInfo, capabilities } = request.body as any;
const sessionToken = await this.collaborationEngine.createSessionToken(
userInfo,
capabilities
);
reply.send(sessionToken);
} catch (error) {
reply.code(400).send({ error: error.message });
}
});
this.app.get('/sessions/:sessionId', async (request, reply) => {
try {
const { sessionId } = request.params as any;
const sessionInfo = await this.collaborationEngine.getSessionInfo(sessionId);
reply.send(sessionInfo);
} catch (error) {
reply.code(404).send({ error: 'Session not found' });
}
});
this.app.delete('/sessions/:sessionId', async (request, reply) => {
try {
const { sessionId } = request.params as any;
await this.collaborationEngine.destroySession(sessionId);
reply.code(204).send();
} catch (error) {
reply.code(400).send({ error: error.message });
}
});
// 統計 API
this.app.get('/stats/connections', async (request, reply) => {
const stats = await this.metricsCollector.getConnectionStats();
reply.send(stats);
});
this.app.get('/stats/sessions', async (request, reply) => {
const stats = await this.collaborationEngine.getSessionStats();
reply.send(stats);
});
}
/**
* 註冊中間件
*/
private async registerMiddleware(): Promise<void> {
// CORS
await this.app.register(require('@fastify/cors'), {
origin: this.getAllowedOrigins(),
credentials: true
});
// 速率限制
await this.app.register(require('@fastify/rate-limit'), {
max: 100,
timeWindow: '1 minute',
redis: this.redis
});
// 安全標頭
await this.app.register(require('@fastify/helmet'), {
contentSecurityPolicy: false
});
// 請求日誌
await this.app.register(require('@fastify/request-context'));
}
/**
* 設置錯誤處理
*/
private setupErrorHandling(): void {
// 全域錯誤處理器
this.app.setErrorHandler(async (error, request, reply) => {
const errorId = this.generateErrorId();
request.log.error({
errorId,
error: {
message: error.message,
stack: error.stack,
statusCode: error.statusCode
}
}, 'Request error');
// 記錄錯誤指標
this.metricsCollector.recordHttpError(error, request);
const statusCode = error.statusCode || 500;
const message = statusCode >= 500 ? 'Internal Server Error' : error.message;
reply.code(statusCode).send({
error: {
message,
errorId,
timestamp: new Date().toISOString()
}
});
});
// 未捕獲異常處理
process.on('uncaughtException', (error) => {
console.error('💥 Uncaught Exception:', error);
this.gracefulShutdown('SIGTERM');
});
process.on('unhandledRejection', (reason, promise) => {
console.error('💥 Unhandled Rejection at:', promise, 'reason:', reason);
this.gracefulShutdown('SIGTERM');
});
// 優雅關閉
['SIGTERM', 'SIGINT'].forEach((signal) => {
process.on(signal, () => {
this.gracefulShutdown(signal);
});
});
}
/**
* 啟動服務
*/
async start(): Promise<void> {
try {
await this.app.listen({
port: this.config.port,
host: this.config.host
});
console.log(`🚀 Collaboration service started on ${this.config.host}:${this.config.port}`);
console.log(`📊 Health check: http://${this.config.host}:${this.config.port}/health`);
console.log(`📈 Metrics: http://${this.config.host}:${this.config.port}/metrics`);
} catch (error) {
console.error('💥 Failed to start collaboration service:', error);
process.exit(1);
}
}
/**
* 優雅關閉
*/
private async gracefulShutdown(signal: string): Promise<void> {
console.log(`📴 Received ${signal}, starting graceful shutdown...`);
try {
// 停止接受新連接
console.log('⏹️ Stopping HTTP server...');
await this.app.close();
// 關閉 Socket.IO 連接
console.log('🔌 Closing WebSocket connections...');
this.io.close();
// 停止核心組件
console.log('⚙️ Shutting down core components...');
await this.collaborationEngine.shutdown();
await this.serviceRegistry.unregister();
await this.healthMonitor.stop();
await this.metricsCollector.stop();
// 關閉 Redis 連接
console.log('🔴 Closing Redis connections...');
await this.redis.quit();
await this.redisAdapter.quit();
console.log('✅ Graceful shutdown completed');
process.exit(0);
} catch (error) {
console.error('💥 Error during shutdown:', error);
process.exit(1);
}
}
/**
* 獲取允許的來源
*/
private getAllowedOrigins(): string[] {
const origins = [
'http://localhost:3000',
'http://localhost:5173',
'https://kyong-saas.com',
'https://*.kyong-saas.com'
];
if (this.config.environment === 'development') {
origins.push('http://localhost:*');
}
return origins;
}
/**
* 生成連接 ID
*/
private generateConnectionId(): string {
return `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 生成錯誤 ID
*/
private generateErrorId(): string {
return `err_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
// 啟動應用程式
async function bootstrap() {
const config: ServiceConfig = {
serviceName: 'collaboration-service',
port: parseInt(process.env.PORT || '3001'),
host: process.env.HOST || '0.0.0.0',
environment: (process.env.NODE_ENV as any) || 'development',
database: {
url: process.env.DATABASE_URL || 'postgresql://localhost:5432/kyong_saas',
},
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
database: parseInt(process.env.REDIS_DB || '0')
},
messageQueue: {
type: 'redis',
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
database: parseInt(process.env.REDIS_MQ_DB || '1')
}
},
serviceDiscovery: {
enabled: process.env.SERVICE_DISCOVERY_ENABLED === 'true',
registry: 'redis',
healthCheckInterval: 30000,
heartbeatInterval: 10000
},
monitoring: {
metrics: {
enabled: true,
endpoint: '/metrics',
prometheus: process.env.PROMETHEUS_ENABLED === 'true'
},
logging: {
level: (process.env.LOG_LEVEL as any) || 'info',
format: 'json',
outputs: ['stdout']
},
tracing: {
enabled: process.env.TRACING_ENABLED === 'true'
}
},
security: {
jwtSecret: process.env.JWT_SECRET || 'your-secret-key',
rateLimiting: {
enabled: true,
windowMs: 60000,
maxRequests: 100
}
}
};
const app = new CollaborationServiceApp(config);
await app.start();
}
// 如果直接執行此文件,啟動應用程式
if (require.main === module) {
bootstrap().catch((error) => {
console.error('💥 Failed to start application:', error);
process.exit(1);
});
}
export { CollaborationServiceApp };
// apps/kyo-collaboration-service/src/engines/CollaborationEngine.ts
import { Server, Socket } from 'socket.io';
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';
import { SecurityManager } from '../security/SecurityManager';
import { ConflictResolver } from './ConflictResolver';
import { SessionManager } from './SessionManager';
import { OperationalTransform } from './OperationalTransform';
export class CollaborationEngine extends EventEmitter {
private sessionManager: SessionManager;
private conflictResolver: ConflictResolver;
private operationalTransform: OperationalTransform;
private activeConnections: Map<string, Socket> = new Map();
private sessionConnections: Map<string, Set<string>> = new Map();
private operationQueue: Map<string, OperationQueue> = new Map();
constructor(
private config: {
redis: Redis;
socketIO: Server;
securityManager: SecurityManager;
config: any;
}
) {
super();
this.initializeComponents();
}
/**
* 初始化組件
*/
private initializeComponents(): void {
this.sessionManager = new SessionManager(this.config.redis);
this.conflictResolver = new ConflictResolver(this.config.redis);
this.operationalTransform = new OperationalTransform();
// 設置事件監聽
this.sessionManager.on('session_created', this.handleSessionCreated.bind(this));
this.sessionManager.on('session_destroyed', this.handleSessionDestroyed.bind(this));
this.conflictResolver.on('conflict_resolved', this.handleConflictResolved.bind(this));
}
/**
* 初始化引擎
*/
async initialize(): Promise<void> {
await this.sessionManager.initialize();
await this.conflictResolver.initialize();
// 訂閱跨實例事件
await this.subscribeToClusterEvents();
console.log('✅ Collaboration engine initialized');
}
/**
* 處理加入會話
*/
async handleJoinSession(
socket: Socket,
sessionId: string,
sessionInfo: SessionJoinInfo
): Promise<JoinSessionResult> {
const userId = socket.data.userId;
const connectionId = socket.data.connectionId;
try {
// 驗證會話權限
const hasPermission = await this.sessionManager.validateSessionPermission(
userId,
sessionId,
'join'
);
if (!hasPermission) {
throw new Error('No permission to join session');
}
// 加入會話
await this.sessionManager.joinSession(sessionId, {
userId,
connectionId,
socketId: socket.id,
joinedAt: new Date(),
userInfo: sessionInfo.userInfo
});
// 將 socket 加入房間
socket.join(sessionId);
// 記錄連接
this.activeConnections.set(connectionId, socket);
if (!this.sessionConnections.has(sessionId)) {
this.sessionConnections.set(sessionId, new Set());
}
this.sessionConnections.get(sessionId)!.add(connectionId);
// 初始化操作佇列
if (!this.operationQueue.has(sessionId)) {
this.operationQueue.set(sessionId, new OperationQueue(sessionId, this.config.redis));
}
// 獲取會話狀態
const sessionState = await this.sessionManager.getSessionState(sessionId);
// 通知其他參與者
socket.to(sessionId).emit('user_joined', {
userId,
userInfo: sessionInfo.userInfo,
timestamp: new Date().toISOString()
});
return {
sessionId,
participantId: userId,
sessionState,
existingParticipants: await this.sessionManager.getSessionParticipants(sessionId)
};
} catch (error) {
throw new Error(`Failed to join session: ${error.message}`);
}
}
/**
* 處理離開會話
*/
async handleLeaveSession(socket: Socket, sessionId: string): Promise<void> {
const userId = socket.data.userId;
const connectionId = socket.data.connectionId;
try {
// 離開會話
await this.sessionManager.leaveSession(sessionId, userId);
// 離開 socket 房間
socket.leave(sessionId);
// 清理連接記錄
this.activeConnections.delete(connectionId);
const sessionConnections = this.sessionConnections.get(sessionId);
if (sessionConnections) {
sessionConnections.delete(connectionId);
if (sessionConnections.size === 0) {
this.sessionConnections.delete(sessionId);
// 清理操作佇列
const queue = this.operationQueue.get(sessionId);
if (queue) {
await queue.destroy();
this.operationQueue.delete(sessionId);
}
}
}
// 通知其他參與者
socket.to(sessionId).emit('user_left', {
userId,
timestamp: new Date().toISOString()
});
} catch (error) {
throw new Error(`Failed to leave session: ${error.message}`);
}
}
/**
* 處理協作事件
*/
async handleCollaborationEvent(
socket: Socket,
event: CollaborationEvent
): Promise<void> {
const sessionId = event.sessionId;
const userId = socket.data.userId;
try {
// 驗證事件
await this.validateCollaborationEvent(event, userId);
// 獲取操作佇列
const queue = this.operationQueue.get(sessionId);
if (!queue) {
throw new Error('Session not found or not joined');
}
// 檢測衝突
const conflicts = await this.conflictResolver.detectConflicts(event, sessionId);
if (conflicts.length > 0) {
// 有衝突,進入衝突解決流程
const resolution = await this.conflictResolver.resolveConflicts(
event,
conflicts,
sessionId
);
if (resolution.success) {
// 衝突已解決,應用變換後的操作
for (const resolvedEvent of resolution.resolvedEvents) {
await this.applyCollaborationEvent(resolvedEvent, socket);
}
} else {
// 衝突無法自動解決,通知客戶端
socket.emit('conflict_detected', {
originalEvent: event,
conflicts,
requiresManualResolution: true
});
return;
}
} else {
// 無衝突,直接應用操作
await this.applyCollaborationEvent(event, socket);
}
} catch (error) {
socket.emit('operation_rejected', {
eventId: event.id,
reason: error.message,
timestamp: new Date().toISOString()
});
}
}
/**
* 應用協作事件
*/
private async applyCollaborationEvent(
event: CollaborationEvent,
sourceSocket: Socket
): Promise<void> {
const sessionId = event.sessionId;
// 將事件加入佇列
const queue = this.operationQueue.get(sessionId)!;
await queue.enqueue(event);
// 更新會話狀態
await this.sessionManager.applyEventToState(sessionId, event);
// 廣播事件到會話中的其他用戶
sourceSocket.to(sessionId).emit('collaboration_event', event);
// 跨實例廣播
await this.broadcastToCluster('collaboration_event', {
sessionId,
event,
excludeInstanceId: process.env.INSTANCE_ID
});
// 記錄指標
this.emit('event_processed', {
sessionId,
eventType: event.type,
userId: event.userId,
timestamp: event.timestamp
});
}
/**
* 處理游標移動
*/
handleCursorMove(socket: Socket, data: CursorMoveData): void {
const { sessionId, position } = data;
const userId = socket.data.userId;
// 節流處理,避免過於頻繁的更新
this.throttle(`cursor_${userId}`, () => {
// 廣播游標位置
socket.to(sessionId).emit('cursor_moved', {
userId,
position,
timestamp: new Date().toISOString()
});
// 更新 Redis 中的游標位置
this.sessionManager.updateCursorPosition(sessionId, userId, position);
}, 50);
}
/**
* 處理選擇變更
*/
handleSelectionChange(socket: Socket, data: SelectionChangeData): void {
const { sessionId, selection } = data;
const userId = socket.data.userId;
// 廣播選擇變更
socket.to(sessionId).emit('selection_changed', {
userId,
selection,
timestamp: new Date().toISOString()
});
// 更新會話狀態
this.sessionManager.updateUserSelection(sessionId, userId, selection);
}
/**
* 處理斷線
*/
async handleDisconnection(socket: Socket): Promise<void> {
const connectionId = socket.data.connectionId;
const userId = socket.data.userId;
// 清理活動連接
this.activeConnections.delete(connectionId);
// 從所有會話中移除
for (const [sessionId, connections] of this.sessionConnections) {
if (connections.has(connectionId)) {
connections.delete(connectionId);
// 通知會話中的其他用戶
socket.to(sessionId).emit('user_left', {
userId,
reason: 'disconnection',
timestamp: new Date().toISOString()
});
// 從會話管理器中移除
await this.sessionManager.leaveSession(sessionId, userId);
// 如果會話沒有其他用戶,清理資源
if (connections.size === 0) {
this.sessionConnections.delete(sessionId);
const queue = this.operationQueue.get(sessionId);
if (queue) {
await queue.destroy();
this.operationQueue.delete(sessionId);
}
}
}
}
}
/**
* 建立會話令牌
*/
async createSessionToken(
userInfo: any,
capabilities: string[]
): Promise<string> {
return this.config.securityManager.generateSessionToken(userInfo, capabilities);
}
/**
* 獲取會話資訊
*/
async getSessionInfo(sessionId: string): Promise<any> {
return this.sessionManager.getSessionInfo(sessionId);
}
/**
* 銷毀會話
*/
async destroySession(sessionId: string): Promise<void> {
// 通知所有參與者
this.config.socketIO.to(sessionId).emit('session_destroyed', {
sessionId,
timestamp: new Date().toISOString()
});
// 清理資源
await this.sessionManager.destroySession(sessionId);
const queue = this.operationQueue.get(sessionId);
if (queue) {
await queue.destroy();
this.operationQueue.delete(sessionId);
}
this.sessionConnections.delete(sessionId);
}
/**
* 獲取會話統計
*/
async getSessionStats(): Promise<SessionStats> {
return {
activeSessions: this.sessionConnections.size,
totalConnections: this.activeConnections.size,
sessionsDetails: await Promise.all(
Array.from(this.sessionConnections.keys()).map(async sessionId => ({
sessionId,
participantCount: this.sessionConnections.get(sessionId)?.size || 0,
sessionInfo: await this.sessionManager.getSessionInfo(sessionId)
}))
)
};
}
/**
* 訂閱集群事件
*/
private async subscribeToClusterEvents(): Promise<void> {
await this.config.redis.subscribe('collaboration:cluster:events');
this.config.redis.on('message', (channel, message) => {
if (channel === 'collaboration:cluster:events') {
const event = JSON.parse(message);
this.handleClusterEvent(event);
}
});
}
/**
* 廣播到集群
*/
private async broadcastToCluster(eventType: string, data: any): Promise<void> {
await this.config.redis.publish('collaboration:cluster:events', JSON.stringify({
type: eventType,
data,
instanceId: process.env.INSTANCE_ID,
timestamp: new Date().toISOString()
}));
}
/**
* 處理集群事件
*/
private handleClusterEvent(clusterEvent: any): void {
// 忽略來自同一實例的事件
if (clusterEvent.instanceId === process.env.INSTANCE_ID) {
return;
}
switch (clusterEvent.type) {
case 'collaboration_event':
const { sessionId, event } = clusterEvent.data;
this.config.socketIO.to(sessionId).emit('collaboration_event', event);
break;
// 其他集群事件處理...
}
}
/**
* 節流函數
*/
private throttleTimers: Map<string, NodeJS.Timeout> = new Map();
private throttle(key: string, fn: Function, delay: number): void {
const existingTimer = this.throttleTimers.get(key);
if (existingTimer) {
clearTimeout(existingTimer);
}
const timer = setTimeout(() => {
fn();
this.throttleTimers.delete(key);
}, delay);
this.throttleTimers.set(key, timer);
}
/**
* 驗證協作事件
*/
private async validateCollaborationEvent(
event: CollaborationEvent,
userId: string
): Promise<void> {
// 基本驗證
if (!event.id || !event.sessionId || !event.type) {
throw new Error('Invalid event format');
}
if (event.userId !== userId) {
throw new Error('Event userId mismatch');
}
// 會話權限驗證
const hasPermission = await this.sessionManager.validateSessionPermission(
userId,
event.sessionId,
'edit'
);
if (!hasPermission) {
throw new Error('No permission to edit in this session');
}
// 速率限制檢查
const rateLimitOk = await this.config.securityManager.checkRateLimit(
userId,
'collaboration_event'
);
if (!rateLimitOk.allowed) {
throw new Error('Rate limit exceeded');
}
}
/**
* 關閉引擎
*/
async shutdown(): Promise<void> {
console.log('🛑 Shutting down collaboration engine...');
// 清理節流計時器
this.throttleTimers.forEach(timer => clearTimeout(timer));
this.throttleTimers.clear();
// 關閉所有操作佇列
await Promise.all(
Array.from(this.operationQueue.values()).map(queue => queue.destroy())
);
this.operationQueue.clear();
// 清理會話管理器
await this.sessionManager.shutdown();
await this.conflictResolver.shutdown();
// 清理連接記錄
this.activeConnections.clear();
this.sessionConnections.clear();
console.log('✅ Collaboration engine shutdown complete');
}
}
// 輔助類別
class OperationQueue {
constructor(
private sessionId: string,
private redis: Redis
) {}
async enqueue(operation: CollaborationEvent): Promise<void> {
const key = `session:${this.sessionId}:operations`;
await this.redis.lpush(key, JSON.stringify(operation));
await this.redis.expire(key, 3600); // 1小時過期
}
async destroy(): Promise<void> {
const key = `session:${this.sessionId}:operations`;
await this.redis.del(key);
}
}
interface SessionStats {
activeSessions: number;
totalConnections: number;
sessionsDetails: Array<{
sessionId: string;
participantCount: number;
sessionInfo: any;
}>;
}
// apps/kyo-collaboration-service/src/services/ServiceRegistry.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';
export class ServiceRegistry extends EventEmitter {
private heartbeatInterval: NodeJS.Timeout | null = null;
private healthCheckInterval: NodeJS.Timeout | null = null;
private isRegistered = false;
constructor(
private config: {
redis: Redis;
serviceName: string;
serviceConfig: any;
healthEndpoint: string;
}
) {
super();
}
/**
* 註冊服務
*/
async register(): Promise<void> {
const serviceInfo = {
name: this.config.serviceName,
id: this.generateServiceId(),
host: process.env.SERVICE_HOST || 'localhost',
port: parseInt(process.env.PORT || '3001'),
healthEndpoint: this.config.healthEndpoint,
capabilities: ['websocket', 'real-time-collaboration'],
metadata: {
version: process.env.SERVICE_VERSION || '1.0.0',
environment: process.env.NODE_ENV || 'development',
instanceId: process.env.INSTANCE_ID,
startedAt: new Date().toISOString()
},
registeredAt: new Date().toISOString(),
lastHeartbeat: new Date().toISOString()
};
// 註冊到 Redis
const serviceKey = `services:${this.config.serviceName}:${serviceInfo.id}`;
await this.config.redis.setex(
serviceKey,
this.config.serviceConfig.heartbeatInterval * 2, // TTL是心跳間隔的2倍
JSON.stringify(serviceInfo)
);
// 添加到服務列表
await this.config.redis.sadd(
`services:${this.config.serviceName}:instances`,
serviceInfo.id
);
this.isRegistered = true;
// 開始心跳
this.startHeartbeat(serviceInfo);
// 開始健康檢查
this.startHealthCheck();
console.log(`✅ Service registered: ${this.config.serviceName}:${serviceInfo.id}`);
this.emit('registered', serviceInfo);
}
/**
* 取消註冊服務
*/
async unregister(): Promise<void> {
if (!this.isRegistered) return;
const serviceId = process.env.INSTANCE_ID || this.generateServiceId();
// 停止心跳和健康檢查
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
// 從 Redis 移除
const serviceKey = `services:${this.config.serviceName}:${serviceId}`;
await this.config.redis.del(serviceKey);
await this.config.redis.srem(
`services:${this.config.serviceName}:instances`,
serviceId
);
this.isRegistered = false;
console.log(`✅ Service unregistered: ${this.config.serviceName}:${serviceId}`);
this.emit('unregistered');
}
/**
* 發現服務
*/
async discoverService(serviceName: string): Promise<ServiceInstance[]> {
const instanceIds = await this.config.redis.smembers(
`services:${serviceName}:instances`
);
const services: ServiceInstance[] = [];
for (const instanceId of instanceIds) {
const serviceKey = `services:${serviceName}:${instanceId}`;
const serviceData = await this.config.redis.get(serviceKey);
if (serviceData) {
const serviceInfo = JSON.parse(serviceData);
services.push(serviceInfo);
}
}
return services.filter(service => this.isServiceHealthy(service));
}
/**
* 獲取負載均衡的服務實例
*/
async getServiceInstance(serviceName: string, strategy: 'round-robin' | 'random' = 'round-robin'): Promise<ServiceInstance | null> {
const services = await this.discoverService(serviceName);
if (services.length === 0) {
return null;
}
switch (strategy) {
case 'round-robin':
return this.roundRobinSelect(serviceName, services);
case 'random':
return services[Math.floor(Math.random() * services.length)];
default:
return services[0];
}
}
/**
* 開始心跳
*/
private startHeartbeat(serviceInfo: any): void {
this.heartbeatInterval = setInterval(async () => {
try {
const serviceKey = `services:${this.config.serviceName}:${serviceInfo.id}`;
// 更新心跳時間
serviceInfo.lastHeartbeat = new Date().toISOString();
await this.config.redis.setex(
serviceKey,
this.config.serviceConfig.heartbeatInterval * 2,
JSON.stringify(serviceInfo)
);
} catch (error) {
console.error('❌ Heartbeat failed:', error);
this.emit('heartbeat_failed', error);
}
}, this.config.serviceConfig.heartbeatInterval);
}
/**
* 開始健康檢查
*/
private startHealthCheck(): void {
this.healthCheckInterval = setInterval(async () => {
try {
await this.performHealthCheck();
} catch (error) {
console.error('❌ Health check failed:', error);
this.emit('health_check_failed', error);
}
}, this.config.serviceConfig.healthCheckInterval);
}
/**
* 執行健康檢查
*/
private async performHealthCheck(): Promise<void> {
// 檢查 Redis 連接
await this.config.redis.ping();
// 檢查其他依賴服務
// ... 額外的健康檢查邏輯
this.emit('health_check_passed');
}
/**
* 檢查服務是否健康
*/
private isServiceHealthy(service: ServiceInstance): boolean {
const lastHeartbeat = new Date(service.lastHeartbeat);
const now = new Date();
const timeDiff = now.getTime() - lastHeartbeat.getTime();
// 如果心跳超過2個間隔,認為服務不健康
return timeDiff < (this.config.serviceConfig.heartbeatInterval * 2);
}
/**
* 輪詢選擇
*/
private roundRobinCounters: Map<string, number> = new Map();
private roundRobinSelect(serviceName: string, services: ServiceInstance[]): ServiceInstance {
const currentCounter = this.roundRobinCounters.get(serviceName) || 0;
const selectedIndex = currentCounter % services.length;
this.roundRobinCounters.set(serviceName, currentCounter + 1);
return services[selectedIndex];
}
/**
* 生成服務 ID
*/
private generateServiceId(): string {
return `${process.env.INSTANCE_ID || 'local'}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
interface ServiceInstance {
name: string;
id: string;
host: string;
port: number;
healthEndpoint: string;
capabilities: string[];
metadata: any;
registeredAt: string;
lastHeartbeat: string;
}
今天我們完成了 WebSocket 微服務的完整架構升級:
微服務拆分
高可用性設計
企業級功能
性能最佳化