iT邦幫忙

2025 iThome Ironman Contest

DAY 29
Software Development

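Building a Studio SaaS Product in 30 Days (Backend) series, Part 29

Day 29: Building a SaaS Product in 30 Days (Backend): Microservice Integration and Event-Driven Architecture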

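Recap

Over Days 24-28 we built the core of the Kyo System backend:

  • Day 24: Set up the Fastify + TypeScript project structure and the monorepo layout
  • Day 25: Implemented JWT authentication, authorization, and the RBAC permission system
  • Day 26: Designed rate limiting, Redis caching, and performance optimizations
  • Day 27: Built BullMQ background jobs and asynchronous task processing
  • Day 28: Implemented monitoring, logging, tracing, and observability

Today is the second-to-last day. We will tie all the services together and cover:

  1. Microservice architecture overview and service decomposition strategy
  2. Inter-service communication (synchronous and asynchronous)
  3. Redis Pub/Sub event system
  4. Distributed transaction handling (Saga pattern)
  5. Complete backend testing strategy
  6. API Gateway and service discovery
  7. Production readiness and checklist

Let's move the Kyo System backend toward an enterprise-grade, scalable microservice architecture!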

Kyo System Microservice Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                        API Gateway (Fastify)                     │
│                   - Routing  - Auth  - Rate Limiting             │
└────────┬────────────────────────────────────────────────────┬────┘
         │                                                     │
         │                                                     │
┌────────▼──────────┐  ┌──────────────┐  ┌──────────────────▼────┐
│  Auth Service     │  │ OTP Service  │  │ Notification Service  │
│  - JWT           │  │ - Send OTP   │  │ - Email              │
│  - Users         │  │ - Verify     │  │ - SMS                │
│  - RBAC          │  │ - Logs       │  │ - WebSocket          │
└────────┬──────────┘  └──────┬───────┘  └──────┬───────────────┘
         │                     │                  │
         │                     │                  │
┌────────▼─────────────────────▼──────────────────▼───────────────┐
│                         Event Bus (Redis Pub/Sub)                │
│ - user.created  - otp.sent  - otp.verified  - rate-limit.exceeded│
└────────┬─────────────────────┬──────────────────┬───────────────┘
         │                     │                  │
         │                     │                  │
┌────────▼──────────┐  ┌──────▼───────┐  ┌──────▼────────────────┐
│   PostgreSQL      │  │ Redis Cache  │  │   BullMQ Workers      │
│   - Users         │  │ - Sessions   │  │   - SMS Queue         │
│   - OTP Logs      │  │ - Rate Limit │  │   - Email Queue       │
│   - Audit Logs    │  │ - OTP Codes  │  │   - Analytics Queue   │
└───────────────────┘  └──────────────┘  └───────────────────────┘
         │                     │                  │
         │                     │                  │
┌────────▼─────────────────────▼──────────────────▼───────────────┐
│                    Observability Stack                           │
│     - Metrics (Prometheus)  - Logs (Loki)  - Traces (Jaeger)    │
└──────────────────────────────────────────────────────────────────┘

Service Decomposition Strategy

1. Defining Service Boundaries

Services are split along business domains (Domain-Driven Design):

// packages/kyo-core/src/types/service.types.ts

/**
 * Service Registry - defines every microservice
 */
export interface ServiceDefinition {
  name: string;
  version: string;
  baseUrl: string;
  healthCheck: string;
  // readonly so the `as const` registry below is assignable to this interface
  dependencies: readonly string[];
  capabilities: readonly string[];
}

export const SERVICES = {
  AUTH: {
    name: 'auth-service',
    version: '1.0.0',
    baseUrl: process.env.AUTH_SERVICE_URL || 'http://localhost:3001',
    healthCheck: '/health',
    dependencies: ['database', 'redis'],
    capabilities: ['login', 'register', 'verify-token', 'refresh-token'],
  },
  OTP: {
    name: 'otp-service',
    version: '1.0.0',
    baseUrl: process.env.OTP_SERVICE_URL || 'http://localhost:3002',
    healthCheck: '/health',
    dependencies: ['database', 'redis', 'sms-provider'],
    capabilities: ['send-otp', 'verify-otp', 'get-logs'],
  },
  NOTIFICATION: {
    name: 'notification-service',
    version: '1.0.0',
    baseUrl: process.env.NOTIFICATION_SERVICE_URL || 'http://localhost:3003',
    healthCheck: '/health',
    dependencies: ['redis', 'email-provider', 'sms-provider'],
    capabilities: ['send-email', 'send-sms', 'websocket-notify'],
  },
  ANALYTICS: {
    name: 'analytics-service',
    version: '1.0.0',
    baseUrl: process.env.ANALYTICS_SERVICE_URL || 'http://localhost:3004',
    healthCheck: '/health',
    dependencies: ['database', 'redis'],
    capabilities: ['track-event', 'generate-report', 'get-stats'],
  },
} as const;

export type ServiceName = keyof typeof SERVICES;
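
Because the registry is plain data, it can be queried with ordinary functions. The sketch below is not part of the original code: it uses a trimmed, hypothetical copy of the registry (`SERVICE_SUMMARIES`) and a hypothetical helper `servicesDependingOn` to list which services are affected when one dependency goes down.

```typescript
// Trimmed, hypothetical copy of the registry: only the fields this sketch needs.
interface ServiceSummary {
  name: string;
  dependencies: readonly string[];
}

const SERVICE_SUMMARIES: Record<string, ServiceSummary> = {
  AUTH: { name: 'auth-service', dependencies: ['database', 'redis'] },
  OTP: { name: 'otp-service', dependencies: ['database', 'redis', 'sms-provider'] },
  NOTIFICATION: {
    name: 'notification-service',
    dependencies: ['redis', 'email-provider', 'sms-provider'],
  },
  ANALYTICS: { name: 'analytics-service', dependencies: ['database', 'redis'] },
};

// Returns the names of all services that list `dependency`.
function servicesDependingOn(
  services: Record<string, ServiceSummary>,
  dependency: string
): string[] {
  return Object.values(services)
    .filter((service) => service.dependencies.includes(dependency))
    .map((service) => service.name);
}

console.log(servicesDependingOn(SERVICE_SUMMARIES, 'sms-provider'));
// → [ 'otp-service', 'notification-service' ]
```

A lookup like this can feed an alert or a degraded-mode switch; how Kyo System would actually react is outside what the article shows.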

2. Service Discovery Implementation

Implementing service discovery and health checks:

// packages/kyo-core/src/services/service-registry.ts
import axios, { AxiosInstance } from 'axios';
import { SERVICES, ServiceDefinition, ServiceName } from '../types/service.types';

export class ServiceRegistry {
  private services: Map<ServiceName, ServiceClient> = new Map();
  private healthCheckInterval: NodeJS.Timeout | null = null;

  constructor() {
    this.initializeServices();
  }

  /**
   * Initialize all service clients
   */
  private initializeServices(): void {
    for (const [name, config] of Object.entries(SERVICES)) {
      const client = new ServiceClient(config);
      this.services.set(name as ServiceName, client);
    }
  }

  /**
   * Get service client by name
   */
  getService(name: ServiceName): ServiceClient {
    const service = this.services.get(name);
    if (!service) {
      throw new Error(`Service ${name} not found`);
    }
    return service;
  }

  /**
   * Start health check monitoring
   */
  startHealthChecks(intervalMs: number = 30000): void {
    // Clear any existing timer so repeated calls do not leak intervals
    this.stopHealthChecks();
    this.healthCheckInterval = setInterval(async () => {
      for (const [name, client] of this.services) {
        try {
          const isHealthy = await client.healthCheck();
          if (!isHealthy) {
            console.error(`[ServiceRegistry] ${name} is unhealthy`);
          }
        } catch (error) {
          console.error(`[ServiceRegistry] ${name} health check failed:`, error);
        }
      }
    }, intervalMs);
  }

  /**
   * Stop health check monitoring
   */
  stopHealthChecks(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }
  }

  /**
   * Get all services status
   */
  async getAllServicesStatus(): Promise<Record<ServiceName, boolean>> {
    const statuses: Partial<Record<ServiceName, boolean>> = {};

    await Promise.all(
      Array.from(this.services.entries()).map(async ([name, client]) => {
        try {
          statuses[name] = await client.healthCheck();
        } catch {
          statuses[name] = false;
        }
      })
    );

    return statuses as Record<ServiceName, boolean>;
  }
}

/**
 * Service Client - Handle HTTP communication with services
 */
export class ServiceClient {
  private client: AxiosInstance;
  private config: ServiceDefinition;

  constructor(config: ServiceDefinition) {
    this.config = config;
    this.client = axios.create({
      baseURL: config.baseUrl,
      timeout: 10000,
      headers: {
        'Content-Type': 'application/json',
        'X-Service-Name': config.name,
        'X-Service-Version': config.version,
      },
    });

    // Add request interceptor for logging
    this.client.interceptors.request.use(
      (config) => {
        console.log(
          `[ServiceClient] ${this.config.name} -> ${config.method?.toUpperCase()} ${config.url}`
        );
        return config;
      },
      (error) => {
        console.error(`[ServiceClient] ${this.config.name} request error:`, error);
        return Promise.reject(error);
      }
    );

    // Add response interceptor for error handling
    this.client.interceptors.response.use(
      (response) => response,
      (error) => {
        console.error(
          `[ServiceClient] ${this.config.name} response error:`,
          error.message
        );
        return Promise.reject(error);
      }
    );
  }

  /**
   * Perform health check
   */
  async healthCheck(): Promise<boolean> {
    try {
      const response = await this.client.get(this.config.healthCheck);
      return response.status === 200;
    } catch {
      return false;
    }
  }

  /**
   * Generic GET request
   */
  async get<T = any>(path: string, params?: Record<string, any>): Promise<T> {
    const response = await this.client.get<T>(path, { params });
    return response.data;
  }

  /**
   * Generic POST request
   */
  async post<T = any>(path: string, data?: any): Promise<T> {
    const response = await this.client.post<T>(path, data);
    return response.data;
  }

  /**
   * Generic PUT request
   */
  async put<T = any>(path: string, data?: any): Promise<T> {
    const response = await this.client.put<T>(path, data);
    return response.data;
  }

  /**
   * Generic DELETE request
   */
  async delete<T = any>(path: string): Promise<T> {
    const response = await this.client.delete<T>(path);
    return response.data;
  }

  /**
   * Set authentication token
   */
  setAuthToken(token: string): void {
    this.client.defaults.headers.common['Authorization'] = `Bearer ${token}`;
  }
}

// Singleton instance
export const serviceRegistry = new ServiceRegistry();
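
The health check loop above only logs failures. If callers need to ask whether a service was healthy at the last check, the results have to be stored somewhere. The following is a minimal sketch under that assumption: `HealthTracker` and `HealthProbe` are hypothetical names, and the probes are injected so the example runs without axios or a network.

```typescript
// A probe resolves to true when the service answered its health endpoint.
type HealthProbe = () => Promise<boolean>;

class HealthTracker {
  private readonly probes: Record<string, HealthProbe>;
  private readonly lastStatus = new Map<string, boolean>();

  constructor(probes: Record<string, HealthProbe>) {
    this.probes = probes;
  }

  // Run every probe once and remember the outcome.
  // A probe that throws counts as unhealthy.
  async refresh(): Promise<void> {
    await Promise.all(
      Object.entries(this.probes).map(async ([serviceName, probe]) => {
        let healthy = false;
        try {
          healthy = await probe();
        } catch {
          healthy = false;
        }
        this.lastStatus.set(serviceName, healthy);
      })
    );
  }

  // Services that have never been probed are treated as unhealthy.
  isHealthy(serviceName: string): boolean {
    return this.lastStatus.get(serviceName) === true;
  }
}
```

With the registry from this article, each probe could be `() => client.healthCheck()` and `refresh()` could be the body of the `setInterval` callback; that wiring is a suggestion, not something the original code does.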

Event-Driven Architecture - Redis Pub/Sub

1. Event Bus Implementation

Building a unified event bus:

// packages/kyo-core/src/events/event-bus.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';

/**
 * Domain Events Definition
 */
export interface DomainEvent<T = Record<string, any>> {
  id: string;
  type: string;
  timestamp: string;
  aggregateId: string;
  aggregateType: string;
  version: number;
  payload: T;
  metadata?: {
    userId?: string;
    tenantId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

// T is the payload type; it defaults to `any` so untyped handlers keep working
export type EventHandler<T = any> = (event: DomainEvent<T>) => Promise<void>;

/**
 * Event Bus - Redis Pub/Sub Implementation
 */
export class EventBus extends EventEmitter {
  private publisher: Redis;
  private subscriber: Redis;
  private handlers: Map<string, EventHandler[]> = new Map();

  constructor(redisUrl: string) {
    super();
    this.publisher = new Redis(redisUrl);
    this.subscriber = new Redis(redisUrl);

    this.setupSubscriber();
  }

  /**
   * Setup Redis subscriber
   */
  private setupSubscriber(): void {
    this.subscriber.on('message', async (channel, message) => {
      try {
        const event: DomainEvent = JSON.parse(message);
        await this.handleEvent(event);
      } catch (error) {
        // Covers both JSON parse errors and failures while dispatching
        console.error('[EventBus] Failed to process message:', error);
      }
    });
  }

  /**
   * Publish event to Redis
   */
  async publish(event: DomainEvent): Promise<void> {
    const channel = `events:${event.type}`;

    console.log(`[EventBus] Publishing event: ${event.type}`, {
      id: event.id,
      aggregateId: event.aggregateId,
    });

    // Publish to Redis channel
    await this.publisher.publish(channel, JSON.stringify(event));

    // Also store in event stream (for event sourcing)
    await this.storeEvent(event);
  }

  /**
   * Subscribe to event type
   */
  async subscribe(eventType: string, handler: EventHandler): Promise<void> {
    const channel = `events:${eventType}`;

    // Add handler to local registry
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType)!.push(handler);

    // Subscribe to Redis channel
    await this.subscriber.subscribe(channel);

    console.log(`[EventBus] Subscribed to: ${eventType}`);
  }

  /**
   * Unsubscribe from event type
   */
  async unsubscribe(eventType: string): Promise<void> {
    const channel = `events:${eventType}`;

    this.handlers.delete(eventType);
    await this.subscriber.unsubscribe(channel);

    console.log(`[EventBus] Unsubscribed from: ${eventType}`);
  }

  /**
   * Handle incoming event
   */
  private async handleEvent(event: DomainEvent): Promise<void> {
    const handlers = this.handlers.get(event.type) || [];

    console.log(`[EventBus] Handling event: ${event.type}`, {
      handlers: handlers.length,
    });

    // Execute all handlers in parallel
    await Promise.all(
      handlers.map(async (handler) => {
        try {
          await handler(event);
        } catch (error) {
          console.error(
            `[EventBus] Handler failed for ${event.type}:`,
            error
          );
          // Publish failed event for retry/dead letter queue
          await this.publishFailedEvent(event, error);
        }
      })
    );
  }

  /**
   * Store event for event sourcing
   */
  private async storeEvent(event: DomainEvent): Promise<void> {
    const streamKey = `event-stream:${event.aggregateType}:${event.aggregateId}`;

    await this.publisher.xadd(
      streamKey,
      '*',
      'event',
      JSON.stringify(event)
    );
  }

  /**
   * Publish failed event
   */
  private async publishFailedEvent(event: DomainEvent, error: any): Promise<void> {
    const failedEvent: DomainEvent = {
      id: `${event.id}-failed`,
      type: 'event.failed',
      timestamp: new Date().toISOString(),
      aggregateId: event.id,
      aggregateType: 'Event',
      version: 1,
      payload: {
        originalEvent: event,
        error: {
          message: error.message,
          stack: error.stack,
        },
      },
    };

    await this.publisher.publish(
      'events:event.failed',
      JSON.stringify(failedEvent)
    );
  }

  /**
   * Get event history for aggregate
   */
  async getEventHistory(
    aggregateType: string,
    aggregateId: string
  ): Promise<DomainEvent[]> {
    const streamKey = `event-stream:${aggregateType}:${aggregateId}`;

    const events = await this.publisher.xrange(streamKey, '-', '+');

    return events.map((entry) => {
      const [, fields] = entry;
      return JSON.parse(fields[1]);
    });
  }

  /**
   * Close connections
   */
  async close(): Promise<void> {
    await this.publisher.quit();
    await this.subscriber.quit();
  }
}
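
For unit tests it is often convenient to exercise handlers without a Redis instance. The sketch below is a hypothetical in-memory stand-in (`InMemoryEventBus` is not part of the original code) that mirrors two behaviours of the class above: handlers are grouped by event type, and a failing handler does not stop the others. Instead of publishing an `event.failed` message, it records failures in an array.

```typescript
interface SketchEvent {
  id: string;
  type: string;
  payload: Record<string, unknown>;
}

type SketchHandler = (event: SketchEvent) => Promise<void>;

class InMemoryEventBus {
  private readonly handlers = new Map<string, SketchHandler[]>();
  // Failures are collected here so a test can inspect them.
  readonly failures: Array<{ eventId: string; message: string }> = [];

  subscribe(eventType: string, handler: SketchHandler): void {
    const existing = this.handlers.get(eventType) ?? [];
    this.handlers.set(eventType, [...existing, handler]);
  }

  // Dispatch to every handler of this type; each handler is isolated
  // by its own try/catch, as in handleEvent() above.
  async publish(event: SketchEvent): Promise<void> {
    const handlers = this.handlers.get(event.type) ?? [];
    await Promise.all(
      handlers.map(async (handler) => {
        try {
          await handler(event);
        } catch (error) {
          this.failures.push({
            eventId: event.id,
            message: error instanceof Error ? error.message : String(error),
          });
        }
      })
    );
  }
}
```

Unlike Redis Pub/Sub, this stand-in delivers synchronously within one process, so it says nothing about network behaviour or message loss; it only checks handler logic.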

2. Domain Events Definition

Defining the business domain events:

// packages/kyo-core/src/events/domain-events.ts
import { DomainEvent } from './event-bus';

/**
 * User Events
 */
export interface UserCreatedEvent extends DomainEvent {
  type: 'user.created';
  payload: {
    userId: string;
    email: string;
    tenantId: string;
    role: string;
  };
}

export interface UserUpdatedEvent extends DomainEvent {
  type: 'user.updated';
  payload: {
    userId: string;
    changes: Record<string, any>;
  };
}

/**
 * OTP Events
 */
export interface OTPSentEvent extends DomainEvent {
  type: 'otp.sent';
  payload: {
    requestId: string;
    userId: string;
    tenantId: string;
    phoneNumber: string;
    expiresAt: string;
  };
}

export interface OTPVerifiedEvent extends DomainEvent {
  type: 'otp.verified';
  payload: {
    requestId: string;
    userId: string;
    tenantId: string;
    phoneNumber: string;
    verifiedAt: string;
  };
}

export interface OTPFailedEvent extends DomainEvent {
  type: 'otp.failed';
  payload: {
    requestId: string;
    userId: string;
    phoneNumber: string;
    reason: string;
    attemptsRemaining: number;
  };
}

/**
 * Rate Limit Events
 */
export interface RateLimitExceededEvent extends DomainEvent {
  type: 'rate-limit.exceeded';
  payload: {
    userId: string;
    tenantId: string;
    endpoint: string;
    limit: number;
    window: number;
    resetAt: string;
  };
}

/**
 * Notification Events
 */
export interface NotificationSentEvent extends DomainEvent {
  type: 'notification.sent';
  payload: {
    notificationId: string;
    userId: string;
    type: 'email' | 'sms' | 'websocket';
    status: 'success' | 'failed';
  };
}

/**
 * Event Creators
 */
export function createUserCreatedEvent(
  userId: string,
  email: string,
  tenantId: string,
  role: string,
  metadata?: any
): UserCreatedEvent {
  return {
    id: `user-created-${Date.now()}`,
    type: 'user.created',
    timestamp: new Date().toISOString(),
    aggregateId: userId,
    aggregateType: 'User',
    version: 1,
    payload: { userId, email, tenantId, role },
    metadata,
  };
}

export function createOTPSentEvent(
  requestId: string,
  userId: string,
  tenantId: string,
  phoneNumber: string,
  expiresAt: string,
  metadata?: any
): OTPSentEvent {
  return {
    id: `otp-sent-${requestId}`,
    type: 'otp.sent',
    timestamp: new Date().toISOString(),
    aggregateId: requestId,
    aggregateType: 'OTP',
    version: 1,
    payload: { requestId, userId, tenantId, phoneNumber, expiresAt },
    metadata,
  };
}

export function createOTPVerifiedEvent(
  requestId: string,
  userId: string,
  tenantId: string,
  phoneNumber: string,
  verifiedAt: string,
  metadata?: any
): OTPVerifiedEvent {
  return {
    id: `otp-verified-${requestId}`,
    type: 'otp.verified',
    timestamp: new Date().toISOString(),
    aggregateId: requestId,
    aggregateType: 'OTP',
    version: 1,
    payload: { requestId, userId, tenantId, phoneNumber, verifiedAt },
    metadata,
  };
}

export function createRateLimitExceededEvent(
  userId: string,
  tenantId: string,
  endpoint: string,
  limit: number,
  window: number,
  resetAt: string,
  metadata?: any
): RateLimitExceededEvent {
  return {
    id: `rate-limit-${Date.now()}`,
    type: 'rate-limit.exceeded',
    timestamp: new Date().toISOString(),
    aggregateId: userId,
    aggregateType: 'RateLimit',
    version: 1,
    payload: { userId, tenantId, endpoint, limit, window, resetAt },
    metadata,
  };
}
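
Since every event interface carries a literal `type`, a union of them behaves as a discriminated union, and TypeScript narrows `payload` inside a `switch`. The sketch below illustrates this with two simplified, hypothetical event shapes (`OtpSentSketch`, `OtpFailedSketch`) rather than the full interfaces above.

```typescript
interface BaseEventSketch {
  id: string;
  timestamp: string;
}

interface OtpSentSketch extends BaseEventSketch {
  type: 'otp.sent';
  payload: { requestId: string; phoneNumber: string };
}

interface OtpFailedSketch extends BaseEventSketch {
  type: 'otp.failed';
  payload: { requestId: string; reason: string; attemptsRemaining: number };
}

type OtpEventSketch = OtpSentSketch | OtpFailedSketch;

// The `type` literal tells the compiler which payload shape is in scope.
function describeOtpEvent(event: OtpEventSketch): string {
  switch (event.type) {
    case 'otp.sent':
      return `OTP ${event.payload.requestId} sent to ${event.payload.phoneNumber}`;
    case 'otp.failed':
      return (
        `OTP ${event.payload.requestId} failed: ${event.payload.reason} ` +
        `(${event.payload.attemptsRemaining} attempts left)`
      );
    default: {
      // Compile-time exhaustiveness check: adding a new event type
      // to the union makes this assignment fail to compile.
      const unreachable: never = event;
      return unreachable;
    }
  }
}
```

The same approach should carry over to a union of the real event interfaces, as long as each keeps a distinct `type` literal.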

3. Event Handlers Registration

Registering the event handlers:

// apps/kyo-otp-service/src/events/handlers.ts
import { EventBus } from '@kyong/kyo-core';
import { NotificationService } from '../services/notification.service';
import { AnalyticsService } from '../services/analytics.service';

export async function registerEventHandlers(
  eventBus: EventBus,
  notificationService: NotificationService,
  analyticsService: AnalyticsService
): Promise<void> {
  // Handle OTP sent events
  await eventBus.subscribe('otp.sent', async (event) => {
    console.log('[EventHandler] Handling otp.sent', event);

    const { userId, tenantId, phoneNumber, expiresAt } = event.payload;

    // Send notification to user
    await notificationService.notifyOTPSent(userId, tenantId, {
      requestId: event.aggregateId,
      phoneNumber,
      expiresAt,
    });

    // Track analytics
    await analyticsService.trackEvent('otp_sent', {
      userId,
      tenantId,
      timestamp: event.timestamp,
    });
  });

  // Handle OTP verified events
  await eventBus.subscribe('otp.verified', async (event) => {
    console.log('[EventHandler] Handling otp.verified', event);

    const { userId, tenantId, phoneNumber, verifiedAt } = event.payload;

    // Send success notification
    await notificationService.notifyOTPVerified(userId, tenantId, {
      requestId: event.aggregateId,
      phoneNumber,
      verifiedAt,
    });

    // Track analytics
    await analyticsService.trackEvent('otp_verified', {
      userId,
      tenantId,
      timestamp: event.timestamp,
    });

    // Update user verification status
    // await userService.markPhoneVerified(userId, phoneNumber);
  });

  // Handle rate limit exceeded events
  await eventBus.subscribe('rate-limit.exceeded', async (event) => {
    console.log('[EventHandler] Handling rate-limit.exceeded', event);

    const { userId, tenantId, endpoint, limit, resetAt } = event.payload;

    // Send warning notification
    await notificationService.notifyRateLimitExceeded(userId, tenantId, {
      endpoint,
      limit,
      resetAt,
    });

    // Track for security monitoring
    await analyticsService.trackEvent('rate_limit_exceeded', {
      userId,
      tenantId,
      endpoint,
      timestamp: event.timestamp,
    });
  });

  // Handle user created events
  await eventBus.subscribe('user.created', async (event) => {
    console.log('[EventHandler] Handling user.created', event);

    const { userId, email, tenantId } = event.payload;

    // Send welcome email
    await notificationService.sendEmail({
      to: email,
      subject: 'Welcome to Kyo System',
      template: 'welcome',
      data: { userId, email },
    });

    // Initialize user quota
    // await quotaService.initializeUserQuota(userId, tenantId);
  });

  console.log('[EventHandlers] All event handlers registered');
}
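
Handlers written as closures inside `registerEventHandlers` are hard to test in isolation. One option, sketched below, is to build each handler from a small factory that receives its collaborators, so a test can pass fakes. The interfaces `NotifierSketch` and `TrackerSketch` and the factory `makeOtpSentHandler` are hypothetical; they only model the two calls the `otp.sent` handler above makes.

```typescript
interface OtpSentEventSketch {
  aggregateId: string;
  timestamp: string;
  payload: { userId: string; tenantId: string; phoneNumber: string; expiresAt: string };
}

interface NotifierSketch {
  notifyOTPSent(
    userId: string,
    tenantId: string,
    details: { requestId: string; phoneNumber: string; expiresAt: string }
  ): Promise<void>;
}

interface TrackerSketch {
  trackEvent(name: string, properties: Record<string, unknown>): Promise<void>;
}

// Builds the otp.sent handler from its dependencies.
function makeOtpSentHandler(notifier: NotifierSketch, tracker: TrackerSketch) {
  return async (event: OtpSentEventSketch): Promise<void> => {
    const { userId, tenantId, phoneNumber, expiresAt } = event.payload;

    // Same two steps as the registered handler: notify, then track.
    await notifier.notifyOTPSent(userId, tenantId, {
      requestId: event.aggregateId,
      phoneNumber,
      expiresAt,
    });
    await tracker.trackEvent('otp_sent', { userId, tenantId, timestamp: event.timestamp });
  };
}
```

In production code the factory result would be passed to `eventBus.subscribe('otp.sent', ...)`; in a test, the fakes simply record what they were called with.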

Distributed Transaction Handling - Saga Pattern

1. Saga Orchestrator Implementation

Implementing a Saga orchestrator to handle distributed transactions:

// packages/kyo-core/src/saga/saga-orchestrator.ts
import { EventBus, DomainEvent } from '../events/event-bus';
import { Redis } from 'ioredis';

export interface SagaStep {
  name: string;
  execute: (data: any) => Promise<any>;
  compensate: (data: any) => Promise<void>;
}

export interface SagaDefinition {
  id: string;
  name: string;
  steps: SagaStep[];
}

export interface SagaExecution {
  sagaId: string;
  definitionId: string;
  status: 'pending' | 'running' | 'completed' | 'failed' | 'compensating' | 'compensated';
  currentStep: number;
  data: any;
  results: any[];
  error?: string;
  startedAt: string;
  completedAt?: string;
}

/**
 * Saga Orchestrator - Coordinate distributed transactions
 */
export class SagaOrchestrator {
  private redis: Redis;
  private eventBus: EventBus;
  private definitions: Map<string, SagaDefinition> = new Map();

  constructor(redisUrl: string, eventBus: EventBus) {
    this.redis = new Redis(redisUrl);
    this.eventBus = eventBus;
  }

  /**
   * Register saga definition
   */
  registerSaga(definition: SagaDefinition): void {
    this.definitions.set(definition.id, definition);
    console.log(`[SagaOrchestrator] Registered saga: ${definition.name}`);
  }

  /**
   * Start saga execution
   */
  async startSaga(definitionId: string, initialData: any): Promise<string> {
    const definition = this.definitions.get(definitionId);
    if (!definition) {
      throw new Error(`Saga definition not found: ${definitionId}`);
    }

    // slice(2, 11) yields the same 9 characters as the deprecated substr(2, 9)
    const sagaId = `saga-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;

    const execution: SagaExecution = {
      sagaId,
      definitionId,
      status: 'pending',
      currentStep: 0,
      data: initialData,
      results: [],
      startedAt: new Date().toISOString(),
    };

    // Store saga execution state
    await this.saveSagaExecution(execution);

    // Publish saga started event
    await this.eventBus.publish({
      id: `${sagaId}-started`,
      type: 'saga.started',
      timestamp: new Date().toISOString(),
      aggregateId: sagaId,
      aggregateType: 'Saga',
      version: 1,
      payload: {
        sagaId,
        definitionId: definition.id,
        name: definition.name,
      },
    });

    // Execute saga asynchronously
    this.executeSaga(sagaId).catch((error) => {
      console.error(`[SagaOrchestrator] Saga execution failed: ${sagaId}`, error);
    });

    return sagaId;
  }

  /**
   * Execute saga steps
   */
  private async executeSaga(sagaId: string): Promise<void> {
    let execution = await this.loadSagaExecution(sagaId);
    const definition = this.definitions.get(execution.definitionId)!;

    execution.status = 'running';
    await this.saveSagaExecution(execution);

    try {
      // Execute each step sequentially
      for (let i = execution.currentStep; i < definition.steps.length; i++) {
        const step = definition.steps[i];
        execution.currentStep = i;

        console.log(`[SagaOrchestrator] Executing step: ${step.name}`, {
          sagaId,
          step: i + 1,
          total: definition.steps.length,
        });

        // Execute step
        const result = await step.execute(execution.data);
        execution.results.push(result);

        // Update data for next step
        execution.data = { ...execution.data, [`step${i}Result`]: result };

        // Save progress
        await this.saveSagaExecution(execution);

        // Publish step completed event
        await this.eventBus.publish({
          id: `${sagaId}-step-${i}-completed`,
          type: 'saga.step.completed',
          timestamp: new Date().toISOString(),
          aggregateId: sagaId,
          aggregateType: 'Saga',
          version: i + 1,
          payload: {
            sagaId,
            stepName: step.name,
            stepIndex: i,
            result,
          },
        });
      }

      // Saga completed successfully
      execution.status = 'completed';
      execution.completedAt = new Date().toISOString();
      await this.saveSagaExecution(execution);

      // Publish saga completed event
      await this.eventBus.publish({
        id: `${sagaId}-completed`,
        type: 'saga.completed',
        timestamp: new Date().toISOString(),
        aggregateId: sagaId,
        aggregateType: 'Saga',
        version: definition.steps.length + 1,
        payload: {
          sagaId,
          results: execution.results,
        },
      });

      console.log(`[SagaOrchestrator] Saga completed: ${sagaId}`);
    } catch (error) {
      console.error(`[SagaOrchestrator] Saga step failed: ${sagaId}`, error);

      execution.status = 'failed';
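      // `error` is `unknown` under strict TypeScript, so narrow it before reading `.message`
      execution.error = error instanceof Error ? error.message : String(error);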
      await this.saveSagaExecution(execution);

      // Start compensation
      await this.compensateSaga(sagaId);
    }
  }

  /**
   * Compensate saga (rollback)
   */
  private async compensateSaga(sagaId: string): Promise<void> {
    let execution = await this.loadSagaExecution(sagaId);
    const definition = this.definitions.get(execution.definitionId)!;

    execution.status = 'compensating';
    await this.saveSagaExecution(execution);

    console.log(`[SagaOrchestrator] Starting compensation: ${sagaId}`);

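    // Compensate steps in reverse order, starting with the step that failed.
    // Each compensate() must therefore tolerate a step that never completed.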
    for (let i = execution.currentStep; i >= 0; i--) {
      const step = definition.steps[i];

      try {
        console.log(`[SagaOrchestrator] Compensating step: ${step.name}`, {
          sagaId,
          step: i + 1,
        });

        await step.compensate(execution.data);

        // Publish compensation event
        await this.eventBus.publish({
          id: `${sagaId}-step-${i}-compensated`,
          type: 'saga.step.compensated',
          timestamp: new Date().toISOString(),
          aggregateId: sagaId,
          aggregateType: 'Saga',
          version: i + 1,
          payload: {
            sagaId,
            stepName: step.name,
            stepIndex: i,
          },
        });
      } catch (error) {
        console.error(
          `[SagaOrchestrator] Compensation failed for step: ${step.name}`,
          error
        );
        // Continue with other compensations
      }
    }

    execution.status = 'compensated';
    execution.completedAt = new Date().toISOString();
    await this.saveSagaExecution(execution);

    // Publish saga compensated event
    await this.eventBus.publish({
      id: `${sagaId}-compensated`,
      type: 'saga.compensated',
      timestamp: new Date().toISOString(),
      aggregateId: sagaId,
      aggregateType: 'Saga',
      version: definition.steps.length + 2,
      payload: {
        sagaId,
        error: execution.error,
      },
    });

    console.log(`[SagaOrchestrator] Saga compensated: ${sagaId}`);
  }

  /**
   * Get saga status
   */
  async getSagaStatus(sagaId: string): Promise<SagaExecution> {
    return await this.loadSagaExecution(sagaId);
  }

  /**
   * Save saga execution state to Redis
   */
  private async saveSagaExecution(execution: SagaExecution): Promise<void> {
    const key = `saga:${execution.sagaId}`;
    await this.redis.set(key, JSON.stringify(execution), 'EX', 86400); // 24 hours TTL
  }

  /**
   * Load saga execution state from Redis
   */
  private async loadSagaExecution(sagaId: string): Promise<SagaExecution> {
    const key = `saga:${sagaId}`;
    const data = await this.redis.get(key);

    if (!data) {
      throw new Error(`Saga execution not found: ${sagaId}`);
    }

    return JSON.parse(data);
  }

  /**
   * Close connections
   */
  async close(): Promise<void> {
    await this.redis.quit();
  }
}
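
The core of the pattern, run steps forward and undo them in reverse on failure, can be shown without Redis or the event bus. The following is a minimal in-memory sketch with hypothetical names (`MiniStep`, `runMiniSaga`). It differs from the orchestrator above in one respect: it compensates only the steps that finished, whereas the orchestrator also calls `compensate` on the step that failed.

```typescript
interface MiniStep {
  name: string;
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

// Runs steps in order. On the first failure, undoes the completed
// steps in reverse order and reports 'compensated'.
async function runMiniSaga(
  steps: MiniStep[],
  log: string[]
): Promise<'completed' | 'compensated'> {
  const completed: MiniStep[] = [];

  try {
    for (const step of steps) {
      await step.execute();
      log.push(`execute:${step.name}`);
      completed.push(step);
    }
    return 'completed';
  } catch {
    for (const step of [...completed].reverse()) {
      await step.compensate();
      log.push(`compensate:${step.name}`);
    }
    return 'compensated';
  }
}
```

For three steps where the third throws, the log reads `execute` for the first two steps followed by `compensate` for the second and then the first. Persisting progress, as `saveSagaExecution` does above, is what lets a real orchestrator inspect state after a crash; this sketch keeps everything in memory.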

2. Example Saga: User Registration with OTP

Implementing the user registration flow as a Saga:

// apps/kyo-otp-service/src/sagas/user-registration.saga.ts
import {
  SagaOrchestrator,
  SagaDefinition,
  SagaStep,
  ServiceRegistry,
} from '@kyong/kyo-core';

export interface UserRegistrationData {
  email: string;
  password: string;
  phoneNumber: string;
  otp: string;
}

/**
 * User Registration Saga
 * Steps:
 * 1. Create user account
 * 2. Send OTP
 * 3. Verify OTP
 * 4. Activate account
 */
export function createUserRegistrationSaga(
  serviceRegistry: ServiceRegistry
): SagaDefinition {
  const authService = serviceRegistry.getService('AUTH');
  const otpService = serviceRegistry.getService('OTP');

  const steps: SagaStep[] = [
    // Step 1: Create user account (pending status)
    {
      name: 'create-user',
      execute: async (data: UserRegistrationData) => {
        console.log('[Saga] Creating user account');

        const result = await authService.post('/api/users', {
          email: data.email,
          password: data.password,
          status: 'pending',
        });

        return { userId: result.id };
      },
      compensate: async (data: any) => {
        console.log('[Saga] Compensating: Delete user account');

        if (data.step0Result?.userId) {
          await authService.delete(`/api/users/${data.step0Result.userId}`);
        }
      },
    },

    // Step 2: Send OTP
    {
      name: 'send-otp',
      execute: async (data: any) => {
        console.log('[Saga] Sending OTP');

        const { userId } = data.step0Result;

        const result = await otpService.post('/api/otp/send', {
          userId,
          phoneNumber: data.phoneNumber,
        });

        return { requestId: result.requestId };
      },
      compensate: async (data: any) => {
        console.log('[Saga] Compensating: Cancel OTP request');

        if (data.step1Result?.requestId) {
          await otpService.delete(`/api/otp/${data.step1Result.requestId}`);
        }
      },
    },

    // Step 3: Verify OTP
    {
      name: 'verify-otp',
      execute: async (data: any) => {
        console.log('[Saga] Verifying OTP');

        const { requestId } = data.step1Result;

        const result = await otpService.post('/api/otp/verify', {
          requestId,
          otp: data.otp,
        });

        if (!result.valid) {
          throw new Error('Invalid OTP');
        }

        return { verified: true };
      },
      compensate: async () => {
        // Verification has no side effects of its own, so there is nothing to undo
        console.log('[Saga] Compensating: verify-otp (no-op)');
      },
    },

    // Step 4: Activate account
    {
      name: 'activate-account',
      execute: async (data: any) => {
        console.log('[Saga] Activating user account');

        const { userId } = data.step0Result;

        await authService.put(`/api/users/${userId}`, {
          status: 'active',
          phoneNumber: data.phoneNumber,
          phoneVerified: true,
        });

        return { activated: true };
      },
      compensate: async (data: any) => {
        console.log('[Saga] Compensating: Deactivate account');

        if (data.step0Result?.userId) {
          await authService.put(`/api/users/${data.step0Result.userId}`, {
            status: 'pending',
            phoneVerified: false,
          });
        }
      },
    },
  ];

  return {
    id: 'user-registration-saga',
    name: 'User Registration with OTP Verification',
    steps,
  };
}

/**
 * Start user registration saga
 */
export async function startUserRegistration(
  orchestrator: SagaOrchestrator,
  data: UserRegistrationData
): Promise<string> {
  return await orchestrator.startSaga('user-registration-saga', data);
}
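
The steps above read earlier results through keys such as `data.step0Result.userId`. That works because the orchestrator merges each result into the saga data under `step${i}Result`. The sketch below reproduces just that merging rule with a hypothetical helper, `threadStepResults`, to make the resulting shape visible.

```typescript
// Folds step results into the saga data the same way executeSaga() does:
// the result of step i is stored under the key `step${i}Result`.
function threadStepResults(
  initial: Record<string, unknown>,
  results: unknown[]
): Record<string, unknown> {
  return results.reduce<Record<string, unknown>>(
    (data, result, index) => ({ ...data, [`step${index}Result`]: result }),
    initial
  );
}

const sagaData = threadStepResults(
  { email: 'user@example.com' },
  [{ userId: 'u1' }, { requestId: 'r1' }]
);

console.log(JSON.stringify(sagaData));
// → {"email":"user@example.com","step0Result":{"userId":"u1"},"step1Result":{"requestId":"r1"}}
```

Index-based keys are simple but tie every step to its position in the list: reordering steps silently changes which key holds which result. Keying results by step name would be one alternative.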

API Gateway Implementation

Building a unified API Gateway:

// apps/kyo-api-gateway/src/app.ts
import Fastify from 'fastify';
import cors from '@fastify/cors';
import helmet from '@fastify/helmet';
import rateLimit from '@fastify/rate-limit';
import httpProxy from '@fastify/http-proxy';
import { ServiceRegistry, SERVICES } from '@kyong/kyo-core';
import { jwtMiddleware } from './middleware/jwt';

const app = Fastify({
  logger: {
    level: process.env.LOG_LEVEL || 'info',
  },
});

// Security
await app.register(helmet);
await app.register(cors, {
  origin: process.env.CORS_ORIGIN || 'http://localhost:5173',
  credentials: true,
});

// Rate Limiting
await app.register(rateLimit, {
  max: 100,
  timeWindow: '1 minute',
});

// Service Registry
const serviceRegistry = new ServiceRegistry();
serviceRegistry.startHealthChecks();

// JWT authentication is attached per route through `preHandler` below,
// so /health and the login endpoints under /api/auth stay public.

// Health Check
app.get('/health', async () => {
  const servicesStatus = await serviceRegistry.getAllServicesStatus();
  const allHealthy = Object.values(servicesStatus).every((status) => status);

  return {
    status: allHealthy ? 'healthy' : 'degraded',
    services: servicesStatus,
    timestamp: new Date().toISOString(),
  };
});

// Route: Auth Service (public, so clients can log in without a token)
await app.register(httpProxy, {
  upstream: SERVICES.AUTH.baseUrl,
  prefix: '/api/auth',
  rewritePrefix: '/api/auth',
  http2: false,
});

// Route: OTP Service
await app.register(httpProxy, {
  upstream: SERVICES.OTP.baseUrl,
  prefix: '/api/otp',
  rewritePrefix: '/api/otp',
  http2: false,
  preHandler: jwtMiddleware, // Require authentication
});

// Route: Notification Service
await app.register(httpProxy, {
  upstream: SERVICES.NOTIFICATION.baseUrl,
  prefix: '/api/notifications',
  rewritePrefix: '/api/notifications',
  http2: false,
  preHandler: jwtMiddleware, // Require authentication
});

// Route: Analytics Service
await app.register(httpProxy, {
  upstream: SERVICES.ANALYTICS.baseUrl,
  prefix: '/api/analytics',
  rewritePrefix: '/api/analytics',
  http2: false,
  preHandler: jwtMiddleware, // Require authentication
});

// Error Handler
app.setErrorHandler((error, request, reply) => {
  // `err` and `req` are the keys that Fastify's default log serializers handle
  app.log.error({ err: error, req: request }, 'Request error');

  const statusCode = error.statusCode || 500;
  reply.status(statusCode).send({
    error: error.message || 'Internal Server Error',
    statusCode,
  });
});

// Graceful Shutdown
const shutdown = async () => {
  app.log.info('Shutting down API Gateway...');
  serviceRegistry.stopHealthChecks();
  await app.close();
  process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

export default app;
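
The `/health` route above always answers with HTTP 200, even when the body reports `degraded`. Load balancers and container probes usually look only at the status code, so one option is to map the aggregated state to a status code as well. The sketch below assumes `getAllServicesStatus()` resolves to a map of service name to boolean, which is what the `.every((status) => status)` check implies. `summarizeHealth` and `HealthSummary` are hypothetical names that are not part of `@kyong/kyo-core`.

```typescript
// Assumed shape of the registry result: service name -> is it healthy?
type ServicesStatus = Record<string, boolean>;

interface HealthSummary {
  httpStatus: 200 | 503;
  body: {
    status: 'healthy' | 'degraded';
    services: ServicesStatus;
    unhealthy: string[];
    timestamp: string;
  };
}

// Pure helper: turns the per-service map into a response body and status code.
function summarizeHealth(services: ServicesStatus, now: Date = new Date()): HealthSummary {
  const unhealthy = Object.keys(services).filter((name) => !services[name]);
  const healthy = unhealthy.length === 0;

  return {
    httpStatus: healthy ? 200 : 503,
    body: {
      status: healthy ? 'healthy' : 'degraded',
      services,
      unhealthy,
      timestamp: now.toISOString(),
    },
  };
}

// Possible use inside the gateway (not executed here):
// app.get('/health', async (_request, reply) => {
//   const { httpStatus, body } = summarizeHealth(await serviceRegistry.getAllServicesStatus());
//   return reply.status(httpStatus).send(body);
// });
```

Whether a degraded gateway should fail its probe is a design decision. Returning 503 takes the gateway out of rotation even when only one downstream service is down, so some teams keep 200 for liveness and expose the stricter check on a separate readiness endpoint.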

Complete Backend Testing Strategy

1. Integration Tests with Service Registry

// apps/kyo-otp-service/test/integration/service-integration.test.ts
import { test } from 'node:test';
import assert from 'node:assert';
import { ServiceRegistry } from '@kyong/kyo-core';

test('Service Integration Tests', async (t) => {
  const registry = new ServiceRegistry();

  await t.test('should get all services', () => {
    const authService = registry.getService('AUTH');
    const otpService = registry.getService('OTP');

    assert.ok(authService);
    assert.ok(otpService);
  });

  await t.test('should perform health checks', async () => {
    const statuses = await registry.getAllServicesStatus();

    assert.ok(typeof statuses === 'object');
    assert.ok('AUTH' in statuses);
    assert.ok('OTP' in statuses);
  });
});

2. Contract Testing with Pact

Install Pact:

pnpm add -D @pact-foundation/pact

Create a Consumer Contract Test:

// apps/kyo-otp-service/test/pact/otp-service.consumer.test.ts
import { test } from 'node:test';
import assert from 'node:assert';
import { PactV3, MatchersV3 } from '@pact-foundation/pact';
import { ServiceClient } from '@kyong/kyo-core';

const { like, iso8601DateTime } = MatchersV3;

const provider = new PactV3({
  consumer: 'kyo-dashboard',
  provider: 'kyo-otp-service',
  dir: './pacts',
});

test('OTP Service Contract Tests', async (t) => {
  await t.test('should send OTP successfully', async () => {
    await provider
      .given('user is authenticated')
      .uponReceiving('a request to send OTP')
      .withRequest({
        method: 'POST',
        path: '/api/otp/send',
        headers: {
          'Content-Type': 'application/json',
          Authorization: 'Bearer valid-token',
        },
        body: {
          phoneNumber: '0912345678',
        },
      })
      .willRespondWith({
        status: 200,
        headers: {
          'Content-Type': 'application/json',
        },
        body: {
          success: true,
          requestId: like('otp-123'),
          expiresAt: iso8601DateTime(),
        },
      })
      .executeTest(async (mockServer) => {
        const client = new ServiceClient({
          name: 'otp-service',
          version: '1.0.0',
          baseUrl: mockServer.url,
          healthCheck: '/health',
          dependencies: [],
          capabilities: [],
        });

        client.setAuthToken('valid-token');

        const response = await client.post('/api/otp/send', {
          phoneNumber: '0912345678',
        });

        assert.ok(response.success);
        assert.ok(response.requestId);
      });
  });
});

3. Load Testing with k6

Create the load test script:

// apps/kyo-otp-service/test/load/otp-load-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';

const errorRate = new Rate('errors');

export const options = {
  stages: [
    { duration: '30s', target: 10 },   // Ramp up to 10 users
    { duration: '1m', target: 50 },    // Ramp up to 50 users
    { duration: '2m', target: 100 },   // Ramp up to 100 users
    { duration: '1m', target: 50 },    // Ramp down to 50 users
    { duration: '30s', target: 0 },    // Ramp down to 0 users
  ],
  thresholds: {
    http_req_duration: ['p(95)<500', 'p(99)<1000'], // 95% < 500ms, 99% < 1s
    errors: ['rate<0.1'], // Error rate < 10%
  },
};

const BASE_URL = __ENV.BASE_URL || 'http://localhost:3000';
// Fallback token, used only when the login request fails
const JWT_TOKEN = __ENV.JWT_TOKEN || 'test-token';

const JSON_HEADERS = { 'Content-Type': 'application/json' };

export default function () {
  // Login
  const loginRes = http.post(`${BASE_URL}/api/auth/login`, JSON.stringify({
    email: 'test@example.com',
    password: 'password123',
  }), {
    headers: JSON_HEADERS,
  });

  const loginOk = check(loginRes, {
    'login successful': (r) => r.status === 200,
  });
  // Record successes as well as failures so the rate has a denominator
  errorRate.add(!loginOk);

  // Parse the body only on success; a non-JSON error body would throw
  const token = loginOk ? loginRes.json('token') : JWT_TOKEN;
  const authHeaders = { ...JSON_HEADERS, Authorization: `Bearer ${token}` };

  sleep(1);

  // Send OTP (429 counts as success: the rate limiter is doing its job)
  const otpRes = http.post(`${BASE_URL}/api/otp/send`, JSON.stringify({
    phoneNumber: '0912345678',
  }), {
    headers: authHeaders,
  });

  const otpOk = check(otpRes, {
    'otp sent successfully': (r) => r.status === 200 || r.status === 429,
    'response time OK': (r) => r.timings.duration < 1000,
  });
  errorRate.add(!otpOk);

  sleep(2);

  // Verify OTP (400 is expected here because '123456' is a dummy code)
  if (otpRes.status === 200) {
    const requestId = otpRes.json('requestId');

    const verifyRes = http.post(`${BASE_URL}/api/otp/verify`, JSON.stringify({
      requestId,
      otp: '123456',
    }), {
      headers: authHeaders,
    });

    const verifyOk = check(verifyRes, {
      'otp verified': (r) => r.status === 200 || r.status === 400,
    });
    errorRate.add(!verifyOk);
  }

  sleep(1);
}

Run the load test:

k6 run test/load/otp-load-test.js
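
To make the `p(95)<500` and `p(99)<1000` thresholds concrete, the sketch below computes percentiles over a list of request durations with the nearest-rank method and applies the same two limits. It is for illustration only: k6 calculates percentiles internally and may interpolate between samples, so its figures can differ slightly. `percentile` and `meetsThresholds` are hypothetical helper names.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of
// all samples are less than or equal to it.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  if (p <= 0 || p > 100) throw new RangeError('p must be in (0, 100]');

  const sorted = [...samples].sort((a, b) => a - b);
  // Multiply before dividing to avoid floating-point drift in the rank.
  const rank = Math.ceil((p * sorted.length) / 100); // 1-based
  return sorted[rank - 1];
}

// Mirrors the thresholds in the k6 script: p95 < 500 ms and p99 < 1000 ms.
function meetsThresholds(durationsMs: number[]): boolean {
  return percentile(durationsMs, 95) < 500 && percentile(durationsMs, 99) < 1000;
}

// 100 requests taking 5 ms, 10 ms, ..., 500 ms
const steady = Array.from({ length: 100 }, (_, i) => (i + 1) * 5);

// 90 fast requests and 10 slow ones: the slow tail alone breaks p95
const spiky = [
  ...Array.from({ length: 90 }, () => 100),
  ...Array.from({ length: 10 }, () => 800),
];

const steadyPasses = meetsThresholds(steady);
const spikyPasses = meetsThresholds(spiky);
```

With these inputs `steadyPasses` is `true` and `spikyPasses` is `false`, although the average duration of `spiky` is only 170 ms. That is why the thresholds are written against percentiles and not against the mean.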

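Production Deployment Checklist

## Backend Deployment Checklist

### Code Quality
- [ ] All TypeScript compilation errors fixed
- [ ] No ESLint warnings
- [ ] Code coverage > 80%
- [ ] No TODO or FIXME comments

### Testing
- [ ] All unit tests pass
- [ ] All integration tests pass
- [ ] All contract tests pass
- [ ] Load test meets target (p95 < 500ms)
- [ ] Chaos testing completed

### Security
- [ ] All dependencies updated
- [ ] No high-severity vulnerabilities (npm audit)
- [ ] JWT keys rotated
- [ ] API rate limiting enabled
- [ ] CORS configured correctly
- [ ] SQL injection protection in place
- [ ] XSS protection in place

### Database
- [ ] Migration scripts tested
- [ ] Indexes optimized
- [ ] Connection pool configured correctly
- [ ] Backup strategy tested
- [ ] Rollback scripts prepared

### Infrastructure
- [ ] Environment variables set
- [ ] Secrets stored encrypted
- [ ] Health check endpoint responding
- [ ] Service discovery configured correctly
- [ ] Load balancer configured

### Monitoring
- [ ] Prometheus metrics configured
- [ ] Log aggregation configured (Loki/ELK)
- [ ] Tracing enabled (Jaeger)
- [ ] Alert rules configured
- [ ] Dashboards created

### Performance
- [ ] API response time < 500ms (p95)
- [ ] Database queries optimized
- [ ] Redis caching strategy verified
- [ ] Connection pooling configured
- [ ] Graceful shutdown implemented

### Documentation
- [ ] API documentation updated
- [ ] Runbook written
- [ ] Architecture diagram updated
- [ ] Deployment documentation completed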
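
Some checklist items can be enforced at startup instead of being ticked by hand. As one example, the sketch below covers the "environment variables set" item by failing fast when a required variable is missing or blank. `LOG_LEVEL` and `CORS_ORIGIN` are the variables the gateway code reads. The other names in `REQUIRED_ENV`, as well as `findMissingEnv` and `assertEnv`, are hypothetical placeholders to adapt to the actual deployment.

```typescript
type Env = Record<string, string | undefined>;

// Returns the names that are unset or contain only whitespace.
function findMissingEnv(required: readonly string[], env: Env): string[] {
  return required.filter((name) => {
    const value = env[name];
    return value === undefined || value.trim() === '';
  });
}

// LOG_LEVEL and CORS_ORIGIN appear in the gateway code; the remaining
// names are hypothetical examples, not taken from the Kyo System codebase.
const REQUIRED_ENV = [
  'LOG_LEVEL',
  'CORS_ORIGIN',
  'JWT_SECRET',
  'REDIS_URL',
  'DATABASE_URL',
] as const;

// Throws a single error listing every missing variable at once.
function assertEnv(env: Env): void {
  const missing = findMissingEnv(REQUIRED_ENV, env);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
}
```

Calling `assertEnv(process.env)` before the server starts listening turns a misconfigured deployment into an immediate, readable failure. Note that the gateway falls back to defaults for `LOG_LEVEL` and `CORS_ORIGIN`, so requiring them is a stricter policy than the code itself applies.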

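Today's Summary

Today we completed the microservice integration and event-driven architecture for the Kyo System backend:

Completed Items

  1. Microservice architecture

    • Service Registry and service discovery
    • Service Client implementation
    • Health check mechanism
    • API Gateway design
  2. Event-driven architecture

    • Redis Pub/Sub Event Bus
    • Domain Events definitions
    • Event Handlers registration
    • Event Sourcing storage
  3. Distributed transactions

    • Saga Pattern implementation
    • Saga Orchestrator
    • Compensation mechanism
    • User registration Saga example
  4. Complete testing strategy

    • Integration Tests
    • Contract Tests (Pact)
    • Load Tests (k6)
    • Production deployment checklist

Technical Highlights

  • Loosely coupled architecture: services communicate asynchronously through the Event Bus
  • Reliability: the Saga Pattern keeps distributed transactions eventually consistent through compensating actions
  • Observability: a complete monitoring, logging, and tracing system
  • Production ready: testing and deployment checks across code, security, infrastructure, and documentation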
