在微服務架構下,假如一個服務依賴於其他服務的回應,那麼其他服務回應緩慢、故障都會讓服務面臨故障風險,這樣的問題對上一篇提到的 API Gateway 會是很大的挑戰,原因是它依賴的服務會較多,如果因為其他服務回應緩慢導致花費大量資源在等待其回應,很可能讓 API Gateway 陷入危機。
為了避免大量無限期等待的請求無法被消化,勢必要做出一些防禦措施,最常見的措施如下:
Circuit Breaker 是一種 可恢復的 斷路機制,在一段時間內如果請求失敗數量高達設定的 Threshold,Circuit Breaker 將會進入 開啟(Open) 狀態,此時 Client 端發送的請求都會直接失效或是執行 fallback。在超過設定的保護時間後收到來自 Client 的請求,Circuit Breaker 會進入 半開啟(Half-Open) 狀態,並計算 Client 成功的次數是否有達到設定的 Threshold,如果達標則進入 關閉(Closed) 狀態,否則回到 Open 狀態。如下圖所示:
接下來會使用 NestJS 來實作 Circuit Breaker 的概念,會需要先建立 Nx Workspace 並建置兩個 Application 與一個 Library:
user-service
的 Application。user-service
的代理。請透過 Nx 建立名為 api-gateway
的 Nx Application。circuit-breaker
的 Library。先來規劃 Circuit Breaker 的使用方式:
由於需要針對各個 Handler 進行處理,再加上 Circuit Breaker 需要知道請求的成功或失敗,以 NestJS 而言,採用 Interceptor 是不錯的選擇。另外,上述的設計有提到需要針對不同的 Handler 來調整相關設定,這塊非常適合使用 Custom Decorator 來提供 Metadata,在 Interceptor 中,取出該 Handler 所設置的 Metadata 進而設定 Circuit Breaker。
補充:針對 Interceptor 與 Custom Decorator 可以參考官方文件,或是可以參考我之前分享的 Interceptor 介紹 與 Custom Decorator 介紹。
首先,新增 constants.ts
來存放預設值與 Custom Decorator Metadata 的名稱:
export const DEFAULT_SUCCESS_THRESHOLD = 5;
export const DEFAULT_FAILED_THRESHOLD = 5;
export const DEFAULT_TIMEOUT = 60000;
export const CIRCUIT_BREAKER_METADATA = '[Metadata] circuit breaker';
新增 param.interface.ts
來定義 Custom Decorator 可以帶入的參數:
import { Observable } from 'rxjs';
export interface CircuitBreakerOption<T = unknown> {
successThreshold?: number;
failedThreshold?: number;
timeout?: number;
fallback?: () => Observable<T>;
}
新增 circuit-breaker.decorator.ts
來實作 Custom Decorator:
import { SetMetadata } from '@nestjs/common';
import { CircuitBreakerOption } from '../interfaces';
import { CIRCUIT_BREAKER_METADATA } from '../constants';
export const CustomizedCircuitBreaker = (options: CircuitBreakerOption) => {
return SetMetadata(CIRCUIT_BREAKER_METADATA, options);
}
新增 state.enum.ts
針對 Circuit Breaker 的三種狀態製作一個 enum
:
export const enum CircuitBreakerState {
CLOSED = 'CLOSED',
OPEN = 'OPEN',
HALF_OPEN = 'HALF_OPEN',
}
接下來要實作 Circuit Breaker 的核心功能,由於每個 Handler 都會有一組自己定義的 Circuit Breaker,所以這裡我選擇將其功能封裝成名為 CircuitBreaker
的 class
來封裝狀態、成功次數、失敗次數、Threshold 等內容。
下方是範例程式碼,新增 circuit-breaker.ts
並實作 CircuitBreaker
,在建構實例時,可以根據 CircuitBreakerParam
的格式帶入 Threshold、Timeout 與 Fallback。預期會在 Interceptor 將 CallHandler
的處理交給 CircuitBreaker
的 handle
方法來處理,因為需要將請求的成功次數、失敗次數進行統計並針對狀態變成 CircuitBreakerState.OPEN
的時候執行 Fallback 或拋出 503
錯誤。每當狀態變成 CircuitBreakerState.OPEN
就會依照 timeout
值重新計算保護時間,當超過保護時間時,會將狀態切換成 CircuitBreakerState.HALF_OPEN
並重新嘗試執行 CallHandler
,如果又失敗,就會將狀態切換到 CircuitBreakerState.OPEN
,如果成功就會開始計算成功次數,當成功次數超過 Threshold 就會回到 CircuitBreakerState.CLOSED
:
import { CallHandler, ServiceUnavailableException } from '@nestjs/common';
import {
DEFAULT_FAILED_THRESHOLD,
DEFAULT_SUCCESS_THRESHOLD,
DEFAULT_TIMEOUT,
} from '../constants';
import { CircuitBreakerState } from '../enums';
import { Observable, tap, throwError } from 'rxjs';
interface CircuitBreakerParam {
successThreshold?: number;
failedThreshold?: number;
timeout?: number;
fallback?: () => Observable<unknown>;
}
export class CircuitBreaker {
private state = CircuitBreakerState.CLOSED;
private attemptTime: number | null = null;
private failedCount = 0;
private successCount = 0;
constructor(private readonly params: CircuitBreakerParam) {}
private get successThreshold() {
return this.params.successThreshold ?? DEFAULT_SUCCESS_THRESHOLD;
}
private get failedThreshold() {
return this.params.failedThreshold ?? DEFAULT_FAILED_THRESHOLD;
}
private get timeout() {
return this.params.timeout ?? DEFAULT_TIMEOUT;
}
private get fallback() {
const defaultFallback = () =>
throwError(() => new ServiceUnavailableException());
return this.params.fallback ?? defaultFallback;
}
handle(handler: CallHandler) {
if (this.state === CircuitBreakerState.OPEN) {
const attemptTime = this.attemptTime ?? 0;
if (attemptTime > new Date().valueOf()) {
return this.fallback();
}
this.switchTo(CircuitBreakerState.HALF_OPEN);
}
return handler.handle().pipe(
tap({
next: () => this.whenSuccess(),
error: () => this.whenFail(),
})
);
}
private switchTo(state: CircuitBreakerState) {
this.state = state;
}
private whenSuccess() {
this.failedCount = 0;
if (this.state !== CircuitBreakerState.HALF_OPEN) {
return;
}
this.successCount += 1;
if (this.successCount >= this.successThreshold) {
this.switchTo(CircuitBreakerState.CLOSED);
this.successCount = 0;
}
}
private whenFail() {
this.failedCount += 1;
if (
this.state === CircuitBreakerState.HALF_OPEN ||
this.failedCount >= this.failedThreshold
) {
this.switchTo(CircuitBreakerState.OPEN);
this.attemptTime = new Date().valueOf() + this.timeout;
}
}
}
最後,新增 circuit-breaker.interceptor.ts
來實作 CircuitBreakerInterceptor
。下方是範例程式碼,在 intercept
方法內先透過 ExecutionContext
取得 Handler,並從 circuitBreakerMap
取出對應的 CircuitBreaker
實例,如果不存在,就透過 Reflector
取得 key 為 CIRCUIT_BREAKER_METADATA
的 Metadata 來建立 CircuitBreaker
實例,再透過其 handle
方法來處理 CallHandler
:
import {
CallHandler,
ExecutionContext,
Injectable,
NestInterceptor,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { CircuitBreaker } from '../classes';
import { CIRCUIT_BREAKER_METADATA } from '../constants';
import { CircuitBreakerOption } from '../interfaces';
@Injectable()
export class CircuitBreakerInterceptor implements NestInterceptor {
// eslint-disable-next-line @typescript-eslint/ban-types
private circuitBreakerMap = new WeakMap<Function, CircuitBreaker>();
constructor(private readonly reflector: Reflector) {}
intercept(context: ExecutionContext, next: CallHandler<any>) {
const handler = context.getHandler();
const circuitBreaker = this.getCircuitBreaker(handler);
return circuitBreaker.handle(next);
}
// eslint-disable-next-line @typescript-eslint/ban-types
private getCircuitBreaker(handler: Function) {
const circuitBreaker = this.circuitBreakerMap.get(handler);
return circuitBreaker ?? this.generateCircuitBreaker(handler);
}
// eslint-disable-next-line @typescript-eslint/ban-types
private generateCircuitBreaker(handler: Function) {
const options = this.reflector.get<CircuitBreakerOption | null>(
CIRCUIT_BREAKER_METADATA,
handler
);
const circuitBreaker = new CircuitBreaker({
successThreshold: options?.successThreshold,
failedThreshold: options?.failedThreshold,
timeout: options?.timeout,
fallback: options?.fallback,
});
this.circuitBreakerMap.set(handler, circuitBreaker);
return circuitBreaker;
}
}
注意:記得將
CustomizedCircuitBreaker
與CircuitBreakerInterceptor
在index.ts
匯出,才能讓其他 Application 使用。
User Service 在這次範例中會擔任不穩定服務的角色,所以在 Handler 內會以亂數的方式隨機回 HTTP Code 500
的錯誤。下方是範例程式碼,修改了 AppController
的內容:
import {
Controller,
Get,
InternalServerErrorException,
Param,
} from '@nestjs/common';
type User = {
id: string;
name: string;
};
@Controller('users')
export class AppController {
private readonly users: Array<User> = [
{
id: '1',
name: 'John',
},
];
@Get(':id')
getUserById(@Param('id') id: string) {
if (Math.random() > 0.3) {
throw new InternalServerErrorException();
}
return this.users.find((user) => user.id === id);
}
}
由於會同時啟動多個服務,所以在 apps/user-service
目錄下新增 .env
檔,並設定 PORT
為 3001
:
PORT=3001
最後,修改 api-gateway
底下的 AppController
,在該 Controller 上方使用 @UseInterceptors
裝飾器來指定這個 Controller 底下所有的 Handler 都要套用 CircuitBreakerInterceptor
,同時,實作 getUserById
Handler 並使用 @CustomizedCircuitBreaker
裝飾器指定該 Handler 的 failedThreshold
為 2
、successThreshold
為 2
、timeout
為 5000
微秒:
import { Controller, Get, Param, UseInterceptors } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { map } from 'rxjs';
import {
CircuitBreakerInterceptor,
CustomizedCircuitBreaker,
} from '@nestjs-microservices/DAY23/circuit-breaker';
@UseInterceptors(CircuitBreakerInterceptor)
@Controller()
export class AppController {
constructor(private readonly httpService: HttpService) {}
@CustomizedCircuitBreaker({
failedThreshold: 2,
successThreshold: 2,
timeout: 5000,
})
@Get('/users/:id')
getUserById(@Param('id') id: string) {
return this.httpService
.get(`http://localhost:3001/api/users/${id}`)
.pipe(map((res) => res.data));
}
}
注意:記得要在
AppModule
匯入HttpModule
才能使用HttpService
呦。
現在,我們可以透過 Nx Console 來啟動所有的服務,在「Common Nx Commands」區塊選擇 run-many
並點選 serve
來同時啟動多個服務:
由於我們已經在 user-service
內產生 id
為 1
的 User
,所以我們可以使用 Postman 透過 GET
存取 http://localhost:3000/api/users/1 來查看結果。預期當 user-service
的錯誤率高於設定值時,Circuit Breaker 會啟動並立即回覆錯誤,下方是連續失敗三次會收到的結果:
當超過保護時間並且請求成功時可以順利收到回應:
回顧一下今天的內容,一開始介紹了微服務架構下可以採取 Timeout、RateLimit 與 Circuit Breaker 保護機制避免服務因依賴的服務而面臨故障風險。接著,深入解釋 Circuit Breaker 的運作原理,包含狀態的變化與復原機制。最後,用 NestJS 實作了 Circuit Breaker,以 Custom Decorator 與 Interceptor 的搭配來實現,並透過範例程式展現其效果。