iT邦幫忙

2024 iThome 鐵人賽

DAY 23
0
Software Development

用 NestJS 闖蕩微服務!系列 第 23

[用NestJS闖蕩微服務!] DAY23 - Circuit Breaker

  • 分享至 

  • xImage
  •  

微服務的保護機制

在微服務架構下,假如一個服務依賴於其他服務的回應,那麼其他服務回應緩慢、故障都會讓服務面臨故障風險,這樣的問題對上一篇提到的 API Gateway 會是很大的挑戰,原因是它依賴的服務會較多,如果因為其他服務回應緩慢導致花費大量資源在等待其回應,很可能讓 API Gateway 陷入危機。

Microservice Pending Concept

保護方式

為了避免大量無限期等待的請求無法被消化,勢必要做出一些防禦措施,最常見的措施如下:

  • 設置超時時間(Timeout):針對請求的回應設置一個合理的超時時間,避免在無法回應的請求上浪費資源。
  • 限制請求數量(RateLimit):服務針對 Client 發送的請求設置上限,當達到上限時,立即傳送失敗。上限的定義可以是每分鐘最多幾發,也可以根據實際情況進行調整,重點是要避免無法消化的請求持續增加。
  • 斷路器模式(Circuit Breaker):檢測 Client 發起請求成功與失敗的次數,當失敗的次數超過 閾值(Threshold) 時,保護機制將啟動,讓後續來自 Client 的請求立即失敗,或執行 備援措施(Fallback),在過了一段保護時間,Client 重新嘗試發送請求並且有成功回應,保護機制將關閉。

深入探討 Circuit Breaker

Circuit Breaker 是一種 可恢復的 斷路機制,在一段時間內如果請求失敗數量高達設定的 Threshold,Circuit Breaker 將會進入 開啟(Open) 狀態,此時 Client 端發送的請求都會直接失效或是執行 fallback。在超過設定的保護時間後收到來自 Client 的請求,Circuit Breaker 會進入 半開啟(Half-Open) 狀態,並計算 Client 成功的次數是否有達到設定的 Threshold,如果達標則進入 關閉(Closed) 狀態,否則回到 Open 狀態。如下圖所示:

Circuit Breaker Concept

NestJS 實現 Circuit Breaker

接下來會使用 NestJS 來實作 Circuit Breaker 的概念,會需要先建立 Nx Workspace 並建置兩個 Application 與一個 Library:

  • 使用者服務:提供使用者相關資訊的服務。請透過 Nx 建立名為 user-service 的 Application。
  • API Gateway:作為 user-service 的代理。請透過 Nx 建立名為 api-gateway 的 Nx Application。
  • CircuitBreaker:提供 Circuit Breaker 相關功能。請透過 Nx 建立名為 circuit-breaker 的 Library。

實作 Circuit Breaker Library

先來規劃 Circuit Breaker 的使用方式:

  • 以 Controller 中的每個 Handler 為單位,設置 Circuit Breaker。
  • 可以自行決定要不要套用 Circuit Breaker。
  • 可以針對不同的 Handler 調整 Threshold、Timeout、Fallback。
  • 套用 Circuit Breaker 會有預設的 Threshold、Timeout、Fallback。

由於需要針對各個 Handler 進行處理,再加上 Circuit Breaker 需要知道請求的成功或失敗,以 NestJS 而言,採用 Interceptor 是不錯的選擇。另外,上述的設計有提到需要針對不同的 Handler 來調整相關設定,這塊非常適合使用 Custom Decorator 來提供 Metadata,在 Interceptor 中,取出該 Handler 所設置的 Metadata 進而設定 Circuit Breaker。

補充:針對 InterceptorCustom Decorator 可以參考官方文件,或是可以參考我之前分享的 Interceptor 介紹Custom Decorator 介紹

首先,新增 constants.ts 來存放預設值與 Custom Decorator Metadata 的名稱:

export const DEFAULT_SUCCESS_THRESHOLD = 5;
export const DEFAULT_FAILED_THRESHOLD = 5;
export const DEFAULT_TIMEOUT = 60000;
export const CIRCUIT_BREAKER_METADATA = '[Metadata] circuit breaker';

新增 param.interface.ts 來定義 Custom Decorator 可以帶入的參數:

import { Observable } from 'rxjs';

export interface CircuitBreakerOption<T = unknown> {
  successThreshold?: number;
  failedThreshold?: number;
  timeout?: number;
  fallback?: () => Observable<T>;
}

新增 circuit-breaker.decorator.ts 來實作 Custom Decorator:

import { SetMetadata } from '@nestjs/common';
import { CircuitBreakerOption } from '../interfaces';
import { CIRCUIT_BREAKER_METADATA } from '../constants';

export const CustomizedCircuitBreaker = (options: CircuitBreakerOption) => {
  return SetMetadata(CIRCUIT_BREAKER_METADATA, options);
}

新增 state.enum.ts 針對 Circuit Breaker 的三種狀態製作一個 enum

export const enum CircuitBreakerState {
  CLOSED = 'CLOSED',
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN',
}

接下來要實作 Circuit Breaker 的核心功能,由於每個 Handler 都會有一組自己定義的 Circuit Breaker,所以這裡我選擇將其功能封裝成名為 CircuitBreakerclass 來封裝狀態、成功次數、失敗次數、Threshold 等內容。

下方是範例程式碼,新增 circuit-breaker.ts 並實作 CircuitBreaker,在建構實例時,可以根據 CircuitBreakerParam 的格式帶入 Threshold、Timeout 與 Fallback。預期會在 Interceptor 將 CallHandler 的處理交給 CircuitBreakerhandle 方法來處理,因為需要將請求的成功次數、失敗次數進行統計並針對狀態變成 CircuitBreakerState.OPEN 的時候執行 Fallback 或拋出 503 錯誤。每當狀態變成 CircuitBreakerState.OPEN 就會依照 timeout 值重新計算保護時間,當超過保護時間時,會將狀態切換成 CircuitBreakerState.HALF_OPEN 並重新嘗試執行 CallHandler,如果又失敗,就會將狀態切換到 CircuitBreakerState.OPEN,如果成功就會開始計算成功次數,當成功次數超過 Threshold 就會回到 CircuitBreakerState.CLOSED

import { CallHandler, ServiceUnavailableException } from '@nestjs/common';
import {
  DEFAULT_FAILED_THRESHOLD,
  DEFAULT_SUCCESS_THRESHOLD,
  DEFAULT_TIMEOUT,
} from '../constants';
import { CircuitBreakerState } from '../enums';
import { Observable, tap, throwError } from 'rxjs';

interface CircuitBreakerParam {
  successThreshold?: number;
  failedThreshold?: number;
  timeout?: number;
  fallback?: () => Observable<unknown>;
}

export class CircuitBreaker {
  private state = CircuitBreakerState.CLOSED;
  private attemptTime: number | null = null;
  private failedCount = 0;
  private successCount = 0;

  constructor(private readonly params: CircuitBreakerParam) {}

  private get successThreshold() {
    return this.params.successThreshold ?? DEFAULT_SUCCESS_THRESHOLD;
  }

  private get failedThreshold() {
    return this.params.failedThreshold ?? DEFAULT_FAILED_THRESHOLD;
  }

  private get timeout() {
    return this.params.timeout ?? DEFAULT_TIMEOUT;
  }

  private get fallback() {
    const defaultFallback = () =>
      throwError(() => new ServiceUnavailableException());
    return this.params.fallback ?? defaultFallback;
  }

  handle(handler: CallHandler) {
    if (this.state === CircuitBreakerState.OPEN) {
      const attemptTime = this.attemptTime ?? 0;
      if (attemptTime > new Date().valueOf()) {
        return this.fallback();
      }
      this.switchTo(CircuitBreakerState.HALF_OPEN);
    }
    return handler.handle().pipe(
      tap({
        next: () => this.whenSuccess(),
        error: () => this.whenFail(),
      })
    );
  }

  private switchTo(state: CircuitBreakerState) {
    this.state = state;
  }

  private whenSuccess() {
    this.failedCount = 0;

    if (this.state !== CircuitBreakerState.HALF_OPEN) {
      return;
    }

    this.successCount += 1;

    if (this.successCount >= this.successThreshold) {
      this.switchTo(CircuitBreakerState.CLOSED);
      this.successCount = 0;
    }
  }

  private whenFail() {
    this.failedCount += 1;

    if (
      this.state === CircuitBreakerState.HALF_OPEN ||
      this.failedCount >= this.failedThreshold
    ) {
      this.switchTo(CircuitBreakerState.OPEN);
      this.attemptTime = new Date().valueOf() + this.timeout;
    }
  }
}

最後,新增 circuit-breaker.interceptor.ts 來實作 CircuitBreakerInterceptor。下方是範例程式碼,在 intercept 方法內先透過 ExecutionContext 取得 Handler,並從 circuitBreakerMap 取出對應的 CircuitBreaker 實例,如果不存在,就透過 Reflector 取得 key 為 CIRCUIT_BREAKER_METADATA 的 Metadata 來建立 CircuitBreaker 實例,再透過其 handle 方法來處理 CallHandler

import {
  CallHandler,
  ExecutionContext,
  Injectable,
  NestInterceptor,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { CircuitBreaker } from '../classes';
import { CIRCUIT_BREAKER_METADATA } from '../constants';
import { CircuitBreakerOption } from '../interfaces';

@Injectable()
export class CircuitBreakerInterceptor implements NestInterceptor {
  // eslint-disable-next-line @typescript-eslint/ban-types
  private circuitBreakerMap = new WeakMap<Function, CircuitBreaker>();

  constructor(private readonly reflector: Reflector) {}

  intercept(context: ExecutionContext, next: CallHandler<any>) {
    const handler = context.getHandler();
    const circuitBreaker = this.getCircuitBreaker(handler);
    return circuitBreaker.handle(next);
  }

  // eslint-disable-next-line @typescript-eslint/ban-types
  private getCircuitBreaker(handler: Function) {
    const circuitBreaker = this.circuitBreakerMap.get(handler);
    return circuitBreaker ?? this.generateCircuitBreaker(handler);
  }

  // eslint-disable-next-line @typescript-eslint/ban-types
  private generateCircuitBreaker(handler: Function) {
    const options = this.reflector.get<CircuitBreakerOption | null>(
      CIRCUIT_BREAKER_METADATA,
      handler
    );
    const circuitBreaker = new CircuitBreaker({
      successThreshold: options?.successThreshold,
      failedThreshold: options?.failedThreshold,
      timeout: options?.timeout,
      fallback: options?.fallback,
    });
    this.circuitBreakerMap.set(handler, circuitBreaker);
    return circuitBreaker;
  }
}

注意:記得將 CustomizedCircuitBreakerCircuitBreakerInterceptorindex.ts 匯出,才能讓其他 Application 使用。

實作 User Service

User Service 在這次範例中會擔任不穩定服務的角色,所以在 Handler 內會以亂數的方式隨機回 HTTP Code 500 的錯誤。下方是範例程式碼,修改了 AppController 的內容:

import {
  Controller,
  Get,
  InternalServerErrorException,
  Param,
} from '@nestjs/common';

type User = {
  id: string;
  name: string;
};

@Controller('users')
export class AppController {
  private readonly users: Array<User> = [
    {
      id: '1',
      name: 'John',
    },
  ];

  @Get(':id')
  getUserById(@Param('id') id: string) {
    if (Math.random() > 0.3) {
      throw new InternalServerErrorException();
    }
    return this.users.find((user) => user.id === id);
  }
}

由於會同時啟動多個服務,所以在 apps/user-service 目錄下新增 .env 檔,並設定 PORT3001

PORT=3001

實作 API Gateway

最後,修改 api-gateway 底下的 AppController,在該 Controller 上方使用 @UseInterceptors 裝飾器來指定這個 Controller 底下所有的 Handler 都要套用 CircuitBreakerInterceptor,同時,實作 getUserById Handler 並使用 @CustomizedCircuitBreaker 裝飾器指定該 Handler 的 failedThreshold2successThreshold2timeout5000 微秒:

import { Controller, Get, Param, UseInterceptors } from '@nestjs/common';

import { HttpService } from '@nestjs/axios';
import { map } from 'rxjs';
import {
  CircuitBreakerInterceptor,
  CustomizedCircuitBreaker,
} from '@nestjs-microservices/DAY23/circuit-breaker';

@UseInterceptors(CircuitBreakerInterceptor)
@Controller()
export class AppController {
  constructor(private readonly httpService: HttpService) {}

  @CustomizedCircuitBreaker({
    failedThreshold: 2,
    successThreshold: 2,
    timeout: 5000,
  })
  @Get('/users/:id')
  getUserById(@Param('id') id: string) {
    return this.httpService
      .get(`http://localhost:3001/api/users/${id}`)
      .pipe(map((res) => res.data));
  }
}

注意:記得要在 AppModule 匯入 HttpModule 才能使用 HttpService 呦。

整合測試

現在,我們可以透過 Nx Console 來啟動所有的服務,在「Common Nx Commands」區塊選擇 run-many 並點選 serve 來同時啟動多個服務:

Nx Run Many Serve

由於我們已經在 user-service 內產生 id1User,所以我們可以使用 Postman 透過 GET 存取 http://localhost:3000/api/users/1 來查看結果。預期當 user-service 的錯誤率高於設定值時,Circuit Breaker 會啟動並立即回覆錯誤,下方是連續失敗三次會收到的結果:

Circuit Breaker Open Result

當超過保護時間並且請求成功時可以順利收到回應:

Circuit Breaker Close Result

小結

回顧一下今天的內容,一開始介紹了微服務架構下可以採取 Timeout、RateLimit 與 Circuit Breaker 保護機制避免服務因依賴的服務而面臨故障風險。接著,深入解釋 Circuit Breaker 的運作原理,包含狀態的變化與復原機制。最後,用 NestJS 實作了 Circuit Breaker,以 Custom Decorator 與 Interceptor 的搭配來實現,並透過範例程式展現其效果。


上一篇
[用NestJS闖蕩微服務!] DAY22 - API Gateway
下一篇
[用NestJS闖蕩微服務!] DAY24 - Saga (一)
系列文
用 NestJS 闖蕩微服務!30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言