經過 Day 25 的 JWT 與 Session 管理建置,我們已經有了完整的用戶認證機制。今天我們要為 Kyo System 實作企業級的 API 速率限制 (Rate Limiting) 與防濫用機制。在 SaaS 產品中,速率限制不僅是保護系統資源的防線,更是成本控制、公平性保障、以及商業模式的重要組成部分。
/**
* 速率限制演算法比較
*
* ┌──────────────────────────────────────────────┐
* │ Rate Limiting Algorithms │
* └──────────────────────────────────────────────┘
*
* 1. Fixed Window (固定視窗)
* ┌─────┬─────┬─────┐
* │ 5/5 │ 3/5 │ 2/5 │
* └─────┴─────┴─────┘
* 優點: 簡單、記憶體效率高
* 缺點: 視窗邊界爆發 (Burst at boundary)
* 實作: Redis INCR + EXPIRE
*
* 2. Sliding Window Log (滑動視窗日誌)
* ┌──────────────────┐
* │ [t1, t2, t3, t4] │ 保留所有時間戳
* └──────────────────┘
* 優點: 最精確
* 缺點: 記憶體消耗大
* 實作: Redis ZSET
*
* 3. Sliding Window Counter (滑動視窗計數)
* ┌─────┬─────┐
* │ Cur │ Pre │
* └─────┴─────┘
* 計算: (Pre × 剩餘比例) + Cur
* 優點: 精確且記憶體效率高
* 缺點: 實作較複雜
* 實作: Redis MULTI
*
* 4. Token Bucket (令牌桶)
* ┌─────────────┐
* │ 🪙🪙🪙🪙🪙 │ tokens
* └─────────────┘
* 優點: 允許短時間爆發
* 缺點: 需追蹤令牌數與時間
* 實作: Redis + Lua Script
*
* 5. Leaky Bucket (漏桶)
* ┌─────────────┐
* │ 💧 │
* │ 💧💧 │
* └──────┴──────┘
* 優點: 平滑流量
* 缺點: 可能延遲合法請求
* 實作: Queue + Background Worker
*
* 我們的選擇: Sliding Window Counter
* 理由: 精確度高、記憶體效率佳、適合 SaaS
*/
// packages/kyo-core/src/rate-limit/sliding-window-limiter.ts
import Redis from 'ioredis';
export interface RateLimitConfig {
limit: number; // 允許的請求數
window: number; // 時間窗口(秒)
}
export interface RateLimitResult {
allowed: boolean;
remaining: number;
resetAt: Date;
retryAfter?: number; // 需等待的秒數
}
/**
* Sliding Window Rate Limiter
*
* 演算法說明:
* 1. 維護當前視窗與前一視窗的計數
* 2. 計算當前時間在視窗中的比例
* 3. 加權計算: (前視窗 × 剩餘比例) + 當前視窗
* 4. 如果未超過限制,增加當前視窗計數
*
* 範例:
* - 限制: 10 requests/minute
* - 時間: 00:00:45 (視窗進行到 75%)
* - 前視窗 (00:00:00-00:01:00): 8 requests
* - 當前視窗 (00:01:00-00:02:00): 3 requests
* - 計算: (8 × 0.25) + 3 = 5 requests
* - 結果: 允許 (5 < 10)
*/
export class SlidingWindowRateLimiter {
private redis: Redis;
constructor(redisClient: Redis) {
this.redis = redisClient;
}
/**
* 檢查速率限制
*/
async checkLimit(
key: string,
config: RateLimitConfig
): Promise<RateLimitResult> {
const now = Date.now();
const currentWindow = Math.floor(now / 1000 / config.window);
const previousWindow = currentWindow - 1;
// Redis keys
const currentKey = `rate_limit:${key}:${currentWindow}`;
const previousKey = `rate_limit:${key}:${previousWindow}`;
// 取得計數
const [currentCount, previousCount] = await Promise.all([
this.redis.get(currentKey),
this.redis.get(previousKey),
]);
const current = parseInt(currentCount || '0', 10);
const previous = parseInt(previousCount || '0', 10);
// 計算當前時間在視窗中的位置 (0-1)
const elapsedPercentage =
((now / 1000) % config.window) / config.window;
// 滑動視窗計算
const weightedCount = previous * (1 - elapsedPercentage) + current;
const allowed = weightedCount < config.limit;
const remaining = Math.max(0, config.limit - Math.ceil(weightedCount));
// 計算重置時間
const resetAt = new Date(
(currentWindow + 1) * config.window * 1000
);
let retryAfter: number | undefined;
if (allowed) {
// 增加計數
const pipeline = this.redis.pipeline();
pipeline.incr(currentKey);
pipeline.expire(currentKey, config.window * 2); // 保留兩個視窗
await pipeline.exec();
} else {
// 計算需要等待的時間
retryAfter = Math.ceil(
(currentWindow + 1) * config.window - now / 1000
);
}
return {
allowed,
remaining,
resetAt,
retryAfter,
};
}
/**
* 重置限制(用於測試或管理)
*/
async reset(key: string): Promise<void> {
const pattern = `rate_limit:${key}:*`;
const keys = await this.redis.keys(pattern);
if (keys.length > 0) {
await this.redis.del(...keys);
}
}
/**
* 取得當前使用量
*/
async getUsage(
key: string,
config: RateLimitConfig
): Promise<{
used: number;
limit: number;
remaining: number;
resetAt: Date;
}> {
const result = await this.checkLimit(key, config);
return {
used: config.limit - result.remaining,
limit: config.limit,
remaining: result.remaining,
resetAt: result.resetAt,
};
}
}
// packages/kyo-core/src/rate-limit/token-bucket-limiter.ts
import Redis from 'ioredis';
export interface TokenBucketConfig {
capacity: number; // 桶容量(最大令牌數)
refillRate: number; // 填充速率(令牌/秒)
refillAmount?: number; // 每次填充數量(預設 1)
}
/**
* Token Bucket Rate Limiter
*
* 優勢:
* - 允許短時間爆發(如批次處理)
* - 長期平均速率受控
* - 適合 API 配額管理
*
* 使用場景:
* - 檔案上傳(允許短時大量上傳)
* - 批次 API 呼叫
* - 付費方案配額
*/
export class TokenBucketRateLimiter {
private redis: Redis;
constructor(redisClient: Redis) {
this.redis = redisClient;
}
/**
* Lua Script 實作 Token Bucket
* 確保原子性操作
*/
private readonly luaScript = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local refill_amount = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local requested = tonumber(ARGV[5])
-- 取得當前狀態
local state = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(state[1]) or capacity
local last_refill = tonumber(state[2]) or now
-- 計算需要補充的令牌
local elapsed = now - last_refill
local refills = math.floor(elapsed * refill_rate)
local new_tokens = math.min(capacity, tokens + (refills * refill_amount))
-- 檢查是否有足夠令牌
if new_tokens >= requested then
new_tokens = new_tokens - requested
redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
redis.call('EXPIRE', key, 86400) -- 24 小時過期
return {1, new_tokens, capacity}
else
redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
redis.call('EXPIRE', key, 86400)
return {0, new_tokens, capacity}
end
`;
/**
* 檢查並消耗令牌
*/
async consume(
key: string,
config: TokenBucketConfig,
tokens: number = 1
): Promise<{
allowed: boolean;
remaining: number;
capacity: number;
}> {
const refillAmount = config.refillAmount || 1;
const now = Date.now() / 1000;
const result = (await this.redis.eval(
this.luaScript,
1,
`token_bucket:${key}`,
config.capacity,
config.refillRate,
refillAmount,
now,
tokens
)) as [number, number, number];
return {
allowed: result[0] === 1,
remaining: result[1],
capacity: result[2],
};
}
/**
* 查詢剩餘令牌
*/
async getTokens(key: string, capacity: number): Promise<number> {
const tokens = await this.redis.hget(`token_bucket:${key}`, 'tokens');
return parseInt(tokens || String(capacity), 10);
}
/**
* 重置桶
*/
async reset(key: string): Promise<void> {
await this.redis.del(`token_bucket:${key}`);
}
}
// apps/kyo-otp-service/src/plugins/rate-limit.ts
import { FastifyPluginAsync } from 'fastify';
import fp from 'fastify-plugin';
import Redis from 'ioredis';
import {
SlidingWindowRateLimiter,
TokenBucketRateLimiter,
RateLimitConfig,
} from '@kyong/kyo-core/rate-limit';
declare module 'fastify' {
interface FastifyRequest {
rateLimit: {
remaining: number;
limit: number;
resetAt: Date;
};
}
interface FastifyInstance {
rateLimiter: SlidingWindowRateLimiter;
tokenBucket: TokenBucketRateLimiter;
}
}
interface RateLimitPluginOptions {
redis: Redis;
global?: RateLimitConfig; // 全域限制
skipOnError?: boolean; // 錯誤時跳過限制
}
const rateLimitPlugin: FastifyPluginAsync<RateLimitPluginOptions> = async (
server,
options
) => {
const slidingWindowLimiter = new SlidingWindowRateLimiter(options.redis);
const tokenBucketLimiter = new TokenBucketRateLimiter(options.redis);
// 註冊 limiter 到 Fastify instance
server.decorate('rateLimiter', slidingWindowLimiter);
server.decorate('tokenBucket', tokenBucketLimiter);
/**
* 全域速率限制 Hook
*/
if (options.global) {
server.addHook('onRequest', async (request, reply) => {
try {
// 使用 IP 作為 key
const key = `global:${request.ip}`;
const result = await slidingWindowLimiter.checkLimit(
key,
options.global!
);
// 設定 Rate Limit Headers
reply.header('X-RateLimit-Limit', options.global!.limit);
reply.header('X-RateLimit-Remaining', result.remaining);
reply.header(
'X-RateLimit-Reset',
Math.floor(result.resetAt.getTime() / 1000)
);
// 將資訊附加到 request
request.rateLimit = {
remaining: result.remaining,
limit: options.global!.limit,
resetAt: result.resetAt,
};
if (!result.allowed) {
reply.header('Retry-After', result.retryAfter!);
throw server.httpErrors.tooManyRequests(
`Rate limit exceeded. Try again in ${result.retryAfter} seconds.`
);
}
} catch (error: any) {
if (options.skipOnError && error.statusCode !== 429) {
// 如果 Redis 錯誤,跳過限制
server.log.error('Rate limit error:', error);
return;
}
throw error;
}
});
}
/**
* 路由級別速率限制裝飾器
*/
server.decorateRequest('checkRateLimit', null);
server.addHook('onRequest', async (request) => {
request.checkRateLimit = async (
key: string,
config: RateLimitConfig
) => {
const result = await slidingWindowLimiter.checkLimit(key, config);
if (!result.allowed) {
throw server.httpErrors.tooManyRequests(
`Rate limit exceeded. Try again in ${result.retryAfter} seconds.`
);
}
return result;
};
});
};
export default fp(rateLimitPlugin, {
name: 'rate-limit',
});
// apps/kyo-otp-service/src/lib/multi-tier-rate-limit.ts
import { FastifyRequest } from 'fastify';
import { SlidingWindowRateLimiter } from '@kyong/kyo-core/rate-limit';
/**
* 多層級速率限制
*
* 層級 1: 全域 (所有請求)
* 層級 2: Per-IP (每個 IP)
* 層級 3: Per-User (每個用戶)
* 層級 4: Per-Tenant (每個租戶)
* 層級 5: Per-Endpoint (每個 API 端點)
*/
export class MultiTierRateLimiter {
constructor(private limiter: SlidingWindowRateLimiter) {}
/**
* 檢查所有層級的限制
*/
async checkAll(
request: FastifyRequest,
endpoint: string
): Promise<void> {
const checks = [
// 1. 全域限制
this.checkGlobal(),
// 2. Per-IP 限制
this.checkPerIP(request.ip),
// 3. Per-User 限制 (如果已認證)
request.user
? this.checkPerUser(request.user.userId)
: Promise.resolve(),
// 4. Per-Tenant 限制
request.user
? this.checkPerTenant(request.user.tenantId)
: Promise.resolve(),
// 5. Per-Endpoint 限制
this.checkPerEndpoint(request.ip, endpoint),
];
await Promise.all(checks.filter(Boolean));
}
/**
* 全域限制: 1000 req/min
*/
private async checkGlobal(): Promise<void> {
const result = await this.limiter.checkLimit('global', {
limit: 1000,
window: 60,
});
if (!result.allowed) {
throw new Error('Global rate limit exceeded');
}
}
/**
* Per-IP 限制: 100 req/min
*/
private async checkPerIP(ip: string): Promise<void> {
const result = await this.limiter.checkLimit(`ip:${ip}`, {
limit: 100,
window: 60,
});
if (!result.allowed) {
throw new Error('IP rate limit exceeded');
}
}
/**
* Per-User 限制: 200 req/min
*/
private async checkPerUser(userId: string): Promise<void> {
const result = await this.limiter.checkLimit(`user:${userId}`, {
limit: 200,
window: 60,
});
if (!result.allowed) {
throw new Error('User rate limit exceeded');
}
}
/**
* Per-Tenant 限制: 1000 req/min
*/
private async checkPerTenant(tenantId: string): Promise<void> {
const result = await this.limiter.checkLimit(`tenant:${tenantId}`, {
limit: 1000,
window: 60,
});
if (!result.allowed) {
throw new Error('Tenant rate limit exceeded');
}
}
/**
* Per-Endpoint 限制: 依端點設定
*/
private async checkPerEndpoint(
ip: string,
endpoint: string
): Promise<void> {
// 敏感端點有更嚴格的限制
const limits: Record<string, { limit: number; window: number }> = {
'/api/auth/login': { limit: 5, window: 60 }, // 5 次/分鐘
'/api/auth/register': { limit: 3, window: 3600 }, // 3 次/小時
'/api/auth/forgot-password': { limit: 3, window: 3600 },
'/api/otp/send': { limit: 10, window: 60 },
'/api/otp/verify': { limit: 20, window: 60 },
};
const config = limits[endpoint];
if (!config) return; // 無特殊限制
const result = await this.limiter.checkLimit(
`endpoint:${ip}:${endpoint}`,
config
);
if (!result.allowed) {
throw new Error(`Rate limit exceeded for ${endpoint}`);
}
}
}
// packages/kyo-core/src/rate-limit/api-key-quota.ts
import Redis from 'ioredis';
export interface APIKeyQuota {
keyId: string;
plan: 'free' | 'basic' | 'pro' | 'enterprise';
limits: {
requestsPerMonth: number;
requestsPerMinute: number;
burstLimit: number;
};
usage: {
month: number;
minute: number;
};
}
/**
* API Key 配額管理
*
* 用途:
* - 追蹤 API Key 使用量
* - 實施方案限制
* - 計費依據
*/
export class APIKeyQuotaManager {
private redis: Redis;
// 方案限制配置
private readonly planLimits = {
free: {
requestsPerMonth: 10000,
requestsPerMinute: 10,
burstLimit: 20,
},
basic: {
requestsPerMonth: 100000,
requestsPerMinute: 50,
burstLimit: 100,
},
pro: {
requestsPerMonth: 1000000,
requestsPerMinute: 200,
burstLimit: 500,
},
enterprise: {
requestsPerMonth: -1, // 無限制
requestsPerMinute: 1000,
burstLimit: 2000,
},
};
constructor(redisClient: Redis) {
this.redis = redisClient;
}
/**
* 檢查並記錄 API Key 使用
*/
async checkAndRecord(
keyId: string,
plan: APIKeyQuota['plan']
): Promise<{
allowed: boolean;
quota: APIKeyQuota;
reason?: string;
}> {
const limits = this.planLimits[plan];
// 取得當前使用量
const [monthUsage, minuteUsage] = await Promise.all([
this.getMonthlyUsage(keyId),
this.getMinuteUsage(keyId),
]);
// 檢查月配額
if (
limits.requestsPerMonth !== -1 &&
monthUsage >= limits.requestsPerMonth
) {
return {
allowed: false,
quota: {
keyId,
plan,
limits,
usage: { month: monthUsage, minute: minuteUsage },
},
reason: 'Monthly quota exceeded',
};
}
// 檢查分鐘速率
if (minuteUsage >= limits.requestsPerMinute) {
// 檢查是否在爆發限制內
if (minuteUsage >= limits.burstLimit) {
return {
allowed: false,
quota: {
keyId,
plan,
limits,
usage: { month: monthUsage, minute: minuteUsage },
},
reason: 'Rate limit exceeded',
};
}
}
// 記錄使用
await this.recordUsage(keyId);
return {
allowed: true,
quota: {
keyId,
plan,
limits,
usage: { month: monthUsage + 1, minute: minuteUsage + 1 },
},
};
}
/**
* 取得月使用量
*/
private async getMonthlyUsage(keyId: string): Promise<number> {
const month = this.getCurrentMonth();
const key = `api_key:${keyId}:month:${month}`;
const usage = await this.redis.get(key);
return parseInt(usage || '0', 10);
}
/**
* 取得分鐘使用量
*/
private async getMinuteUsage(keyId: string): Promise<number> {
const minute = this.getCurrentMinute();
const key = `api_key:${keyId}:minute:${minute}`;
const usage = await this.redis.get(key);
return parseInt(usage || '0', 10);
}
/**
* 記錄使用
*/
private async recordUsage(keyId: string): Promise<void> {
const month = this.getCurrentMonth();
const minute = this.getCurrentMinute();
const pipeline = this.redis.pipeline();
// 增加月計數
const monthKey = `api_key:${keyId}:month:${month}`;
pipeline.incr(monthKey);
pipeline.expire(monthKey, 31 * 24 * 60 * 60); // 31 天
// 增加分鐘計數
const minuteKey = `api_key:${keyId}:minute:${minute}`;
pipeline.incr(minuteKey);
pipeline.expire(minuteKey, 120); // 2 分鐘
await pipeline.exec();
}
/**
* 取得配額資訊
*/
async getQuota(
keyId: string,
plan: APIKeyQuota['plan']
): Promise<APIKeyQuota> {
const limits = this.planLimits[plan];
const [monthUsage, minuteUsage] = await Promise.all([
this.getMonthlyUsage(keyId),
this.getMinuteUsage(keyId),
]);
return {
keyId,
plan,
limits,
usage: { month: monthUsage, minute: minuteUsage },
};
}
/**
* 重置月配額(計費週期結束時)
*/
async resetMonthlyQuota(keyId: string): Promise<void> {
const month = this.getCurrentMonth();
const key = `api_key:${keyId}:month:${month}`;
await this.redis.del(key);
}
private getCurrentMonth(): string {
const now = new Date();
return `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, '0')}`;
}
private getCurrentMinute(): string {
return String(Math.floor(Date.now() / 1000 / 60));
}
}
// apps/kyo-otp-service/src/lib/rate-limit-monitor.ts
import { FastifyInstance } from 'fastify';
import { SlidingWindowRateLimiter } from '@kyong/kyo-core/rate-limit';
/**
* 速率限制監控
*
* 功能:
* - 追蹤限制觸發頻率
* - 偵測 DDoS 攻擊
* - 自動告警
*/
export class RateLimitMonitor {
private limiter: SlidingWindowRateLimiter;
private redis: any;
constructor(
private server: FastifyInstance,
limiter: SlidingWindowRateLimiter,
redis: any
) {
this.limiter = limiter;
this.redis = redis;
}
/**
* 記錄限制觸發
*/
async recordLimitHit(
ip: string,
endpoint: string,
userId?: string
): Promise<void> {
const timestamp = Date.now();
// 記錄到 Redis
const key = `rate_limit_hits:${ip}`;
await this.redis.zadd(key, timestamp, `${endpoint}:${timestamp}`);
await this.redis.expire(key, 3600); // 1 小時
// 檢查是否需要告警
const recentHits = await this.redis.zcount(
key,
timestamp - 60000, // 最近 1 分鐘
timestamp
);
if (recentHits >= 10) {
// 1 分鐘內超過 10 次限制觸發
this.triggerAlert({
type: 'high_rate_limit_hits',
ip,
endpoint,
userId,
count: recentHits,
});
}
// Log 到應用程式日誌
this.server.log.warn({
event: 'rate_limit_hit',
ip,
endpoint,
userId,
timestamp: new Date(timestamp),
});
}
/**
* 偵測 DDoS 攻擊
*/
async detectDDoS(): Promise<{
detected: boolean;
suspects: Array<{ ip: string; hits: number }>;
}> {
const now = Date.now();
const fiveMinutesAgo = now - 5 * 60 * 1000;
// 取得所有 IP 的限制觸發記錄
const pattern = 'rate_limit_hits:*';
const keys = await this.redis.keys(pattern);
const suspects: Array<{ ip: string; hits: number }> = [];
for (const key of keys) {
const ip = key.replace('rate_limit_hits:', '');
const hits = await this.redis.zcount(key, fiveMinutesAgo, now);
// 5 分鐘內超過 50 次限制觸發視為可疑
if (hits >= 50) {
suspects.push({ ip, hits });
}
}
const detected = suspects.length > 0;
if (detected) {
this.triggerAlert({
type: 'ddos_detected',
suspects,
});
}
return { detected, suspects };
}
/**
* 取得速率限制統計
*/
async getStatistics(period: 'hour' | 'day' | 'week'): Promise<{
totalHits: number;
uniqueIPs: number;
topEndpoints: Array<{ endpoint: string; hits: number }>;
topIPs: Array<{ ip: string; hits: number }>;
}> {
const now = Date.now();
const periodMs = {
hour: 60 * 60 * 1000,
day: 24 * 60 * 60 * 1000,
week: 7 * 24 * 60 * 60 * 1000,
}[period];
const startTime = now - periodMs;
// 取得所有記錄
const pattern = 'rate_limit_hits:*';
const keys = await this.redis.keys(pattern);
let totalHits = 0;
const endpointCounts: Record<string, number> = {};
const ipCounts: Record<string, number> = {};
for (const key of keys) {
const ip = key.replace('rate_limit_hits:', '');
const hits = await this.redis.zrangebyscore(
key,
startTime,
now,
'WITHSCORES'
);
if (hits.length > 0) {
const hitCount = hits.length / 2; // WITHSCORES 返回雙倍數量
totalHits += hitCount;
ipCounts[ip] = (ipCounts[ip] || 0) + hitCount;
// 解析端點
for (let i = 0; i < hits.length; i += 2) {
const [endpoint] = hits[i].split(':');
endpointCounts[endpoint] = (endpointCounts[endpoint] || 0) + 1;
}
}
}
// 排序
const topEndpoints = Object.entries(endpointCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 10)
.map(([endpoint, hits]) => ({ endpoint, hits }));
const topIPs = Object.entries(ipCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 10)
.map(([ip, hits]) => ({ ip, hits }));
return {
totalHits,
uniqueIPs: Object.keys(ipCounts).length,
topEndpoints,
topIPs,
};
}
/**
* 觸發告警
*/
private triggerAlert(data: any): void {
// 記錄到日誌
this.server.log.error({
alert: 'rate_limit_alert',
...data,
});
// TODO: 發送到監控系統(如 DataDog, Sentry)
// TODO: 發送 Slack/Email 通知
// TODO: 自動封鎖 IP(如果需要)
}
/**
* 定期執行 DDoS 偵測
*/
startMonitoring(intervalMs: number = 60000): void {
setInterval(async () => {
try {
await this.detectDDoS();
} catch (error) {
this.server.log.error('DDoS detection failed:', error);
}
}, intervalMs);
}
}
我們今天完成了 Kyo System 的企業級速率限制系統:
為什麼選擇 Sliding Window?:
Token Bucket vs Leaky Bucket:
多層級限制的必要性:
API Key 配額計費: