在過去的 Day 24-28 中,我們建構了 Kyo System 後端的核心架構:
今天是倒數第二天,我們將把所有服務整合在一起,完成:
讓我們將 Kyo System 後端推向企業級、可擴展的微服務架構!
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway (Fastify) │
│ - Routing - Auth - Rate Limiting │
└────────┬────────────────────────────────────────────────────┬────┘
│ │
│ │
┌────────▼──────────┐ ┌──────────────┐ ┌──────────────────▼────┐
│ Auth Service │ │ OTP Service │ │ Notification Service │
│ - JWT │ │ - Send OTP │ │ - Email │
│ - Users │ │ - Verify │ │ - SMS │
│ - RBAC │ │ - Logs │ │ - WebSocket │
└────────┬──────────┘ └──────┬───────┘ └──────┬───────────────┘
│ │ │
│ │ │
┌────────▼─────────────────────▼──────────────────▼───────────────┐
│ Event Bus (Redis Pub/Sub) │
│ - user.created - otp.sent - otp.verified - limit.exceeded│
└────────┬─────────────────────┬──────────────────┬───────────────┘
│ │ │
│ │ │
┌────────▼──────────┐ ┌──────▼───────┐ ┌──────▼────────────────┐
│ PostgreSQL │ │ Redis Cache │ │ BullMQ Workers │
│ - Users │ │ - Sessions │ │ - SMS Queue │
│ - OTP Logs │ │ - Rate Limit │ │ - Email Queue │
│ - Audit Logs │ │ - OTP Codes │ │ - Analytics Queue │
└───────────────────┘ └──────────────┘ └───────────────────────┘
│ │ │
│ │ │
┌────────▼─────────────────────▼──────────────────▼───────────────┐
│ Observability Stack │
│ - Metrics (Prometheus) - Logs (Loki) - Traces (Jaeger) │
└──────────────────────────────────────────────────────────────────┘
依照業務領域進行服務拆分(Domain-Driven Design):
// packages/kyo-core/src/types/service.types.ts
/**
* Service Registry - 定義所有微服務
*/
export interface ServiceDefinition {
name: string;
version: string;
baseUrl: string;
healthCheck: string;
dependencies: string[];
capabilities: string[];
}
export const SERVICES = {
AUTH: {
name: 'auth-service',
version: '1.0.0',
baseUrl: process.env.AUTH_SERVICE_URL || 'http://localhost:3001',
healthCheck: '/health',
dependencies: ['database', 'redis'],
capabilities: ['login', 'register', 'verify-token', 'refresh-token'],
},
OTP: {
name: 'otp-service',
version: '1.0.0',
baseUrl: process.env.OTP_SERVICE_URL || 'http://localhost:3002',
healthCheck: '/health',
dependencies: ['database', 'redis', 'sms-provider'],
capabilities: ['send-otp', 'verify-otp', 'get-logs'],
},
NOTIFICATION: {
name: 'notification-service',
version: '1.0.0',
baseUrl: process.env.NOTIFICATION_SERVICE_URL || 'http://localhost:3003',
healthCheck: '/health',
dependencies: ['redis', 'email-provider', 'sms-provider'],
capabilities: ['send-email', 'send-sms', 'websocket-notify'],
},
ANALYTICS: {
name: 'analytics-service',
version: '1.0.0',
baseUrl: process.env.ANALYTICS_SERVICE_URL || 'http://localhost:3004',
healthCheck: '/health',
dependencies: ['database', 'redis'],
capabilities: ['track-event', 'generate-report', 'get-stats'],
},
} as const;
export type ServiceName = keyof typeof SERVICES;
實作服務發現與健康檢查:
// packages/kyo-core/src/services/service-registry.ts
import axios, { AxiosInstance } from 'axios';
import { SERVICES, ServiceDefinition, ServiceName } from '../types/service.types';
export class ServiceRegistry {
private services: Map<ServiceName, ServiceClient> = new Map();
private healthCheckInterval: NodeJS.Timeout | null = null;
constructor() {
this.initializeServices();
}
/**
* Initialize all service clients
*/
private initializeServices(): void {
for (const [name, config] of Object.entries(SERVICES)) {
const client = new ServiceClient(config);
this.services.set(name as ServiceName, client);
}
}
/**
* Get service client by name
*/
getService(name: ServiceName): ServiceClient {
const service = this.services.get(name);
if (!service) {
throw new Error(`Service ${name} not found`);
}
return service;
}
/**
* Start health check monitoring
*/
startHealthChecks(intervalMs: number = 30000): void {
this.healthCheckInterval = setInterval(async () => {
for (const [name, client] of this.services) {
try {
const isHealthy = await client.healthCheck();
if (!isHealthy) {
console.error(`[ServiceRegistry] ${name} is unhealthy`);
}
} catch (error) {
console.error(`[ServiceRegistry] ${name} health check failed:`, error);
}
}
}, intervalMs);
}
/**
* Stop health check monitoring
*/
stopHealthChecks(): void {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
}
/**
* Get all services status
*/
async getAllServicesStatus(): Promise<Record<ServiceName, boolean>> {
const statuses: Partial<Record<ServiceName, boolean>> = {};
await Promise.all(
Array.from(this.services.entries()).map(async ([name, client]) => {
try {
statuses[name] = await client.healthCheck();
} catch {
statuses[name] = false;
}
})
);
return statuses as Record<ServiceName, boolean>;
}
}
/**
* Service Client - Handle HTTP communication with services
*/
export class ServiceClient {
private client: AxiosInstance;
private config: ServiceDefinition;
constructor(config: ServiceDefinition) {
this.config = config;
this.client = axios.create({
baseURL: config.baseUrl,
timeout: 10000,
headers: {
'Content-Type': 'application/json',
'X-Service-Name': config.name,
'X-Service-Version': config.version,
},
});
// Add request interceptor for logging
this.client.interceptors.request.use(
(config) => {
console.log(
`[ServiceClient] ${this.config.name} -> ${config.method?.toUpperCase()} ${config.url}`
);
return config;
},
(error) => {
console.error(`[ServiceClient] ${this.config.name} request error:`, error);
return Promise.reject(error);
}
);
// Add response interceptor for error handling
this.client.interceptors.response.use(
(response) => response,
(error) => {
console.error(
`[ServiceClient] ${this.config.name} response error:`,
error.message
);
return Promise.reject(error);
}
);
}
/**
* Perform health check
*/
async healthCheck(): Promise<boolean> {
try {
const response = await this.client.get(this.config.healthCheck);
return response.status === 200;
} catch {
return false;
}
}
/**
* Generic GET request
*/
async get<T = any>(path: string, params?: Record<string, any>): Promise<T> {
const response = await this.client.get<T>(path, { params });
return response.data;
}
/**
* Generic POST request
*/
async post<T = any>(path: string, data?: any): Promise<T> {
const response = await this.client.post<T>(path, data);
return response.data;
}
/**
* Generic PUT request
*/
async put<T = any>(path: string, data?: any): Promise<T> {
const response = await this.client.put<T>(path, data);
return response.data;
}
/**
* Generic DELETE request
*/
async delete<T = any>(path: string): Promise<T> {
const response = await this.client.delete<T>(path);
return response.data;
}
/**
* Set authentication token
*/
setAuthToken(token: string): void {
this.client.defaults.headers.common['Authorization'] = `Bearer ${token}`;
}
}
// Singleton instance
export const serviceRegistry = new ServiceRegistry();
建立統一的事件總線:
// packages/kyo-core/src/events/event-bus.ts
import { Redis } from 'ioredis';
import { EventEmitter } from 'events';
/**
* Domain Events Definition
*/
export interface DomainEvent {
id: string;
type: string;
timestamp: string;
aggregateId: string;
aggregateType: string;
version: number;
payload: Record<string, any>;
metadata?: {
userId?: string;
tenantId?: string;
correlationId?: string;
causationId?: string;
};
}
export type EventHandler<T = any> = (event: DomainEvent<T>) => Promise<void>;
/**
* Event Bus - Redis Pub/Sub Implementation
*/
export class EventBus extends EventEmitter {
private publisher: Redis;
private subscriber: Redis;
private handlers: Map<string, EventHandler[]> = new Map();
constructor(redisUrl: string) {
super();
this.publisher = new Redis(redisUrl);
this.subscriber = new Redis(redisUrl);
this.setupSubscriber();
}
/**
* Setup Redis subscriber
*/
private setupSubscriber(): void {
this.subscriber.on('message', async (channel, message) => {
try {
const event: DomainEvent = JSON.parse(message);
await this.handleEvent(event);
} catch (error) {
console.error('[EventBus] Failed to parse event:', error);
}
});
}
/**
* Publish event to Redis
*/
async publish(event: DomainEvent): Promise<void> {
const channel = `events:${event.type}`;
console.log(`[EventBus] Publishing event: ${event.type}`, {
id: event.id,
aggregateId: event.aggregateId,
});
// Publish to Redis channel
await this.publisher.publish(channel, JSON.stringify(event));
// Also store in event stream (for event sourcing)
await this.storeEvent(event);
}
/**
* Subscribe to event type
*/
async subscribe(eventType: string, handler: EventHandler): Promise<void> {
const channel = `events:${eventType}`;
// Add handler to local registry
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
}
this.handlers.get(eventType)!.push(handler);
// Subscribe to Redis channel
await this.subscriber.subscribe(channel);
console.log(`[EventBus] Subscribed to: ${eventType}`);
}
/**
* Unsubscribe from event type
*/
async unsubscribe(eventType: string): Promise<void> {
const channel = `events:${eventType}`;
this.handlers.delete(eventType);
await this.subscriber.unsubscribe(channel);
console.log(`[EventBus] Unsubscribed from: ${eventType}`);
}
/**
* Handle incoming event
*/
private async handleEvent(event: DomainEvent): Promise<void> {
const handlers = this.handlers.get(event.type) || [];
console.log(`[EventBus] Handling event: ${event.type}`, {
handlers: handlers.length,
});
// Execute all handlers in parallel
await Promise.all(
handlers.map(async (handler) => {
try {
await handler(event);
} catch (error) {
console.error(
`[EventBus] Handler failed for ${event.type}:`,
error
);
// Publish failed event for retry/dead letter queue
await this.publishFailedEvent(event, error);
}
})
);
}
/**
* Store event for event sourcing
*/
private async storeEvent(event: DomainEvent): Promise<void> {
const streamKey = `event-stream:${event.aggregateType}:${event.aggregateId}`;
await this.publisher.xadd(
streamKey,
'*',
'event',
JSON.stringify(event)
);
}
/**
* Publish failed event
*/
private async publishFailedEvent(event: DomainEvent, error: any): Promise<void> {
const failedEvent: DomainEvent = {
id: `${event.id}-failed`,
type: 'event.failed',
timestamp: new Date().toISOString(),
aggregateId: event.id,
aggregateType: 'Event',
version: 1,
payload: {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
},
},
};
await this.publisher.publish(
'events:event.failed',
JSON.stringify(failedEvent)
);
}
/**
* Get event history for aggregate
*/
async getEventHistory(
aggregateType: string,
aggregateId: string
): Promise<DomainEvent[]> {
const streamKey = `event-stream:${aggregateType}:${aggregateId}`;
const events = await this.publisher.xrange(streamKey, '-', '+');
return events.map((entry) => {
const [, fields] = entry;
return JSON.parse(fields[1]);
});
}
/**
* Close connections
*/
async close(): Promise<void> {
await this.publisher.quit();
await this.subscriber.quit();
}
}
定義業務領域事件:
// packages/kyo-core/src/events/domain-events.ts
import { DomainEvent } from './event-bus';
/**
* User Events
*/
export interface UserCreatedEvent extends DomainEvent {
type: 'user.created';
payload: {
userId: string;
email: string;
tenantId: string;
role: string;
};
}
export interface UserUpdatedEvent extends DomainEvent {
type: 'user.updated';
payload: {
userId: string;
changes: Record<string, any>;
};
}
/**
* OTP Events
*/
export interface OTPSentEvent extends DomainEvent {
type: 'otp.sent';
payload: {
requestId: string;
userId: string;
tenantId: string;
phoneNumber: string;
expiresAt: string;
};
}
export interface OTPVerifiedEvent extends DomainEvent {
type: 'otp.verified';
payload: {
requestId: string;
userId: string;
tenantId: string;
phoneNumber: string;
verifiedAt: string;
};
}
export interface OTPFailedEvent extends DomainEvent {
type: 'otp.failed';
payload: {
requestId: string;
userId: string;
phoneNumber: string;
reason: string;
attemptsRemaining: number;
};
}
/**
* Rate Limit Events
*/
export interface RateLimitExceededEvent extends DomainEvent {
type: 'rate-limit.exceeded';
payload: {
userId: string;
tenantId: string;
endpoint: string;
limit: number;
window: number;
resetAt: string;
};
}
/**
* Notification Events
*/
export interface NotificationSentEvent extends DomainEvent {
type: 'notification.sent';
payload: {
notificationId: string;
userId: string;
type: 'email' | 'sms' | 'websocket';
status: 'success' | 'failed';
};
}
/**
* Event Creators
*/
export function createUserCreatedEvent(
userId: string,
email: string,
tenantId: string,
role: string,
metadata?: any
): UserCreatedEvent {
return {
id: `user-created-${Date.now()}`,
type: 'user.created',
timestamp: new Date().toISOString(),
aggregateId: userId,
aggregateType: 'User',
version: 1,
payload: { userId, email, tenantId, role },
metadata,
};
}
export function createOTPSentEvent(
requestId: string,
userId: string,
tenantId: string,
phoneNumber: string,
expiresAt: string,
metadata?: any
): OTPSentEvent {
return {
id: `otp-sent-${requestId}`,
type: 'otp.sent',
timestamp: new Date().toISOString(),
aggregateId: requestId,
aggregateType: 'OTP',
version: 1,
payload: { requestId, userId, tenantId, phoneNumber, expiresAt },
metadata,
};
}
export function createOTPVerifiedEvent(
requestId: string,
userId: string,
tenantId: string,
phoneNumber: string,
verifiedAt: string,
metadata?: any
): OTPVerifiedEvent {
return {
id: `otp-verified-${requestId}`,
type: 'otp.verified',
timestamp: new Date().toISOString(),
aggregateId: requestId,
aggregateType: 'OTP',
version: 1,
payload: { requestId, userId, tenantId, phoneNumber, verifiedAt },
metadata,
};
}
export function createRateLimitExceededEvent(
userId: string,
tenantId: string,
endpoint: string,
limit: number,
window: number,
resetAt: string,
metadata?: any
): RateLimitExceededEvent {
return {
id: `rate-limit-${Date.now()}`,
type: 'rate-limit.exceeded',
timestamp: new Date().toISOString(),
aggregateId: userId,
aggregateType: 'RateLimit',
version: 1,
payload: { userId, tenantId, endpoint, limit, window, resetAt },
metadata,
};
}
註冊事件處理器:
// apps/kyo-otp-service/src/events/handlers.ts
import { EventBus } from '@kyong/kyo-core';
import { NotificationService } from '../services/notification.service';
import { AnalyticsService } from '../services/analytics.service';
export async function registerEventHandlers(
eventBus: EventBus,
notificationService: NotificationService,
analyticsService: AnalyticsService
): Promise<void> {
// Handle OTP sent events
await eventBus.subscribe('otp.sent', async (event) => {
console.log('[EventHandler] Handling otp.sent', event);
const { userId, tenantId, phoneNumber, expiresAt } = event.payload;
// Send notification to user
await notificationService.notifyOTPSent(userId, tenantId, {
requestId: event.aggregateId,
phoneNumber,
expiresAt,
});
// Track analytics
await analyticsService.trackEvent('otp_sent', {
userId,
tenantId,
timestamp: event.timestamp,
});
});
// Handle OTP verified events
await eventBus.subscribe('otp.verified', async (event) => {
console.log('[EventHandler] Handling otp.verified', event);
const { userId, tenantId, phoneNumber, verifiedAt } = event.payload;
// Send success notification
await notificationService.notifyOTPVerified(userId, tenantId, {
requestId: event.aggregateId,
phoneNumber,
verifiedAt,
});
// Track analytics
await analyticsService.trackEvent('otp_verified', {
userId,
tenantId,
timestamp: event.timestamp,
});
// Update user verification status
// await userService.markPhoneVerified(userId, phoneNumber);
});
// Handle rate limit exceeded events
await eventBus.subscribe('rate-limit.exceeded', async (event) => {
console.log('[EventHandler] Handling rate-limit.exceeded', event);
const { userId, tenantId, endpoint, limit, resetAt } = event.payload;
// Send warning notification
await notificationService.notifyRateLimitExceeded(userId, tenantId, {
endpoint,
limit,
resetAt,
});
// Track for security monitoring
await analyticsService.trackEvent('rate_limit_exceeded', {
userId,
tenantId,
endpoint,
timestamp: event.timestamp,
});
});
// Handle user created events
await eventBus.subscribe('user.created', async (event) => {
console.log('[EventHandler] Handling user.created', event);
const { userId, email, tenantId } = event.payload;
// Send welcome email
await notificationService.sendEmail({
to: email,
subject: '歡迎加入 Kyo System',
template: 'welcome',
data: { userId, email },
});
// Initialize user quota
// await quotaService.initializeUserQuota(userId, tenantId);
});
console.log('[EventHandlers] All event handlers registered');
}
實作 Saga 編排器處理分散式交易:
// packages/kyo-core/src/saga/saga-orchestrator.ts
import { EventBus, DomainEvent } from '../events/event-bus';
import { Redis } from 'ioredis';
export interface SagaStep {
name: string;
execute: (data: any) => Promise<any>;
compensate: (data: any) => Promise<void>;
}
export interface SagaDefinition {
id: string;
name: string;
steps: SagaStep[];
}
export interface SagaExecution {
sagaId: string;
definitionId: string;
status: 'pending' | 'running' | 'completed' | 'failed' | 'compensating' | 'compensated';
currentStep: number;
data: any;
results: any[];
error?: string;
startedAt: string;
completedAt?: string;
}
/**
* Saga Orchestrator - Coordinate distributed transactions
*/
export class SagaOrchestrator {
private redis: Redis;
private eventBus: EventBus;
private definitions: Map<string, SagaDefinition> = new Map();
constructor(redisUrl: string, eventBus: EventBus) {
this.redis = new Redis(redisUrl);
this.eventBus = eventBus;
}
/**
* Register saga definition
*/
registerSaga(definition: SagaDefinition): void {
this.definitions.set(definition.id, definition);
console.log(`[SagaOrchestrator] Registered saga: ${definition.name}`);
}
/**
* Start saga execution
*/
async startSaga(definitionId: string, initialData: any): Promise<string> {
const definition = this.definitions.get(definitionId);
if (!definition) {
throw new Error(`Saga definition not found: ${definitionId}`);
}
const sagaId = `saga-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const execution: SagaExecution = {
sagaId,
definitionId,
status: 'pending',
currentStep: 0,
data: initialData,
results: [],
startedAt: new Date().toISOString(),
};
// Store saga execution state
await this.saveSagaExecution(execution);
// Publish saga started event
await this.eventBus.publish({
id: `${sagaId}-started`,
type: 'saga.started',
timestamp: new Date().toISOString(),
aggregateId: sagaId,
aggregateType: 'Saga',
version: 1,
payload: {
sagaId,
definitionId: definition.id,
name: definition.name,
},
});
// Execute saga asynchronously
this.executeSaga(sagaId).catch((error) => {
console.error(`[SagaOrchestrator] Saga execution failed: ${sagaId}`, error);
});
return sagaId;
}
/**
* Execute saga steps
*/
private async executeSaga(sagaId: string): Promise<void> {
let execution = await this.loadSagaExecution(sagaId);
const definition = this.definitions.get(execution.definitionId)!;
execution.status = 'running';
await this.saveSagaExecution(execution);
try {
// Execute each step sequentially
for (let i = execution.currentStep; i < definition.steps.length; i++) {
const step = definition.steps[i];
execution.currentStep = i;
console.log(`[SagaOrchestrator] Executing step: ${step.name}`, {
sagaId,
step: i + 1,
total: definition.steps.length,
});
// Execute step
const result = await step.execute(execution.data);
execution.results.push(result);
// Update data for next step
execution.data = { ...execution.data, [`step${i}Result`]: result };
// Save progress
await this.saveSagaExecution(execution);
// Publish step completed event
await this.eventBus.publish({
id: `${sagaId}-step-${i}-completed`,
type: 'saga.step.completed',
timestamp: new Date().toISOString(),
aggregateId: sagaId,
aggregateType: 'Saga',
version: i + 1,
payload: {
sagaId,
stepName: step.name,
stepIndex: i,
result,
},
});
}
// Saga completed successfully
execution.status = 'completed';
execution.completedAt = new Date().toISOString();
await this.saveSagaExecution(execution);
// Publish saga completed event
await this.eventBus.publish({
id: `${sagaId}-completed`,
type: 'saga.completed',
timestamp: new Date().toISOString(),
aggregateId: sagaId,
aggregateType: 'Saga',
version: definition.steps.length + 1,
payload: {
sagaId,
results: execution.results,
},
});
console.log(`[SagaOrchestrator] Saga completed: ${sagaId}`);
} catch (error) {
console.error(`[SagaOrchestrator] Saga step failed: ${sagaId}`, error);
execution.status = 'failed';
execution.error = error.message;
await this.saveSagaExecution(execution);
// Start compensation
await this.compensateSaga(sagaId);
}
}
/**
* Compensate saga (rollback)
*/
private async compensateSaga(sagaId: string): Promise<void> {
let execution = await this.loadSagaExecution(sagaId);
const definition = this.definitions.get(execution.definitionId)!;
execution.status = 'compensating';
await this.saveSagaExecution(execution);
console.log(`[SagaOrchestrator] Starting compensation: ${sagaId}`);
// Compensate steps in reverse order
for (let i = execution.currentStep; i >= 0; i--) {
const step = definition.steps[i];
try {
console.log(`[SagaOrchestrator] Compensating step: ${step.name}`, {
sagaId,
step: i + 1,
});
await step.compensate(execution.data);
// Publish compensation event
await this.eventBus.publish({
id: `${sagaId}-step-${i}-compensated`,
type: 'saga.step.compensated',
timestamp: new Date().toISOString(),
aggregateId: sagaId,
aggregateType: 'Saga',
version: i + 1,
payload: {
sagaId,
stepName: step.name,
stepIndex: i,
},
});
} catch (error) {
console.error(
`[SagaOrchestrator] Compensation failed for step: ${step.name}`,
error
);
// Continue with other compensations
}
}
execution.status = 'compensated';
execution.completedAt = new Date().toISOString();
await this.saveSagaExecution(execution);
// Publish saga compensated event
await this.eventBus.publish({
id: `${sagaId}-compensated`,
type: 'saga.compensated',
timestamp: new Date().toISOString(),
aggregateId: sagaId,
aggregateType: 'Saga',
version: definition.steps.length + 2,
payload: {
sagaId,
error: execution.error,
},
});
console.log(`[SagaOrchestrator] Saga compensated: ${sagaId}`);
}
/**
* Get saga status
*/
async getSagaStatus(sagaId: string): Promise<SagaExecution> {
return await this.loadSagaExecution(sagaId);
}
/**
* Save saga execution state to Redis
*/
private async saveSagaExecution(execution: SagaExecution): Promise<void> {
const key = `saga:${execution.sagaId}`;
await this.redis.set(key, JSON.stringify(execution), 'EX', 86400); // 24 hours TTL
}
/**
* Load saga execution state from Redis
*/
private async loadSagaExecution(sagaId: string): Promise<SagaExecution> {
const key = `saga:${sagaId}`;
const data = await this.redis.get(key);
if (!data) {
throw new Error(`Saga execution not found: ${sagaId}`);
}
return JSON.parse(data);
}
/**
* Close connections
*/
async close(): Promise<void> {
await this.redis.quit();
}
}
實作用戶註冊的 Saga 流程:
// apps/kyo-otp-service/src/sagas/user-registration.saga.ts
import { SagaOrchestrator, SagaDefinition, SagaStep } from '@kyong/kyo-core';
import { ServiceRegistry } from '@kyong/kyo-core';
export interface UserRegistrationData {
email: string;
password: string;
phoneNumber: string;
otp: string;
}
/**
* User Registration Saga
* Steps:
* 1. Create user account
* 2. Send OTP
* 3. Verify OTP
* 4. Activate account
*/
export function createUserRegistrationSaga(
serviceRegistry: ServiceRegistry
): SagaDefinition {
const authService = serviceRegistry.getService('AUTH');
const otpService = serviceRegistry.getService('OTP');
const steps: SagaStep[] = [
// Step 1: Create user account (pending status)
{
name: 'create-user',
execute: async (data: UserRegistrationData) => {
console.log('[Saga] Creating user account');
const result = await authService.post('/api/users', {
email: data.email,
password: data.password,
status: 'pending',
});
return { userId: result.id };
},
compensate: async (data: any) => {
console.log('[Saga] Compensating: Delete user account');
if (data.step0Result?.userId) {
await authService.delete(`/api/users/${data.step0Result.userId}`);
}
},
},
// Step 2: Send OTP
{
name: 'send-otp',
execute: async (data: any) => {
console.log('[Saga] Sending OTP');
const { userId } = data.step0Result;
const result = await otpService.post('/api/otp/send', {
userId,
phoneNumber: data.phoneNumber,
});
return { requestId: result.requestId };
},
compensate: async (data: any) => {
console.log('[Saga] Compensating: Cancel OTP request');
if (data.step1Result?.requestId) {
await otpService.delete(`/api/otp/${data.step1Result.requestId}`);
}
},
},
// Step 3: Verify OTP
{
name: 'verify-otp',
execute: async (data: any) => {
console.log('[Saga] Verifying OTP');
const { requestId } = data.step1Result;
const result = await otpService.post('/api/otp/verify', {
requestId,
otp: data.otp,
});
if (!result.valid) {
throw new Error('Invalid OTP');
}
return { verified: true };
},
compensate: async (data: any) => {
console.log('[Saga] Compensating: Mark OTP as invalid');
// No compensation needed for verification
},
},
// Step 4: Activate account
{
name: 'activate-account',
execute: async (data: any) => {
console.log('[Saga] Activating user account');
const { userId } = data.step0Result;
await authService.put(`/api/users/${userId}`, {
status: 'active',
phoneNumber: data.phoneNumber,
phoneVerified: true,
});
return { activated: true };
},
compensate: async (data: any) => {
console.log('[Saga] Compensating: Deactivate account');
if (data.step0Result?.userId) {
await authService.put(`/api/users/${data.step0Result.userId}`, {
status: 'pending',
phoneVerified: false,
});
}
},
},
];
return {
id: 'user-registration-saga',
name: 'User Registration with OTP Verification',
steps,
};
}
/**
* Start user registration saga
*/
export async function startUserRegistration(
orchestrator: SagaOrchestrator,
data: UserRegistrationData
): Promise<string> {
return await orchestrator.startSaga('user-registration-saga', data);
}
建立統一的 API Gateway:
// apps/kyo-api-gateway/src/app.ts
import Fastify from 'fastify';
import cors from '@fastify/cors';
import helmet from '@fastify/helmet';
import rateLimit from '@fastify/rate-limit';
import httpProxy from '@fastify/http-proxy';
import { ServiceRegistry } from '@kyong/kyo-core';
import { jwtMiddleware } from './middleware/jwt';
import { SERVICES } from '@kyong/kyo-core';
const app = Fastify({
logger: {
level: process.env.LOG_LEVEL || 'info',
},
});
// Security
await app.register(helmet);
await app.register(cors, {
origin: process.env.CORS_ORIGIN || 'http://localhost:5173',
credentials: true,
});
// Rate Limiting
await app.register(rateLimit, {
max: 100,
timeWindow: '1 minute',
});
// Service Registry
const serviceRegistry = new ServiceRegistry();
serviceRegistry.startHealthChecks();
// JWT Middleware for protected routes
app.addHook('preHandler', jwtMiddleware);
// Health Check
app.get('/health', async () => {
const servicesStatus = await serviceRegistry.getAllServicesStatus();
const allHealthy = Object.values(servicesStatus).every((status) => status);
return {
status: allHealthy ? 'healthy' : 'degraded',
services: servicesStatus,
timestamp: new Date().toISOString(),
};
});
// Route: Auth Service
await app.register(httpProxy, {
upstream: SERVICES.AUTH.baseUrl,
prefix: '/api/auth',
rewritePrefix: '/api/auth',
http2: false,
});
// Route: OTP Service
await app.register(httpProxy, {
upstream: SERVICES.OTP.baseUrl,
prefix: '/api/otp',
rewritePrefix: '/api/otp',
http2: false,
preHandler: app.auth, // Require authentication
});
// Route: Notification Service
await app.register(httpProxy, {
upstream: SERVICES.NOTIFICATION.baseUrl,
prefix: '/api/notifications',
rewritePrefix: '/api/notifications',
http2: false,
preHandler: app.auth,
});
// Route: Analytics Service
await app.register(httpProxy, {
upstream: SERVICES.ANALYTICS.baseUrl,
prefix: '/api/analytics',
rewritePrefix: '/api/analytics',
http2: false,
preHandler: app.auth,
});
// Error Handler
app.setErrorHandler((error, request, reply) => {
app.log.error({ error, request }, 'Request error');
reply.status(error.statusCode || 500).send({
error: error.message || 'Internal Server Error',
statusCode: error.statusCode || 500,
});
});
// Graceful Shutdown
const shutdown = async () => {
console.log('Shutting down API Gateway...');
serviceRegistry.stopHealthChecks();
await app.close();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
export default app;
// apps/kyo-otp-service/test/integration/service-integration.test.ts
import { test } from 'node:test';
import assert from 'node:assert';
import { ServiceRegistry } from '@kyong/kyo-core';
test('Service Integration Tests', async (t) => {
const registry = new ServiceRegistry();
await t.test('should get all services', () => {
const authService = registry.getService('AUTH');
const otpService = registry.getService('OTP');
assert.ok(authService);
assert.ok(otpService);
});
await t.test('should perform health checks', async () => {
const statuses = await registry.getAllServicesStatus();
assert.ok(typeof statuses === 'object');
assert.ok('AUTH' in statuses);
assert.ok('OTP' in statuses);
});
});
安裝 Pact:
pnpm add -D @pact-foundation/pact
建立 Consumer Contract Test:
// apps/kyo-otp-service/test/pact/otp-service.consumer.test.ts
import { test } from 'node:test';
import { PactV3, MatchersV3 } from '@pact-foundation/pact';
import { ServiceClient } from '@kyong/kyo-core';
const { like, iso8601DateTime } = MatchersV3;
const provider = new PactV3({
consumer: 'kyo-dashboard',
provider: 'kyo-otp-service',
dir: './pacts',
});
test('OTP Service Contract Tests', async (t) => {
await t.test('should send OTP successfully', async () => {
await provider
.given('user is authenticated')
.uponReceiving('a request to send OTP')
.withRequest({
method: 'POST',
path: '/api/otp/send',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer valid-token',
},
body: {
phoneNumber: '0912345678',
},
})
.willRespondWith({
status: 200,
headers: {
'Content-Type': 'application/json',
},
body: {
success: true,
requestId: like('otp-123'),
expiresAt: iso8601DateTime(),
},
})
.executeTest(async (mockServer) => {
const client = new ServiceClient({
name: 'otp-service',
version: '1.0.0',
baseUrl: mockServer.url,
healthCheck: '/health',
dependencies: [],
capabilities: [],
});
client.setAuthToken('valid-token');
const response = await client.post('/api/otp/send', {
phoneNumber: '0912345678',
});
assert.ok(response.success);
assert.ok(response.requestId);
});
});
});
建立負載測試腳本:
// apps/kyo-otp-service/test/load/otp-load-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';
const errorRate = new Rate('errors');
export const options = {
stages: [
{ duration: '30s', target: 10 }, // Ramp up to 10 users
{ duration: '1m', target: 50 }, // Ramp up to 50 users
{ duration: '2m', target: 100 }, // Ramp up to 100 users
{ duration: '1m', target: 50 }, // Ramp down to 50 users
{ duration: '30s', target: 0 }, // Ramp down to 0 users
],
thresholds: {
http_req_duration: ['p(95)<500', 'p(99)<1000'], // 95% < 500ms, 99% < 1s
errors: ['rate<0.1'], // Error rate < 10%
},
};
const BASE_URL = __ENV.BASE_URL || 'http://localhost:3000';
const JWT_TOKEN = __ENV.JWT_TOKEN || 'test-token';
export default function () {
// Login
let loginRes = http.post(`${BASE_URL}/api/auth/login`, JSON.stringify({
email: 'test@example.com',
password: 'password123',
}), {
headers: { 'Content-Type': 'application/json' },
});
check(loginRes, {
'login successful': (r) => r.status === 200,
}) || errorRate.add(1);
const token = loginRes.json('token');
sleep(1);
// Send OTP
let otpRes = http.post(`${BASE_URL}/api/otp/send`, JSON.stringify({
phoneNumber: '0912345678',
}), {
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
},
});
check(otpRes, {
'otp sent successfully': (r) => r.status === 200 || r.status === 429,
'response time OK': (r) => r.timings.duration < 1000,
}) || errorRate.add(1);
sleep(2);
// Verify OTP
if (otpRes.status === 200) {
const requestId = otpRes.json('requestId');
let verifyRes = http.post(`${BASE_URL}/api/otp/verify`, JSON.stringify({
requestId: requestId,
otp: '123456',
}), {
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
},
});
check(verifyRes, {
'otp verified': (r) => r.status === 200 || r.status === 400,
}) || errorRate.add(1);
}
sleep(1);
}
執行負載測試:
k6 run test/load/otp-load-test.js
## 後端部署檢查清單
### Code Quality
- [ ] 所有 TypeScript 編譯錯誤已修復
- [ ] ESLint 無警告
- [ ] 程式碼覆蓋率 > 80%
- [ ] 無 TODO 或 FIXME 註解
### Testing
- [ ] 單元測試全部通過
- [ ] 整合測試全部通過
- [ ] Contract 測試全部通過
- [ ] 負載測試達標 (p95 < 500ms)
- [ ] Chaos 測試完成
### Security
- [ ] 所有依賴套件已更新
- [ ] 無高危漏洞 (npm audit)
- [ ] JWT 金鑰已輪替
- [ ] API Rate Limiting 已啟用
- [ ] CORS 設定正確
- [ ] SQL Injection 防護
- [ ] XSS 防護
### Database
- [ ] Migration 腳本已測試
- [ ] 索引已優化
- [ ] 連線池設定正確
- [ ] 備份策略已測試
- [ ] Rollback 腳本已準備
### Infrastructure
- [ ] 環境變數已設定
- [ ] Secrets 已加密儲存
- [ ] 健康檢查 endpoint 正常
- [ ] 服務發現設定正確
- [ ] Load Balancer 設定完成
### Monitoring
- [ ] Prometheus metrics 已設定
- [ ] 日誌聚合已設定 (Loki/ELK)
- [ ] 追蹤已啟用 (Jaeger)
- [ ] 告警規則已設定
- [ ] Dashboard 已建立
### Performance
- [ ] API 回應時間 < 500ms (p95)
- [ ] 資料庫查詢優化
- [ ] Redis 快取策略正確
- [ ] Connection pooling 設定
- [ ] Graceful shutdown 實作
### Documentation
- [ ] API 文件已更新
- [ ] Runbook 已撰寫
- [ ] 架構圖已更新
- [ ] 部署文件已完成
今天我們完成了 Kyo System 後端的微服務整合與事件驅動架構:
微服務架構
事件驅動架構
分散式交易
完整測試策略