在 Day 17 我們建立了 WebSocket 微服務架構,今天我們要聚焦在後端效能優化。從資料庫連線池管理、查詢優化、快取策略到背景任務處理,我們將實現企業級的後端效能調優方案,確保 SaaS 系統能夠應對高併發與大流量挑戰。
// src/config/FastifyConfig.ts
import Fastify, { FastifyInstance, FastifyServerOptions } from 'fastify';
import { IncomingMessage, Server, ServerResponse } from 'http';
interface OptimizedFastifyOptions extends FastifyServerOptions {
performance?: {
enableRequestTiming?: boolean;
maxParamLength?: number;
bodyLimit?: number;
keepAliveTimeout?: number;
};
}
export class FastifyOptimizer {
static createOptimizedServer(options: OptimizedFastifyOptions = {}): FastifyInstance {
const performanceConfig = {
enableRequestTiming: true,
maxParamLength: 100,
bodyLimit: 1048576, // 1MB
keepAliveTimeout: 72000, // 72 seconds
...options.performance
};
const server = Fastify<Server, IncomingMessage, ServerResponse>({
// 基礎配置
logger: {
level: process.env.NODE_ENV === 'production' ? 'warn' : 'info',
serializers: {
req: (request) => ({
method: request.method,
url: request.url,
hostname: request.hostname,
remoteAddress: request.ip,
remotePort: request.socket?.remotePort
}),
res: (response) => ({
statusCode: response.statusCode
})
}
},
// 效能配置
trustProxy: true,
keepAliveTimeout: performanceConfig.keepAliveTimeout,
maxParamLength: performanceConfig.maxParamLength,
bodyLimit: performanceConfig.bodyLimit,
// 安全配置
ignoreTrailingSlash: true,
caseSensitive: false,
// 壓縮配置
compression: {
global: true,
threshold: 1024,
encodings: ['gzip', 'deflate'],
customTypes: /^text\/|application\/json|application\/javascript/
},
...options
});
// 效能監控 Hook
if (performanceConfig.enableRequestTiming) {
this.addPerformanceHooks(server);
}
// 連線池優化
this.optimizeConnectionPool(server);
// 記憶體管理
this.setupMemoryManagement(server);
return server;
}
private static addPerformanceHooks(server: FastifyInstance): void {
// 請求開始時間記錄
server.addHook('onRequest', async (request, reply) => {
request.startTime = process.hrtime.bigint();
});
// 響應完成時間計算
server.addHook('onSend', async (request, reply, payload) => {
if (request.startTime) {
const duration = Number(process.hrtime.bigint() - request.startTime) / 1000000;
reply.header('X-Response-Time', `${duration.toFixed(2)}ms`);
// 慢查詢警告
if (duration > 1000) {
server.log.warn({
msg: 'Slow request detected',
method: request.method,
url: request.url,
duration: `${duration.toFixed(2)}ms`
});
}
}
return payload;
});
// 錯誤處理
server.addHook('onError', async (request, reply, error) => {
server.log.error({
msg: 'Request error',
error: error.message,
stack: error.stack,
method: request.method,
url: request.url
});
});
}
private static optimizeConnectionPool(server: FastifyInstance): void {
// HTTP Keep-Alive 優化
server.server.on('connection', (socket) => {
socket.setKeepAlive(true, 30000);
socket.setNoDelay(true);
socket.setTimeout(75000);
});
// 連線數量監控
let activeConnections = 0;
server.server.on('connection', () => {
activeConnections++;
});
server.server.on('close', () => {
activeConnections--;
});
// 定期記錄連線狀態
setInterval(() => {
server.log.info({
msg: 'Connection pool status',
activeConnections,
memoryUsage: process.memoryUsage()
});
}, 60000);
}
private static setupMemoryManagement(server: FastifyInstance): void {
// 記憶體洩漏檢測
let lastMemoryUsage = process.memoryUsage();
setInterval(() => {
const currentMemory = process.memoryUsage();
const heapGrowth = currentMemory.heapUsed - lastMemoryUsage.heapUsed;
if (heapGrowth > 50 * 1024 * 1024) { // 50MB 增長警告
server.log.warn({
msg: 'Potential memory leak detected',
heapGrowth: `${(heapGrowth / 1024 / 1024).toFixed(2)}MB`,
currentHeap: `${(currentMemory.heapUsed / 1024 / 1024).toFixed(2)}MB`
});
}
lastMemoryUsage = currentMemory;
// 強制垃圾收集(僅在開發環境)
if (process.env.NODE_ENV === 'development' && global.gc) {
global.gc();
}
}, 30000);
}
}
// src/middleware/PerformanceMiddleware.ts
import { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import fp from 'fastify-plugin';
interface PerformanceMetrics {
requestCount: number;
totalResponseTime: number;
averageResponseTime: number;
errorCount: number;
slowQueryCount: number;
}
export class PerformanceMiddleware {
private static metrics: PerformanceMetrics = {
requestCount: 0,
totalResponseTime: 0,
averageResponseTime: 0,
errorCount: 0,
slowQueryCount: 0
};
static plugin = fp(async (server: FastifyInstance) => {
// 請求限制中間件軟體
await server.register(require('@fastify/rate-limit'), {
max: 100, // 每個時間窗口的最大請求數
timeWindow: 60000, // 時間窗口(毫秒)
hook: 'preHandler',
keyGenerator: (request: FastifyRequest) => {
return request.ip + request.url;
},
errorResponseBuilder: (request: FastifyRequest, context: any) => {
return {
code: 429,
error: 'Too Many Requests',
message: `Rate limit exceeded. Max ${context.max} requests per ${context.timeWindow}ms.`,
retryAfter: Math.round(context.ttl / 1000)
};
}
});
// 響應壓縮
await server.register(require('@fastify/compress'), {
global: true,
threshold: 1024,
encodings: ['gzip', 'deflate', 'br'],
customTypes: /^text\/|application\/json|application\/javascript|image\/svg\+xml/,
zlibOptions: {
level: 6, // 壓縮級別(1-9)
chunkSize: 16384 // 16KB chunks
},
brotliOptions: {
quality: 8, // Brotli 品質級別(0-11)
lgwin: 22, // 窗口大小
lgblock: 0 // 自動塊大小
}
});
// 快取中間件軟體
await server.register(require('@fastify/caching'), {
privacy: 'private',
expiresIn: 300, // 5 分鐘快取
cacheSegment: 'api-cache'
});
// 效能指標收集
server.addHook('onRequest', async (request: FastifyRequest, reply: FastifyReply) => {
this.metrics.requestCount++;
request.startTime = Date.now();
});
server.addHook('onSend', async (request: FastifyRequest, reply: FastifyReply, payload) => {
const responseTime = Date.now() - (request.startTime || Date.now());
this.metrics.totalResponseTime += responseTime;
this.metrics.averageResponseTime = this.metrics.totalResponseTime / this.metrics.requestCount;
if (responseTime > 1000) {
this.metrics.slowQueryCount++;
}
return payload;
});
server.addHook('onError', async (request: FastifyRequest, reply: FastifyReply, error) => {
this.metrics.errorCount++;
});
// 效能指標端點
server.get('/metrics/performance', async () => {
return {
...this.metrics,
uptime: process.uptime(),
memoryUsage: process.memoryUsage(),
cpuUsage: process.cpuUsage()
};
});
});
static getMetrics(): PerformanceMetrics {
return { ...this.metrics };
}
static resetMetrics(): void {
this.metrics = {
requestCount: 0,
totalResponseTime: 0,
averageResponseTime: 0,
errorCount: 0,
slowQueryCount: 0
};
}
}
// src/database/OptimizedDatabase.ts
import { Pool, PoolConfig, PoolClient } from 'pg';
import { EventEmitter } from 'events';
interface DatabaseMetrics {
totalConnections: number;
activeConnections: number;
idleConnections: number;
queriesExecuted: number;
averageQueryTime: number;
slowQueries: number;
}
export class OptimizedDatabase extends EventEmitter {
private pool: Pool;
private metrics: DatabaseMetrics;
private queryTimes: number[] = [];
constructor() {
super();
this.metrics = {
totalConnections: 0,
activeConnections: 0,
idleConnections: 0,
queriesExecuted: 0,
averageQueryTime: 0,
slowQueries: 0
};
this.pool = this.createOptimizedPool();
this.setupMonitoring();
}
private createOptimizedPool(): Pool {
const config: PoolConfig = {
// 基礎連線配置
host: process.env.DB_HOST || 'localhost',
port: parseInt(process.env.DB_PORT || '5432'),
database: process.env.DB_NAME || 'kyo_saas',
user: process.env.DB_USER || 'postgres',
password: process.env.DB_PASSWORD,
// 連線池優化配置
min: 5, // 最小連線數
max: 20, // 最大連線數
idleTimeoutMillis: 30000, // 閒置連線超時時間
connectionTimeoutMillis: 10000, // 連線超時時間
acquireTimeoutMillis: 60000, // 取得連線超時時間
// 效能調優
keepAlive: true,
keepAliveInitialDelayMillis: 10000,
allowExitOnIdle: false,
// SSL 配置(生產環境)
ssl: process.env.NODE_ENV === 'production' ? {
rejectUnauthorized: false
} : false,
// 查詢配置
query_timeout: 30000, // 查詢超時時間
statement_timeout: 30000 // 語句超時時間
};
const pool = new Pool(config);
// 連線池事件監聽
pool.on('connect', (client: PoolClient) => {
this.metrics.totalConnections++;
this.emit('connect', { totalConnections: this.metrics.totalConnections });
});
pool.on('acquire', (client: PoolClient) => {
this.metrics.activeConnections++;
this.metrics.idleConnections = Math.max(0, this.metrics.idleConnections - 1);
});
pool.on('release', (client: PoolClient) => {
this.metrics.activeConnections = Math.max(0, this.metrics.activeConnections - 1);
this.metrics.idleConnections++;
});
pool.on('error', (err: Error, client: PoolClient) => {
console.error('Database pool error:', err);
this.emit('error', err);
});
return pool;
}
// 優化的查詢執行器
async executeQuery<T = any>(
text: string,
params?: any[],
options: { timeout?: number; cache?: boolean } = {}
): Promise<T[]> {
const startTime = Date.now();
const queryTimeout = options.timeout || 30000;
try {
// 查詢快取檢查
if (options.cache) {
const cacheKey = this.generateCacheKey(text, params);
const cached = await this.getCachedQuery(cacheKey);
if (cached) {
return cached;
}
}
// 執行查詢
const client = await this.pool.connect();
try {
// 設定查詢超時
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Query timeout')), queryTimeout);
});
const queryPromise = client.query(text, params);
const result = await Promise.race([queryPromise, timeoutPromise]) as any;
// 記錄效能指標
const queryTime = Date.now() - startTime;
this.recordQueryPerformance(queryTime, text);
// 快取結果
if (options.cache) {
const cacheKey = this.generateCacheKey(text, params);
await this.setCachedQuery(cacheKey, result.rows);
}
return result.rows;
} finally {
client.release();
}
} catch (error) {
const queryTime = Date.now() - startTime;
this.recordQueryPerformance(queryTime, text, true);
throw error;
}
}
// 批次查詢優化
async executeBatch<T = any>(
queries: Array<{ text: string; params?: any[] }>,
options: { transaction?: boolean } = {}
): Promise<T[][]> {
const client = await this.pool.connect();
try {
if (options.transaction) {
await client.query('BEGIN');
}
const results: T[][] = [];
for (const query of queries) {
const startTime = Date.now();
try {
const result = await client.query(query.text, query.params);
results.push(result.rows);
const queryTime = Date.now() - startTime;
this.recordQueryPerformance(queryTime, query.text);
} catch (error) {
if (options.transaction) {
await client.query('ROLLBACK');
}
throw error;
}
}
if (options.transaction) {
await client.query('COMMIT');
}
return results;
} finally {
client.release();
}
}
// 預處理語句優化
async executePrepared<T = any>(
name: string,
text: string,
params: any[]
): Promise<T[]> {
const client = await this.pool.connect();
try {
// 準備語句(如果尚未準備)
await client.query(`PREPARE ${name} AS ${text}`);
// 執行預處理語句
const startTime = Date.now();
const result = await client.query(`EXECUTE ${name}`, params);
const queryTime = Date.now() - startTime;
this.recordQueryPerformance(queryTime, `PREPARED: ${name}`);
return result.rows;
} finally {
client.release();
}
}
private recordQueryPerformance(queryTime: number, query: string, isError = false): void {
this.metrics.queriesExecuted++;
if (!isError) {
this.queryTimes.push(queryTime);
// 保持最近 1000 次查詢的時間記錄
if (this.queryTimes.length > 1000) {
this.queryTimes.shift();
}
this.metrics.averageQueryTime =
this.queryTimes.reduce((sum, time) => sum + time, 0) / this.queryTimes.length;
}
// 慢查詢檢測
if (queryTime > 1000) {
this.metrics.slowQueries++;
console.warn(`Slow query detected (${queryTime}ms):`, query.substring(0, 100));
}
}
private generateCacheKey(text: string, params?: any[]): string {
const hash = require('crypto').createHash('md5');
hash.update(text);
if (params) {
hash.update(JSON.stringify(params));
}
return hash.digest('hex');
}
private async getCachedQuery(key: string): Promise<any[] | null> {
// 實作快取邏輯(Redis 或記憶體快取)
return null;
}
private async setCachedQuery(key: string, data: any[]): Promise<void> {
// 實作快取設定邏輯
}
private setupMonitoring(): void {
// 定期報告連線池狀態
setInterval(() => {
console.log('Database Pool Metrics:', {
...this.metrics,
poolInfo: {
totalCount: this.pool.totalCount,
idleCount: this.pool.idleCount,
waitingCount: this.pool.waitingCount
}
});
}, 60000);
}
async getMetrics(): Promise<DatabaseMetrics & { poolInfo: any }> {
return {
...this.metrics,
poolInfo: {
totalCount: this.pool.totalCount,
idleCount: this.pool.idleCount,
waitingCount: this.pool.waitingCount
}
};
}
async gracefulShutdown(): Promise<void> {
console.log('Closing database connections...');
await this.pool.end();
console.log('Database connections closed.');
}
}
// src/database/QueryOptimizer.ts
export class QueryOptimizer {
private db: OptimizedDatabase;
constructor(database: OptimizedDatabase) {
this.db = database;
}
// 分頁查詢優化
async optimizedPagination<T>(
baseQuery: string,
countQuery: string,
params: any[],
page: number = 1,
limit: number = 20
): Promise<{ data: T[]; total: number; page: number; totalPages: number }> {
const offset = (page - 1) * limit;
// 並行執行查詢和計數
const [data, countResult] = await Promise.all([
this.db.executeQuery<T>(
`${baseQuery} LIMIT $${params.length + 1} OFFSET $${params.length + 2}`,
[...params, limit, offset],
{ cache: true }
),
this.db.executeQuery<{ count: string }>(countQuery, params, { cache: true })
]);
const total = parseInt(countResult[0]?.count || '0');
const totalPages = Math.ceil(total / limit);
return {
data,
total,
page,
totalPages
};
}
// 複雜查詢優化
async optimizedJoinQuery<T>(
tables: string[],
conditions: string[],
params: any[],
options: {
select?: string[];
orderBy?: string;
groupBy?: string;
having?: string;
limit?: number;
} = {}
): Promise<T[]> {
const selectClause = options.select?.join(', ') || '*';
const fromClause = tables[0];
const joinClauses = tables.slice(1).map((table, index) => {
// 假設標準命名約定用於外鍵
return `LEFT JOIN ${table} ON ${tables[0]}.${table.slice(0, -1)}_id = ${table}.id`;
}).join(' ');
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
const groupByClause = options.groupBy ? `GROUP BY ${options.groupBy}` : '';
const havingClause = options.having ? `HAVING ${options.having}` : '';
const orderByClause = options.orderBy ? `ORDER BY ${options.orderBy}` : '';
const limitClause = options.limit ? `LIMIT ${options.limit}` : '';
const query = `
SELECT ${selectClause}
FROM ${fromClause}
${joinClauses}
${whereClause}
${groupByClause}
${havingClause}
${orderByClause}
${limitClause}
`.trim();
return this.db.executeQuery<T>(query, params, { cache: true });
}
// 索引建議分析
async analyzeQueryPerformance(query: string, params?: any[]): Promise<{
executionTime: number;
planningTime: number;
indexSuggestions: string[];
cost: number;
}> {
const explainQuery = `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) ${query}`;
const result = await this.db.executeQuery(explainQuery, params);
const plan = result[0]['QUERY PLAN'][0];
const executionTime = plan['Execution Time'];
const planningTime = plan['Planning Time'];
const cost = plan['Plan']['Total Cost'];
// 分析並產生索引建議
const indexSuggestions = this.generateIndexSuggestions(plan['Plan']);
return {
executionTime,
planningTime,
indexSuggestions,
cost
};
}
private generateIndexSuggestions(plan: any): string[] {
const suggestions: string[] = [];
// 遞迴分析查詢計劃
const analyzePlan = (node: any) => {
if (node['Node Type'] === 'Seq Scan') {
const relation = node['Relation Name'];
const filter = node['Filter'];
if (filter) {
suggestions.push(`CREATE INDEX idx_${relation}_filter ON ${relation} (extracted_columns_from_filter);`);
}
}
if (node['Node Type'] === 'Sort' && node['Sort Key']) {
const sortKeys = node['Sort Key'];
suggestions.push(`CREATE INDEX idx_sort ON table_name (${sortKeys.join(', ')});`);
}
if (node['Plans']) {
node['Plans'].forEach((childPlan: any) => analyzePlan(childPlan));
}
};
analyzePlan(plan);
return [...new Set(suggestions)]; // 去重
}
// 自動索引創建
async createOptimalIndexes(tableName: string): Promise<void> {
const commonIndexes = [
// 外鍵索引
`CREATE INDEX IF NOT EXISTS idx_${tableName}_created_at ON ${tableName} (created_at);`,
`CREATE INDEX IF NOT EXISTS idx_${tableName}_updated_at ON ${tableName} (updated_at);`,
// 複合索引(根據常見查詢模式)
`CREATE INDEX IF NOT EXISTS idx_${tableName}_status_created ON ${tableName} (status, created_at);`,
// 部分索引(針對活躍記錄)
`CREATE INDEX IF NOT EXISTS idx_${tableName}_active ON ${tableName} (id) WHERE deleted_at IS NULL;`
];
for (const indexQuery of commonIndexes) {
try {
await this.db.executeQuery(indexQuery);
console.log(`Created index for ${tableName}`);
} catch (error) {
console.warn(`Failed to create index: ${error}`);
}
}
}
}
// src/cache/OptimizedCache.ts
import Redis from 'ioredis';
import { LRUCache } from 'lru-cache';
interface CacheStrategy {
l1: boolean; // 記憶體快取
l2: boolean; // Redis 快取
ttl: number; // 存活時間
maxSize?: number; // 最大快取大小
}
export class OptimizedCache {
private redis: Redis;
private memoryCache: LRUCache<string, any>;
private strategies: Map<string, CacheStrategy>;
constructor() {
this.redis = this.createRedisClient();
this.memoryCache = this.createMemoryCache();
this.strategies = new Map();
this.setupDefaultStrategies();
}
private createRedisClient(): Redis {
const redis = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || '0'),
// 連線池配置
maxRetriesPerRequest: 3,
retryDelayOnFailover: 100,
connectTimeout: 10000,
commandTimeout: 5000,
lazyConnect: true,
// 效能優化
keepAlive: 30000,
enableOfflineQueue: false,
maxRetriesPerRequest: 3,
// 叢集配置(如果使用 Redis Cluster)
enableReadyCheck: true,
showFriendlyErrorStack: true,
// Lua 腳本快取
enableAutoPipelining: true,
maxmemoryPolicy: 'allkeys-lru'
});
// Redis 事件監聽
redis.on('connect', () => {
console.log('Redis connected');
});
redis.on('ready', () => {
console.log('Redis ready');
});
redis.on('error', (err) => {
console.error('Redis error:', err);
});
redis.on('reconnecting', (time) => {
console.log(`Redis reconnecting in ${time}ms`);
});
return redis;
}
private createMemoryCache(): LRUCache<string, any> {
return new LRUCache({
max: 1000, // 最大項目數
maxSize: 50 * 1024 * 1024, // 最大 50MB
sizeCalculation: (value) => {
return JSON.stringify(value).length;
},
ttl: 5 * 60 * 1000, // 5 分鐘 TTL
allowStale: false,
updateAgeOnGet: true,
updateAgeOnHas: true
});
}
private setupDefaultStrategies(): void {
// 用戶資料 - 雙層快取
this.strategies.set('user', {
l1: true,
l2: true,
ttl: 15 * 60, // 15 分鐘
maxSize: 10 * 1024 // 10KB
});
// 會話資料 - 僅 Redis
this.strategies.set('session', {
l1: false,
l2: true,
ttl: 30 * 60, // 30 分鐘
});
// 設定資料 - 長時間快取
this.strategies.set('config', {
l1: true,
l2: true,
ttl: 60 * 60, // 1 小時
});
// API 響應 - 短時間快取
this.strategies.set('api', {
l1: true,
l2: false,
ttl: 2 * 60, // 2 分鐘
});
}
// 智能快取取得
async get<T>(key: string, type: string = 'default'): Promise<T | null> {
const strategy = this.strategies.get(type) || this.strategies.get('default')!;
// L1 快取檢查
if (strategy.l1) {
const l1Result = this.memoryCache.get(key);
if (l1Result !== undefined) {
return l1Result;
}
}
// L2 快取檢查
if (strategy.l2) {
const l2Result = await this.redis.get(key);
if (l2Result !== null) {
const parsed = JSON.parse(l2Result);
// 回填 L1 快取
if (strategy.l1) {
this.memoryCache.set(key, parsed, { ttl: strategy.ttl * 1000 });
}
return parsed;
}
}
return null;
}
// 智能快取設定
async set<T>(key: string, value: T, type: string = 'default', customTtl?: number): Promise<void> {
const strategy = this.strategies.get(type) || this.strategies.get('default')!;
const ttl = customTtl || strategy.ttl;
// L1 快取設定
if (strategy.l1) {
this.memoryCache.set(key, value, { ttl: ttl * 1000 });
}
// L2 快取設定
if (strategy.l2) {
await this.redis.setex(key, ttl, JSON.stringify(value));
}
}
// 批次操作優化
async mget<T>(keys: string[], type: string = 'default'): Promise<Map<string, T>> {
const strategy = this.strategies.get(type) || this.strategies.get('default')!;
const result = new Map<string, T>();
const missingKeys: string[] = [];
// L1 批次檢查
if (strategy.l1) {
keys.forEach(key => {
const value = this.memoryCache.get(key);
if (value !== undefined) {
result.set(key, value);
} else {
missingKeys.push(key);
}
});
} else {
missingKeys.push(...keys);
}
// L2 批次檢查
if (strategy.l2 && missingKeys.length > 0) {
const pipeline = this.redis.pipeline();
missingKeys.forEach(key => pipeline.get(key));
const l2Results = await pipeline.exec();
if (l2Results) {
l2Results.forEach(([err, value], index) => {
if (!err && value !== null) {
const key = missingKeys[index];
const parsed = JSON.parse(value as string);
result.set(key, parsed);
// 回填 L1 快取
if (strategy.l1) {
this.memoryCache.set(key, parsed, { ttl: strategy.ttl * 1000 });
}
}
});
}
}
return result;
}
// 智能快取失效
async invalidate(pattern: string): Promise<number> {
// L1 快取失效
let invalidatedCount = 0;
for (const key of this.memoryCache.keys()) {
if (key.includes(pattern)) {
this.memoryCache.delete(key);
invalidatedCount++;
}
}
// L2 快取失效
const keys = await this.redis.keys(`*${pattern}*`);
if (keys.length > 0) {
const pipeline = this.redis.pipeline();
keys.forEach(key => pipeline.del(key));
await pipeline.exec();
invalidatedCount += keys.length;
}
return invalidatedCount;
}
// 快取穿透防護
async getWithFallback<T>(
key: string,
fallbackFn: () => Promise<T>,
type: string = 'default',
preventCacheBreakdown: boolean = true
): Promise<T> {
// 嘗試從快取獲取
const cached = await this.get<T>(key, type);
if (cached !== null) {
return cached;
}
// 防止快取擊穿 - 使用分散式鎖
if (preventCacheBreakdown) {
const lockKey = `lock:${key}`;
const lockValue = Date.now().toString();
const acquired = await this.redis.set(lockKey, lockValue, 'PX', 10000, 'NX');
if (acquired === 'OK') {
try {
// 再次檢查快取(雙重檢查)
const doubleCheck = await this.get<T>(key, type);
if (doubleCheck !== null) {
return doubleCheck;
}
// 執行降級函數
const result = await fallbackFn();
// 設定快取
await this.set(key, result, type);
return result;
} finally {
// 釋放鎖
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
await this.redis.eval(script, 1, lockKey, lockValue);
}
} else {
// 等待鎖釋放後重試
await new Promise(resolve => setTimeout(resolve, 100));
return this.getWithFallback(key, fallbackFn, type, false);
}
} else {
const result = await fallbackFn();
await this.set(key, result, type);
return result;
}
}
// 快取效能監控
async getStats(): Promise<{
memoryCache: {
size: number;
itemCount: number;
hitRate: number;
};
redis: {
usedMemory: string;
connectedClients: number;
commandsProcessed: number;
};
}> {
const memoryCacheStats = {
size: this.memoryCache.calculatedSize,
itemCount: this.memoryCache.size,
hitRate: 0 // 需要自行實作統計
};
const redisInfo = await this.redis.info('memory');
const redisStats = await this.redis.info('stats');
return {
memoryCache: memoryCacheStats,
redis: {
usedMemory: this.parseRedisInfo(redisInfo, 'used_memory_human'),
connectedClients: parseInt(this.parseRedisInfo(redisStats, 'connected_clients')),
commandsProcessed: parseInt(this.parseRedisInfo(redisStats, 'total_commands_processed'))
}
};
}
private parseRedisInfo(info: string, key: string): string {
const lines = info.split('\r\n');
const line = lines.find(l => l.startsWith(`${key}:`));
return line ? line.split(':')[1] : '0';
}
async gracefulShutdown(): Promise<void> {
console.log('Closing cache connections...');
this.memoryCache.clear();
await this.redis.quit();
console.log('Cache connections closed.');
}
}
// src/queue/OptimizedQueue.ts
import Bull, { Queue, Job, JobOptions } from 'bull';
import { EventEmitter } from 'events';
interface TaskMetrics {
totalJobs: number;
completedJobs: number;
failedJobs: number;
activeJobs: number;
waitingJobs: number;
averageProcessingTime: number;
}
export class OptimizedQueue extends EventEmitter {
private queues: Map<string, Queue> = new Map();
private metrics: Map<string, TaskMetrics> = new Map();
private processingTimes: Map<string, number[]> = new Map();
constructor() {
super();
this.setupDefaultQueues();
}
private setupDefaultQueues(): void {
// 高優先級佇列 - 即時處理
this.createQueue('high-priority', {
concurrency: 10,
priority: 'high',
delay: 0,
removeOnComplete: 100,
removeOnFail: 50
});
// 一般佇列 - 正常處理
this.createQueue('normal', {
concurrency: 5,
priority: 'normal',
delay: 0,
removeOnComplete: 50,
removeOnFail: 25
});
// 低優先級佇列 - 批次處理
this.createQueue('low-priority', {
concurrency: 2,
priority: 'low',
delay: 5000,
removeOnComplete: 25,
removeOnFail: 10
});
// 定時任務佇列
this.createQueue('scheduled', {
concurrency: 3,
priority: 'normal',
delay: 0,
removeOnComplete: 200,
removeOnFail: 100
});
}
private createQueue(name: string, options: {
concurrency: number;
priority: string;
delay: number;
removeOnComplete: number;
removeOnFail: number;
}): void {
const queue = new Bull(name, {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || '1') // 使用不同的 DB
},
defaultJobOptions: {
removeOnComplete: options.removeOnComplete,
removeOnFail: options.removeOnFail,
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
}
},
settings: {
stalledInterval: 30 * 1000, // 30 秒
maxStalledCount: 1,
retryProcessDelay: 5 * 1000, // 5 秒
}
});
// 設定處理器
queue.process(options.concurrency, this.createJobProcessor(name));
// 事件監聽
this.setupQueueEvents(queue, name);
this.queues.set(name, queue);
this.metrics.set(name, {
totalJobs: 0,
completedJobs: 0,
failedJobs: 0,
activeJobs: 0,
waitingJobs: 0,
averageProcessingTime: 0
});
this.processingTimes.set(name, []);
}
private createJobProcessor(queueName: string) {
return async (job: Job) => {
const startTime = Date.now();
try {
console.log(`Processing job ${job.id} in queue ${queueName}`);
// 根據任務類型執行不同邏輯
const result = await this.executeJob(job);
const processingTime = Date.now() - startTime;
this.recordProcessingTime(queueName, processingTime);
console.log(`Job ${job.id} completed in ${processingTime}ms`);
return result;
} catch (error) {
const processingTime = Date.now() - startTime;
console.error(`Job ${job.id} failed after ${processingTime}ms:`, error);
throw error;
}
};
}
private async executeJob(job: Job): Promise<any> {
const { type, data } = job.data;
switch (type) {
case 'send-notification':
return this.sendNotification(data);
case 'process-payment':
return this.processPayment(data);
case 'generate-report':
return this.generateReport(data);
case 'backup-data':
return this.backupData(data);
case 'cleanup-files':
return this.cleanupFiles(data);
default:
throw new Error(`Unknown job type: ${type}`);
}
}
private setupQueueEvents(queue: Queue, name: string): void {
queue.on('completed', (job: Job, result: any) => {
const metrics = this.metrics.get(name)!;
metrics.completedJobs++;
this.updateActiveJobCount(name);
this.emit('job-completed', { queueName: name, jobId: job.id, result });
});
queue.on('failed', (job: Job, error: Error) => {
const metrics = this.metrics.get(name)!;
metrics.failedJobs++;
this.updateActiveJobCount(name);
this.emit('job-failed', { queueName: name, jobId: job.id, error: error.message });
});
queue.on('active', (job: Job) => {
this.updateActiveJobCount(name);
this.emit('job-active', { queueName: name, jobId: job.id });
});
queue.on('waiting', (job: Job) => {
this.updateWaitingJobCount(name);
});
queue.on('stalled', (job: Job) => {
console.warn(`Job ${job.id} in queue ${name} has stalled`);
this.emit('job-stalled', { queueName: name, jobId: job.id });
});
}
// 添加任務到佇列
async addJob(
queueName: string,
type: string,
data: any,
options: JobOptions = {}
): Promise<Job> {
const queue = this.queues.get(queueName);
if (!queue) {
throw new Error(`Queue ${queueName} not found`);
}
const metrics = this.metrics.get(queueName)!;
metrics.totalJobs++;
const job = await queue.add({ type, data }, {
priority: this.getPriority(type),
delay: options.delay || 0,
attempts: options.attempts || 3,
...options
});
this.updateWaitingJobCount(queueName);
return job;
}
// 批次添加任務
async addBulkJobs(
queueName: string,
jobs: Array<{ type: string; data: any; options?: JobOptions }>
): Promise<Job[]> {
const queue = this.queues.get(queueName);
if (!queue) {
throw new Error(`Queue ${queueName} not found`);
}
const bulkJobs = jobs.map(job => ({
data: { type: job.type, data: job.data },
opts: {
priority: this.getPriority(job.type),
attempts: 3,
...job.options
}
}));
const result = await queue.addBulk(bulkJobs);
const metrics = this.metrics.get(queueName)!;
metrics.totalJobs += jobs.length;
this.updateWaitingJobCount(queueName);
return result;
}
// 定時任務
async scheduleJob(
type: string,
data: any,
cronExpression: string,
options: JobOptions = {}
): Promise<void> {
const queue = this.queues.get('scheduled');
if (!queue) {
throw new Error('Scheduled queue not found');
}
await queue.add(
{ type, data },
{
repeat: { cron: cronExpression },
...options
}
);
}
private getPriority(type: string): number {
const priorityMap: Record<string, number> = {
'send-notification': 1, // 最高優先級
'process-payment': 2,
'generate-report': 5,
'backup-data': 8,
'cleanup-files': 10 // 最低優先級
};
return priorityMap[type] || 5;
}
// 實作具體任務處理方法
private async sendNotification(data: any): Promise<void> {
// 模擬發送通知
await new Promise(resolve => setTimeout(resolve, 100));
console.log('Notification sent:', data);
}
private async processPayment(data: any): Promise<void> {
// 模擬處理付款
await new Promise(resolve => setTimeout(resolve, 500));
console.log('Payment processed:', data);
}
private async generateReport(data: any): Promise<void> {
// 模擬生成報表
await new Promise(resolve => setTimeout(resolve, 2000));
console.log('Report generated:', data);
}
private async backupData(data: any): Promise<void> {
// 模擬資料備份
await new Promise(resolve => setTimeout(resolve, 5000));
console.log('Data backed up:', data);
}
private async cleanupFiles(data: any): Promise<void> {
// 模擬清理檔案
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('Files cleaned up:', data);
}
private recordProcessingTime(queueName: string, time: number): void {
const times = this.processingTimes.get(queueName)!;
times.push(time);
// 保持最近 100 次的處理時間
if (times.length > 100) {
times.shift();
}
const metrics = this.metrics.get(queueName)!;
metrics.averageProcessingTime = times.reduce((sum, t) => sum + t, 0) / times.length;
}
private async updateActiveJobCount(queueName: string): Promise<void> {
const queue = this.queues.get(queueName);
if (queue) {
const active = await queue.getActive();
const metrics = this.metrics.get(queueName)!;
metrics.activeJobs = active.length;
}
}
private async updateWaitingJobCount(queueName: string): Promise<void> {
const queue = this.queues.get(queueName);
if (queue) {
const waiting = await queue.getWaiting();
const metrics = this.metrics.get(queueName)!;
metrics.waitingJobs = waiting.length;
}
}
// 取得佇列統計
async getQueueStats(queueName?: string): Promise<Map<string, TaskMetrics> | TaskMetrics> {
if (queueName) {
const metrics = this.metrics.get(queueName);
if (!metrics) {
throw new Error(`Queue ${queueName} not found`);
}
await this.updateActiveJobCount(queueName);
await this.updateWaitingJobCount(queueName);
return metrics;
}
// 更新所有佇列的即時統計
for (const name of this.queues.keys()) {
await this.updateActiveJobCount(name);
await this.updateWaitingJobCount(name);
}
return new Map(this.metrics);
}
// 清理佇列
async cleanQueue(queueName: string, grace: number = 5000): Promise<void> {
const queue = this.queues.get(queueName);
if (queue) {
await queue.clean(grace, 'completed');
await queue.clean(grace, 'failed');
console.log(`Queue ${queueName} cleaned`);
}
}
// 優雅關閉
async gracefulShutdown(): Promise<void> {
console.log('Shutting down queues...');
for (const [name, queue] of this.queues) {
console.log(`Closing queue: ${name}`);
await queue.close();
}
console.log('All queues closed.');
}
}
今天我們完成了全面的後端效能優化實作: