iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Software Development

30 天打造工作室 SaaS 產品 (後端篇)系列 第 18

Day 18: 30天打造SaaS產品後端篇-後端效能優化實作

  • 分享至 

  • xImage
  •  

前情提要

在 Day 17 我們建立了 WebSocket 微服務架構,今天我們要聚焦在後端效能優化。從資料庫連線池管理、查詢優化、快取策略到背景任務處理,我們將實現企業級的後端效能調優方案,確保 SaaS 系統能夠應對高併發與大流量挑戰。

Fastify 伺服器效能調優

進階配置與最佳化

// src/config/FastifyConfig.ts
import Fastify, { FastifyInstance, FastifyServerOptions } from 'fastify';
import { IncomingMessage, Server, ServerResponse } from 'http';

interface OptimizedFastifyOptions extends FastifyServerOptions {
  performance?: {
    enableRequestTiming?: boolean;
    maxParamLength?: number;
    bodyLimit?: number;
    keepAliveTimeout?: number;
  };
}

export class FastifyOptimizer {
  static createOptimizedServer(options: OptimizedFastifyOptions = {}): FastifyInstance {
    const performanceConfig = {
      enableRequestTiming: true,
      maxParamLength: 100,
      bodyLimit: 1048576, // 1MB
      keepAliveTimeout: 72000, // 72 seconds
      ...options.performance
    };

    const server = Fastify<Server, IncomingMessage, ServerResponse>({
      // 基礎配置
      logger: {
        level: process.env.NODE_ENV === 'production' ? 'warn' : 'info',
        serializers: {
          req: (request) => ({
            method: request.method,
            url: request.url,
            hostname: request.hostname,
            remoteAddress: request.ip,
            remotePort: request.socket?.remotePort
          }),
          res: (response) => ({
            statusCode: response.statusCode
          })
        }
      },

      // 效能配置
      trustProxy: true,
      keepAliveTimeout: performanceConfig.keepAliveTimeout,
      maxParamLength: performanceConfig.maxParamLength,
      bodyLimit: performanceConfig.bodyLimit,

      // 安全配置
      ignoreTrailingSlash: true,
      caseSensitive: false,

      // 壓縮配置
      compression: {
        global: true,
        threshold: 1024,
        encodings: ['gzip', 'deflate'],
        customTypes: /^text\/|application\/json|application\/javascript/
      },

      ...options
    });

    // 效能監控 Hook
    if (performanceConfig.enableRequestTiming) {
      this.addPerformanceHooks(server);
    }

    // 連線池優化
    this.optimizeConnectionPool(server);

    // 記憶體管理
    this.setupMemoryManagement(server);

    return server;
  }

  private static addPerformanceHooks(server: FastifyInstance): void {
    // 請求開始時間記錄
    server.addHook('onRequest', async (request, reply) => {
      request.startTime = process.hrtime.bigint();
    });

    // 響應完成時間計算
    server.addHook('onSend', async (request, reply, payload) => {
      if (request.startTime) {
        const duration = Number(process.hrtime.bigint() - request.startTime) / 1000000;
        reply.header('X-Response-Time', `${duration.toFixed(2)}ms`);

        // 慢查詢警告
        if (duration > 1000) {
          server.log.warn({
            msg: 'Slow request detected',
            method: request.method,
            url: request.url,
            duration: `${duration.toFixed(2)}ms`
          });
        }
      }
      return payload;
    });

    // 錯誤處理
    server.addHook('onError', async (request, reply, error) => {
      server.log.error({
        msg: 'Request error',
        error: error.message,
        stack: error.stack,
        method: request.method,
        url: request.url
      });
    });
  }

  private static optimizeConnectionPool(server: FastifyInstance): void {
    // HTTP Keep-Alive 優化
    server.server.on('connection', (socket) => {
      socket.setKeepAlive(true, 30000);
      socket.setNoDelay(true);
      socket.setTimeout(75000);
    });

    // 連線數量監控
    let activeConnections = 0;
    server.server.on('connection', () => {
      activeConnections++;
    });

    server.server.on('close', () => {
      activeConnections--;
    });

    // 定期記錄連線狀態
    setInterval(() => {
      server.log.info({
        msg: 'Connection pool status',
        activeConnections,
        memoryUsage: process.memoryUsage()
      });
    }, 60000);
  }

  private static setupMemoryManagement(server: FastifyInstance): void {
    // 記憶體洩漏檢測
    let lastMemoryUsage = process.memoryUsage();

    setInterval(() => {
      const currentMemory = process.memoryUsage();
      const heapGrowth = currentMemory.heapUsed - lastMemoryUsage.heapUsed;

      if (heapGrowth > 50 * 1024 * 1024) { // 50MB 增長警告
        server.log.warn({
          msg: 'Potential memory leak detected',
          heapGrowth: `${(heapGrowth / 1024 / 1024).toFixed(2)}MB`,
          currentHeap: `${(currentMemory.heapUsed / 1024 / 1024).toFixed(2)}MB`
        });
      }

      lastMemoryUsage = currentMemory;

      // 強制垃圾收集(僅在開發環境)
      if (process.env.NODE_ENV === 'development' && global.gc) {
        global.gc();
      }
    }, 30000);
  }
}

中間件軟體效能優化

// src/middleware/PerformanceMiddleware.ts
import { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import fp from 'fastify-plugin';

interface PerformanceMetrics {
  requestCount: number;
  totalResponseTime: number;
  averageResponseTime: number;
  errorCount: number;
  slowQueryCount: number;
}

export class PerformanceMiddleware {
  private static metrics: PerformanceMetrics = {
    requestCount: 0,
    totalResponseTime: 0,
    averageResponseTime: 0,
    errorCount: 0,
    slowQueryCount: 0
  };

  static plugin = fp(async (server: FastifyInstance) => {
    // 請求限制中間件軟體
    await server.register(require('@fastify/rate-limit'), {
      max: 100,           // 每個時間窗口的最大請求數
      timeWindow: 60000,  // 時間窗口(毫秒)
      hook: 'preHandler',
      keyGenerator: (request: FastifyRequest) => {
        return request.ip + request.url;
      },
      errorResponseBuilder: (request: FastifyRequest, context: any) => {
        return {
          code: 429,
          error: 'Too Many Requests',
          message: `Rate limit exceeded. Max ${context.max} requests per ${context.timeWindow}ms.`,
          retryAfter: Math.round(context.ttl / 1000)
        };
      }
    });

    // 響應壓縮
    await server.register(require('@fastify/compress'), {
      global: true,
      threshold: 1024,
      encodings: ['gzip', 'deflate', 'br'],
      customTypes: /^text\/|application\/json|application\/javascript|image\/svg\+xml/,
      zlibOptions: {
        level: 6,        // 壓縮級別(1-9)
        chunkSize: 16384 // 16KB chunks
      },
      brotliOptions: {
        quality: 8,      // Brotli 品質級別(0-11)
        lgwin: 22,       // 窗口大小
        lgblock: 0       // 自動塊大小
      }
    });

    // 快取中間件軟體
    await server.register(require('@fastify/caching'), {
      privacy: 'private',
      expiresIn: 300,     // 5 分鐘快取
      cacheSegment: 'api-cache'
    });

    // 效能指標收集
    server.addHook('onRequest', async (request: FastifyRequest, reply: FastifyReply) => {
      this.metrics.requestCount++;
      request.startTime = Date.now();
    });

    server.addHook('onSend', async (request: FastifyRequest, reply: FastifyReply, payload) => {
      const responseTime = Date.now() - (request.startTime || Date.now());
      this.metrics.totalResponseTime += responseTime;
      this.metrics.averageResponseTime = this.metrics.totalResponseTime / this.metrics.requestCount;

      if (responseTime > 1000) {
        this.metrics.slowQueryCount++;
      }

      return payload;
    });

    server.addHook('onError', async (request: FastifyRequest, reply: FastifyReply, error) => {
      this.metrics.errorCount++;
    });

    // 效能指標端點
    server.get('/metrics/performance', async () => {
      return {
        ...this.metrics,
        uptime: process.uptime(),
        memoryUsage: process.memoryUsage(),
        cpuUsage: process.cpuUsage()
      };
    });
  });

  static getMetrics(): PerformanceMetrics {
    return { ...this.metrics };
  }

  static resetMetrics(): void {
    this.metrics = {
      requestCount: 0,
      totalResponseTime: 0,
      averageResponseTime: 0,
      errorCount: 0,
      slowQueryCount: 0
    };
  }
}

資料庫連線池與查詢優化

PostgreSQL 連線池深度配置

// src/database/OptimizedDatabase.ts
import { Pool, PoolConfig, PoolClient } from 'pg';
import { EventEmitter } from 'events';

interface DatabaseMetrics {
  totalConnections: number;
  activeConnections: number;
  idleConnections: number;
  queriesExecuted: number;
  averageQueryTime: number;
  slowQueries: number;
}

export class OptimizedDatabase extends EventEmitter {
  private pool: Pool;
  private metrics: DatabaseMetrics;
  private queryTimes: number[] = [];

  constructor() {
    super();
    this.metrics = {
      totalConnections: 0,
      activeConnections: 0,
      idleConnections: 0,
      queriesExecuted: 0,
      averageQueryTime: 0,
      slowQueries: 0
    };

    this.pool = this.createOptimizedPool();
    this.setupMonitoring();
  }

  private createOptimizedPool(): Pool {
    const config: PoolConfig = {
      // 基礎連線配置
      host: process.env.DB_HOST || 'localhost',
      port: parseInt(process.env.DB_PORT || '5432'),
      database: process.env.DB_NAME || 'kyo_saas',
      user: process.env.DB_USER || 'postgres',
      password: process.env.DB_PASSWORD,

      // 連線池優化配置
      min: 5,                    // 最小連線數
      max: 20,                   // 最大連線數
      idleTimeoutMillis: 30000,  // 閒置連線超時時間
      connectionTimeoutMillis: 10000, // 連線超時時間
      acquireTimeoutMillis: 60000,    // 取得連線超時時間

      // 效能調優
      keepAlive: true,
      keepAliveInitialDelayMillis: 10000,
      allowExitOnIdle: false,

      // SSL 配置(生產環境)
      ssl: process.env.NODE_ENV === 'production' ? {
        rejectUnauthorized: false
      } : false,

      // 查詢配置
      query_timeout: 30000,      // 查詢超時時間
      statement_timeout: 30000   // 語句超時時間
    };

    const pool = new Pool(config);

    // 連線池事件監聽
    pool.on('connect', (client: PoolClient) => {
      this.metrics.totalConnections++;
      this.emit('connect', { totalConnections: this.metrics.totalConnections });
    });

    pool.on('acquire', (client: PoolClient) => {
      this.metrics.activeConnections++;
      this.metrics.idleConnections = Math.max(0, this.metrics.idleConnections - 1);
    });

    pool.on('release', (client: PoolClient) => {
      this.metrics.activeConnections = Math.max(0, this.metrics.activeConnections - 1);
      this.metrics.idleConnections++;
    });

    pool.on('error', (err: Error, client: PoolClient) => {
      console.error('Database pool error:', err);
      this.emit('error', err);
    });

    return pool;
  }

  // 優化的查詢執行器
  async executeQuery<T = any>(
    text: string,
    params?: any[],
    options: { timeout?: number; cache?: boolean } = {}
  ): Promise<T[]> {
    const startTime = Date.now();
    const queryTimeout = options.timeout || 30000;

    try {
      // 查詢快取檢查
      if (options.cache) {
        const cacheKey = this.generateCacheKey(text, params);
        const cached = await this.getCachedQuery(cacheKey);
        if (cached) {
          return cached;
        }
      }

      // 執行查詢
      const client = await this.pool.connect();

      try {
        // 設定查詢超時
        const timeoutPromise = new Promise((_, reject) => {
          setTimeout(() => reject(new Error('Query timeout')), queryTimeout);
        });

        const queryPromise = client.query(text, params);
        const result = await Promise.race([queryPromise, timeoutPromise]) as any;

        // 記錄效能指標
        const queryTime = Date.now() - startTime;
        this.recordQueryPerformance(queryTime, text);

        // 快取結果
        if (options.cache) {
          const cacheKey = this.generateCacheKey(text, params);
          await this.setCachedQuery(cacheKey, result.rows);
        }

        return result.rows;
      } finally {
        client.release();
      }
    } catch (error) {
      const queryTime = Date.now() - startTime;
      this.recordQueryPerformance(queryTime, text, true);
      throw error;
    }
  }

  // 批次查詢優化
  async executeBatch<T = any>(
    queries: Array<{ text: string; params?: any[] }>,
    options: { transaction?: boolean } = {}
  ): Promise<T[][]> {
    const client = await this.pool.connect();

    try {
      if (options.transaction) {
        await client.query('BEGIN');
      }

      const results: T[][] = [];

      for (const query of queries) {
        const startTime = Date.now();
        try {
          const result = await client.query(query.text, query.params);
          results.push(result.rows);

          const queryTime = Date.now() - startTime;
          this.recordQueryPerformance(queryTime, query.text);
        } catch (error) {
          if (options.transaction) {
            await client.query('ROLLBACK');
          }
          throw error;
        }
      }

      if (options.transaction) {
        await client.query('COMMIT');
      }

      return results;
    } finally {
      client.release();
    }
  }

  // 預處理語句優化
  async executePrepared<T = any>(
    name: string,
    text: string,
    params: any[]
  ): Promise<T[]> {
    const client = await this.pool.connect();

    try {
      // 準備語句(如果尚未準備)
      await client.query(`PREPARE ${name} AS ${text}`);

      // 執行預處理語句
      const startTime = Date.now();
      const result = await client.query(`EXECUTE ${name}`, params);

      const queryTime = Date.now() - startTime;
      this.recordQueryPerformance(queryTime, `PREPARED: ${name}`);

      return result.rows;
    } finally {
      client.release();
    }
  }

  private recordQueryPerformance(queryTime: number, query: string, isError = false): void {
    this.metrics.queriesExecuted++;

    if (!isError) {
      this.queryTimes.push(queryTime);

      // 保持最近 1000 次查詢的時間記錄
      if (this.queryTimes.length > 1000) {
        this.queryTimes.shift();
      }

      this.metrics.averageQueryTime =
        this.queryTimes.reduce((sum, time) => sum + time, 0) / this.queryTimes.length;
    }

    // 慢查詢檢測
    if (queryTime > 1000) {
      this.metrics.slowQueries++;
      console.warn(`Slow query detected (${queryTime}ms):`, query.substring(0, 100));
    }
  }

  private generateCacheKey(text: string, params?: any[]): string {
    const hash = require('crypto').createHash('md5');
    hash.update(text);
    if (params) {
      hash.update(JSON.stringify(params));
    }
    return hash.digest('hex');
  }

  private async getCachedQuery(key: string): Promise<any[] | null> {
    // 實作快取邏輯(Redis 或記憶體快取)
    return null;
  }

  private async setCachedQuery(key: string, data: any[]): Promise<void> {
    // 實作快取設定邏輯
  }

  private setupMonitoring(): void {
    // 定期報告連線池狀態
    setInterval(() => {
      console.log('Database Pool Metrics:', {
        ...this.metrics,
        poolInfo: {
          totalCount: this.pool.totalCount,
          idleCount: this.pool.idleCount,
          waitingCount: this.pool.waitingCount
        }
      });
    }, 60000);
  }

  async getMetrics(): Promise<DatabaseMetrics & { poolInfo: any }> {
    return {
      ...this.metrics,
      poolInfo: {
        totalCount: this.pool.totalCount,
        idleCount: this.pool.idleCount,
        waitingCount: this.pool.waitingCount
      }
    };
  }

  async gracefulShutdown(): Promise<void> {
    console.log('Closing database connections...');
    await this.pool.end();
    console.log('Database connections closed.');
  }
}

查詢優化與索引策略

// src/database/QueryOptimizer.ts
export class QueryOptimizer {
  private db: OptimizedDatabase;

  constructor(database: OptimizedDatabase) {
    this.db = database;
  }

  // 分頁查詢優化
  async optimizedPagination<T>(
    baseQuery: string,
    countQuery: string,
    params: any[],
    page: number = 1,
    limit: number = 20
  ): Promise<{ data: T[]; total: number; page: number; totalPages: number }> {
    const offset = (page - 1) * limit;

    // 並行執行查詢和計數
    const [data, countResult] = await Promise.all([
      this.db.executeQuery<T>(
        `${baseQuery} LIMIT $${params.length + 1} OFFSET $${params.length + 2}`,
        [...params, limit, offset],
        { cache: true }
      ),
      this.db.executeQuery<{ count: string }>(countQuery, params, { cache: true })
    ]);

    const total = parseInt(countResult[0]?.count || '0');
    const totalPages = Math.ceil(total / limit);

    return {
      data,
      total,
      page,
      totalPages
    };
  }

  // 複雜查詢優化
  async optimizedJoinQuery<T>(
    tables: string[],
    conditions: string[],
    params: any[],
    options: {
      select?: string[];
      orderBy?: string;
      groupBy?: string;
      having?: string;
      limit?: number;
    } = {}
  ): Promise<T[]> {
    const selectClause = options.select?.join(', ') || '*';
    const fromClause = tables[0];
    const joinClauses = tables.slice(1).map((table, index) => {
      // 假設標準命名約定用於外鍵
      return `LEFT JOIN ${table} ON ${tables[0]}.${table.slice(0, -1)}_id = ${table}.id`;
    }).join(' ');

    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
    const groupByClause = options.groupBy ? `GROUP BY ${options.groupBy}` : '';
    const havingClause = options.having ? `HAVING ${options.having}` : '';
    const orderByClause = options.orderBy ? `ORDER BY ${options.orderBy}` : '';
    const limitClause = options.limit ? `LIMIT ${options.limit}` : '';

    const query = `
      SELECT ${selectClause}
      FROM ${fromClause}
      ${joinClauses}
      ${whereClause}
      ${groupByClause}
      ${havingClause}
      ${orderByClause}
      ${limitClause}
    `.trim();

    return this.db.executeQuery<T>(query, params, { cache: true });
  }

  // 索引建議分析
  async analyzeQueryPerformance(query: string, params?: any[]): Promise<{
    executionTime: number;
    planningTime: number;
    indexSuggestions: string[];
    cost: number;
  }> {
    const explainQuery = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${query}`;

    const result = await this.db.executeQuery(explainQuery, params);
    const plan = result[0]['QUERY PLAN'][0];

    const executionTime = plan['Execution Time'];
    const planningTime = plan['Planning Time'];
    const cost = plan['Plan']['Total Cost'];

    // 分析並產生索引建議
    const indexSuggestions = this.generateIndexSuggestions(plan['Plan']);

    return {
      executionTime,
      planningTime,
      indexSuggestions,
      cost
    };
  }

  private generateIndexSuggestions(plan: any): string[] {
    const suggestions: string[] = [];

    // 遞迴分析查詢計劃
    const analyzePlan = (node: any) => {
      if (node['Node Type'] === 'Seq Scan') {
        const relation = node['Relation Name'];
        const filter = node['Filter'];

        if (filter) {
          suggestions.push(`CREATE INDEX idx_${relation}_filter ON ${relation} (extracted_columns_from_filter);`);
        }
      }

      if (node['Node Type'] === 'Sort' && node['Sort Key']) {
        const sortKeys = node['Sort Key'];
        suggestions.push(`CREATE INDEX idx_sort ON table_name (${sortKeys.join(', ')});`);
      }

      if (node['Plans']) {
        node['Plans'].forEach((childPlan: any) => analyzePlan(childPlan));
      }
    };

    analyzePlan(plan);
    return [...new Set(suggestions)]; // 去重
  }

  // 自動索引創建
  async createOptimalIndexes(tableName: string): Promise<void> {
    const commonIndexes = [
      // 外鍵索引
      `CREATE INDEX IF NOT EXISTS idx_${tableName}_created_at ON ${tableName} (created_at);`,
      `CREATE INDEX IF NOT EXISTS idx_${tableName}_updated_at ON ${tableName} (updated_at);`,

      // 複合索引(根據常見查詢模式)
      `CREATE INDEX IF NOT EXISTS idx_${tableName}_status_created ON ${tableName} (status, created_at);`,

      // 部分索引(針對活躍記錄)
      `CREATE INDEX IF NOT EXISTS idx_${tableName}_active ON ${tableName} (id) WHERE deleted_at IS NULL;`
    ];

    for (const indexQuery of commonIndexes) {
      try {
        await this.db.executeQuery(indexQuery);
        console.log(`Created index for ${tableName}`);
      } catch (error) {
        console.warn(`Failed to create index: ${error}`);
      }
    }
  }
}

Redis 快取策略最佳化

多層快取架構

// src/cache/OptimizedCache.ts
import Redis from 'ioredis';
import { LRUCache } from 'lru-cache';

interface CacheStrategy {
  l1: boolean;    // 記憶體快取
  l2: boolean;    // Redis 快取
  ttl: number;    // 存活時間
  maxSize?: number; // 最大快取大小
}

export class OptimizedCache {
  private redis: Redis;
  private memoryCache: LRUCache<string, any>;
  private strategies: Map<string, CacheStrategy>;

  constructor() {
    this.redis = this.createRedisClient();
    this.memoryCache = this.createMemoryCache();
    this.strategies = new Map();
    this.setupDefaultStrategies();
  }

  private createRedisClient(): Redis {
    const redis = new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379'),
      password: process.env.REDIS_PASSWORD,
      db: parseInt(process.env.REDIS_DB || '0'),

      // 連線池配置
      maxRetriesPerRequest: 3,
      retryDelayOnFailover: 100,
      connectTimeout: 10000,
      commandTimeout: 5000,
      lazyConnect: true,

      // 效能優化
      keepAlive: 30000,
      enableOfflineQueue: false,
      maxRetriesPerRequest: 3,

      // 叢集配置(如果使用 Redis Cluster)
      enableReadyCheck: true,
      showFriendlyErrorStack: true,

      // Lua 腳本快取
      enableAutoPipelining: true,
      maxmemoryPolicy: 'allkeys-lru'
    });

    // Redis 事件監聽
    redis.on('connect', () => {
      console.log('Redis connected');
    });

    redis.on('ready', () => {
      console.log('Redis ready');
    });

    redis.on('error', (err) => {
      console.error('Redis error:', err);
    });

    redis.on('reconnecting', (time) => {
      console.log(`Redis reconnecting in ${time}ms`);
    });

    return redis;
  }

  private createMemoryCache(): LRUCache<string, any> {
    return new LRUCache({
      max: 1000,                    // 最大項目數
      maxSize: 50 * 1024 * 1024,   // 最大 50MB
      sizeCalculation: (value) => {
        return JSON.stringify(value).length;
      },
      ttl: 5 * 60 * 1000,         // 5 分鐘 TTL
      allowStale: false,
      updateAgeOnGet: true,
      updateAgeOnHas: true
    });
  }

  private setupDefaultStrategies(): void {
    // 用戶資料 - 雙層快取
    this.strategies.set('user', {
      l1: true,
      l2: true,
      ttl: 15 * 60, // 15 分鐘
      maxSize: 10 * 1024 // 10KB
    });

    // 會話資料 - 僅 Redis
    this.strategies.set('session', {
      l1: false,
      l2: true,
      ttl: 30 * 60, // 30 分鐘
    });

    // 設定資料 - 長時間快取
    this.strategies.set('config', {
      l1: true,
      l2: true,
      ttl: 60 * 60, // 1 小時
    });

    // API 響應 - 短時間快取
    this.strategies.set('api', {
      l1: true,
      l2: false,
      ttl: 2 * 60, // 2 分鐘
    });
  }

  // 智能快取取得
  async get<T>(key: string, type: string = 'default'): Promise<T | null> {
    const strategy = this.strategies.get(type) || this.strategies.get('default')!;

    // L1 快取檢查
    if (strategy.l1) {
      const l1Result = this.memoryCache.get(key);
      if (l1Result !== undefined) {
        return l1Result;
      }
    }

    // L2 快取檢查
    if (strategy.l2) {
      const l2Result = await this.redis.get(key);
      if (l2Result !== null) {
        const parsed = JSON.parse(l2Result);

        // 回填 L1 快取
        if (strategy.l1) {
          this.memoryCache.set(key, parsed, { ttl: strategy.ttl * 1000 });
        }

        return parsed;
      }
    }

    return null;
  }

  // 智能快取設定
  async set<T>(key: string, value: T, type: string = 'default', customTtl?: number): Promise<void> {
    const strategy = this.strategies.get(type) || this.strategies.get('default')!;
    const ttl = customTtl || strategy.ttl;

    // L1 快取設定
    if (strategy.l1) {
      this.memoryCache.set(key, value, { ttl: ttl * 1000 });
    }

    // L2 快取設定
    if (strategy.l2) {
      await this.redis.setex(key, ttl, JSON.stringify(value));
    }
  }

  // 批次操作優化
  async mget<T>(keys: string[], type: string = 'default'): Promise<Map<string, T>> {
    const strategy = this.strategies.get(type) || this.strategies.get('default')!;
    const result = new Map<string, T>();
    const missingKeys: string[] = [];

    // L1 批次檢查
    if (strategy.l1) {
      keys.forEach(key => {
        const value = this.memoryCache.get(key);
        if (value !== undefined) {
          result.set(key, value);
        } else {
          missingKeys.push(key);
        }
      });
    } else {
      missingKeys.push(...keys);
    }

    // L2 批次檢查
    if (strategy.l2 && missingKeys.length > 0) {
      const pipeline = this.redis.pipeline();
      missingKeys.forEach(key => pipeline.get(key));

      const l2Results = await pipeline.exec();

      if (l2Results) {
        l2Results.forEach(([err, value], index) => {
          if (!err && value !== null) {
            const key = missingKeys[index];
            const parsed = JSON.parse(value as string);
            result.set(key, parsed);

            // 回填 L1 快取
            if (strategy.l1) {
              this.memoryCache.set(key, parsed, { ttl: strategy.ttl * 1000 });
            }
          }
        });
      }
    }

    return result;
  }

  // 智能快取失效
  async invalidate(pattern: string): Promise<number> {
    // L1 快取失效
    let invalidatedCount = 0;

    for (const key of this.memoryCache.keys()) {
      if (key.includes(pattern)) {
        this.memoryCache.delete(key);
        invalidatedCount++;
      }
    }

    // L2 快取失效
    const keys = await this.redis.keys(`*${pattern}*`);
    if (keys.length > 0) {
      const pipeline = this.redis.pipeline();
      keys.forEach(key => pipeline.del(key));
      await pipeline.exec();
      invalidatedCount += keys.length;
    }

    return invalidatedCount;
  }

  // 快取穿透防護
  async getWithFallback<T>(
    key: string,
    fallbackFn: () => Promise<T>,
    type: string = 'default',
    preventCacheBreakdown: boolean = true
  ): Promise<T> {
    // 嘗試從快取獲取
    const cached = await this.get<T>(key, type);
    if (cached !== null) {
      return cached;
    }

    // 防止快取擊穿 - 使用分散式鎖
    if (preventCacheBreakdown) {
      const lockKey = `lock:${key}`;
      const lockValue = Date.now().toString();

      const acquired = await this.redis.set(lockKey, lockValue, 'PX', 10000, 'NX');

      if (acquired === 'OK') {
        try {
          // 再次檢查快取(雙重檢查)
          const doubleCheck = await this.get<T>(key, type);
          if (doubleCheck !== null) {
            return doubleCheck;
          }

          // 執行降級函數
          const result = await fallbackFn();

          // 設定快取
          await this.set(key, result, type);

          return result;
        } finally {
          // 釋放鎖
          const script = `
            if redis.call("get", KEYS[1]) == ARGV[1] then
              return redis.call("del", KEYS[1])
            else
              return 0
            end
          `;
          await this.redis.eval(script, 1, lockKey, lockValue);
        }
      } else {
        // 等待鎖釋放後重試
        await new Promise(resolve => setTimeout(resolve, 100));
        return this.getWithFallback(key, fallbackFn, type, false);
      }
    } else {
      const result = await fallbackFn();
      await this.set(key, result, type);
      return result;
    }
  }

  // 快取效能監控
  async getStats(): Promise<{
    memoryCache: {
      size: number;
      itemCount: number;
      hitRate: number;
    };
    redis: {
      usedMemory: string;
      connectedClients: number;
      commandsProcessed: number;
    };
  }> {
    const memoryCacheStats = {
      size: this.memoryCache.calculatedSize,
      itemCount: this.memoryCache.size,
      hitRate: 0 // 需要自行實作統計
    };

    const redisInfo = await this.redis.info('memory');
    const redisStats = await this.redis.info('stats');

    return {
      memoryCache: memoryCacheStats,
      redis: {
        usedMemory: this.parseRedisInfo(redisInfo, 'used_memory_human'),
        connectedClients: parseInt(this.parseRedisInfo(redisStats, 'connected_clients')),
        commandsProcessed: parseInt(this.parseRedisInfo(redisStats, 'total_commands_processed'))
      }
    };
  }

  private parseRedisInfo(info: string, key: string): string {
    const lines = info.split('\r\n');
    const line = lines.find(l => l.startsWith(`${key}:`));
    return line ? line.split(':')[1] : '0';
  }

  async gracefulShutdown(): Promise<void> {
    console.log('Closing cache connections...');
    this.memoryCache.clear();
    await this.redis.quit();
    console.log('Cache connections closed.');
  }
}

背景任務與佇列處理

高效能任務佇列系統

// src/queue/OptimizedQueue.ts
import Bull, { Queue, Job, JobOptions } from 'bull';
import { EventEmitter } from 'events';

interface TaskMetrics {
  totalJobs: number;
  completedJobs: number;
  failedJobs: number;
  activeJobs: number;
  waitingJobs: number;
  averageProcessingTime: number;
}

export class OptimizedQueue extends EventEmitter {
  private queues: Map<string, Queue> = new Map();
  private metrics: Map<string, TaskMetrics> = new Map();
  private processingTimes: Map<string, number[]> = new Map();

  constructor() {
    super();
    this.setupDefaultQueues();
  }

  private setupDefaultQueues(): void {
    // 高優先級佇列 - 即時處理
    this.createQueue('high-priority', {
      concurrency: 10,
      priority: 'high',
      delay: 0,
      removeOnComplete: 100,
      removeOnFail: 50
    });

    // 一般佇列 - 正常處理
    this.createQueue('normal', {
      concurrency: 5,
      priority: 'normal',
      delay: 0,
      removeOnComplete: 50,
      removeOnFail: 25
    });

    // 低優先級佇列 - 批次處理
    this.createQueue('low-priority', {
      concurrency: 2,
      priority: 'low',
      delay: 5000,
      removeOnComplete: 25,
      removeOnFail: 10
    });

    // 定時任務佇列
    this.createQueue('scheduled', {
      concurrency: 3,
      priority: 'normal',
      delay: 0,
      removeOnComplete: 200,
      removeOnFail: 100
    });
  }

  private createQueue(name: string, options: {
    concurrency: number;
    priority: string;
    delay: number;
    removeOnComplete: number;
    removeOnFail: number;
  }): void {
    const queue = new Bull(name, {
      redis: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
        password: process.env.REDIS_PASSWORD,
        db: parseInt(process.env.REDIS_DB || '1') // 使用不同的 DB
      },
      defaultJobOptions: {
        removeOnComplete: options.removeOnComplete,
        removeOnFail: options.removeOnFail,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        }
      },
      settings: {
        stalledInterval: 30 * 1000,    // 30 秒
        maxStalledCount: 1,
        retryProcessDelay: 5 * 1000,   // 5 秒
      }
    });

    // 設定處理器
    queue.process(options.concurrency, this.createJobProcessor(name));

    // 事件監聽
    this.setupQueueEvents(queue, name);

    this.queues.set(name, queue);
    this.metrics.set(name, {
      totalJobs: 0,
      completedJobs: 0,
      failedJobs: 0,
      activeJobs: 0,
      waitingJobs: 0,
      averageProcessingTime: 0
    });
    this.processingTimes.set(name, []);
  }

  private createJobProcessor(queueName: string) {
    return async (job: Job) => {
      const startTime = Date.now();

      try {
        console.log(`Processing job ${job.id} in queue ${queueName}`);

        // 根據任務類型執行不同邏輯
        const result = await this.executeJob(job);

        const processingTime = Date.now() - startTime;
        this.recordProcessingTime(queueName, processingTime);

        console.log(`Job ${job.id} completed in ${processingTime}ms`);
        return result;

      } catch (error) {
        const processingTime = Date.now() - startTime;
        console.error(`Job ${job.id} failed after ${processingTime}ms:`, error);
        throw error;
      }
    };
  }

  private async executeJob(job: Job): Promise<any> {
    const { type, data } = job.data;

    switch (type) {
      case 'send-notification':
        return this.sendNotification(data);

      case 'process-payment':
        return this.processPayment(data);

      case 'generate-report':
        return this.generateReport(data);

      case 'backup-data':
        return this.backupData(data);

      case 'cleanup-files':
        return this.cleanupFiles(data);

      default:
        throw new Error(`Unknown job type: ${type}`);
    }
  }

  private setupQueueEvents(queue: Queue, name: string): void {
    queue.on('completed', (job: Job, result: any) => {
      const metrics = this.metrics.get(name)!;
      metrics.completedJobs++;
      this.updateActiveJobCount(name);
      this.emit('job-completed', { queueName: name, jobId: job.id, result });
    });

    queue.on('failed', (job: Job, error: Error) => {
      const metrics = this.metrics.get(name)!;
      metrics.failedJobs++;
      this.updateActiveJobCount(name);
      this.emit('job-failed', { queueName: name, jobId: job.id, error: error.message });
    });

    queue.on('active', (job: Job) => {
      this.updateActiveJobCount(name);
      this.emit('job-active', { queueName: name, jobId: job.id });
    });

    queue.on('waiting', (job: Job) => {
      this.updateWaitingJobCount(name);
    });

    queue.on('stalled', (job: Job) => {
      console.warn(`Job ${job.id} in queue ${name} has stalled`);
      this.emit('job-stalled', { queueName: name, jobId: job.id });
    });
  }

  // 添加任務到佇列
  async addJob(
    queueName: string,
    type: string,
    data: any,
    options: JobOptions = {}
  ): Promise<Job> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    const metrics = this.metrics.get(queueName)!;
    metrics.totalJobs++;

    const job = await queue.add({ type, data }, {
      priority: this.getPriority(type),
      delay: options.delay || 0,
      attempts: options.attempts || 3,
      ...options
    });

    this.updateWaitingJobCount(queueName);
    return job;
  }

  // 批次添加任務
  async addBulkJobs(
    queueName: string,
    jobs: Array<{ type: string; data: any; options?: JobOptions }>
  ): Promise<Job[]> {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    const bulkJobs = jobs.map(job => ({
      data: { type: job.type, data: job.data },
      opts: {
        priority: this.getPriority(job.type),
        attempts: 3,
        ...job.options
      }
    }));

    const result = await queue.addBulk(bulkJobs);

    const metrics = this.metrics.get(queueName)!;
    metrics.totalJobs += jobs.length;
    this.updateWaitingJobCount(queueName);

    return result;
  }

  // 定時任務
  async scheduleJob(
    type: string,
    data: any,
    cronExpression: string,
    options: JobOptions = {}
  ): Promise<void> {
    const queue = this.queues.get('scheduled');
    if (!queue) {
      throw new Error('Scheduled queue not found');
    }

    await queue.add(
      { type, data },
      {
        repeat: { cron: cronExpression },
        ...options
      }
    );
  }

  private getPriority(type: string): number {
    const priorityMap: Record<string, number> = {
      'send-notification': 1,    // 最高優先級
      'process-payment': 2,
      'generate-report': 5,
      'backup-data': 8,
      'cleanup-files': 10        // 最低優先級
    };

    return priorityMap[type] || 5;
  }

  // 實作具體任務處理方法
  private async sendNotification(data: any): Promise<void> {
    // 模擬發送通知
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log('Notification sent:', data);
  }

  private async processPayment(data: any): Promise<void> {
    // 模擬處理付款
    await new Promise(resolve => setTimeout(resolve, 500));
    console.log('Payment processed:', data);
  }

  private async generateReport(data: any): Promise<void> {
    // 模擬生成報表
    await new Promise(resolve => setTimeout(resolve, 2000));
    console.log('Report generated:', data);
  }

  private async backupData(data: any): Promise<void> {
    // 模擬資料備份
    await new Promise(resolve => setTimeout(resolve, 5000));
    console.log('Data backed up:', data);
  }

  private async cleanupFiles(data: any): Promise<void> {
    // 模擬清理檔案
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log('Files cleaned up:', data);
  }

  private recordProcessingTime(queueName: string, time: number): void {
    const times = this.processingTimes.get(queueName)!;
    times.push(time);

    // 保持最近 100 次的處理時間
    if (times.length > 100) {
      times.shift();
    }

    const metrics = this.metrics.get(queueName)!;
    metrics.averageProcessingTime = times.reduce((sum, t) => sum + t, 0) / times.length;
  }

  private async updateActiveJobCount(queueName: string): Promise<void> {
    const queue = this.queues.get(queueName);
    if (queue) {
      const active = await queue.getActive();
      const metrics = this.metrics.get(queueName)!;
      metrics.activeJobs = active.length;
    }
  }

  private async updateWaitingJobCount(queueName: string): Promise<void> {
    const queue = this.queues.get(queueName);
    if (queue) {
      const waiting = await queue.getWaiting();
      const metrics = this.metrics.get(queueName)!;
      metrics.waitingJobs = waiting.length;
    }
  }

  // 取得佇列統計
  async getQueueStats(queueName?: string): Promise<Map<string, TaskMetrics> | TaskMetrics> {
    if (queueName) {
      const metrics = this.metrics.get(queueName);
      if (!metrics) {
        throw new Error(`Queue ${queueName} not found`);
      }
      await this.updateActiveJobCount(queueName);
      await this.updateWaitingJobCount(queueName);
      return metrics;
    }

    // 更新所有佇列的即時統計
    for (const name of this.queues.keys()) {
      await this.updateActiveJobCount(name);
      await this.updateWaitingJobCount(name);
    }

    return new Map(this.metrics);
  }

  // 清理佇列
  async cleanQueue(queueName: string, grace: number = 5000): Promise<void> {
    const queue = this.queues.get(queueName);
    if (queue) {
      await queue.clean(grace, 'completed');
      await queue.clean(grace, 'failed');
      console.log(`Queue ${queueName} cleaned`);
    }
  }

  // 優雅關閉
  async gracefulShutdown(): Promise<void> {
    console.log('Shutting down queues...');

    for (const [name, queue] of this.queues) {
      console.log(`Closing queue: ${name}`);
      await queue.close();
    }

    console.log('All queues closed.');
  }
}

今日總結

今天我們完成了全面的後端效能優化實作:

  1. Fastify 優化:連線池管理、請求時間監控、記憶體洩漏檢測
  2. 資料庫優化:連線池配置、查詢效能分析、索引策略
  3. 快取架構:多層快取、智能失效、防擊穿保護
  4. 佇列系統:優先級處理、批次操作、定時任務
  5. 效能監控:即時指標收集、自動警報、效能分析

參考資源


上一篇
Day 17: 30天打造SaaS產品後端篇-WebSocket 微服務架構與高可用性即時通信引擎實作
下一篇
Day 19: 30天打造SaaS產品後端篇-身份驗證與授權系統
系列文
30 天打造工作室 SaaS 產品 (後端篇)19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言