iT邦幫忙

2021 iThome 鐵人賽

DAY 10
0
Modern Web

基於 Kotlin Ktor 建構支援模組化開發的 Web 框架系列 第 10

[Day 10] 實作 Ktor Graceful Shutdown

各種 Graceful Shutdown 的作法

一個穩定的服務必須要考慮當停止服務時,已經接受的 request 必須要處理完才行,否則輕則使用者會收到伺服器錯誤,觀感不佳,重則造成資料遺失或不一致。如果是微服務架構,那麼更是無法簡單地 rollback 恢復資料。

一般 graceful shutdown 作法是 Server 接收到 stop signal 之後,就不再接收 request,然後等待一段時間消化進行中的 request 再停止,如果超過時間則強制關閉。通常 Web 框架會提供 Shutdown hook 機制,當 Server 停止的時候,先呼叫你實作的函式再停止,我們可以在這個函式裡關閉開啟的資源,例如關閉 Database, Redis 的連線。

除了關閉資源之外,等待所有非同步工作處理完畢也是必須考慮的。例如使用者當前的操作需要寄送 email 通知,我們通常會把寄送動作丟到另一個執行緒處理,藉此加快回應速度。那麼當 Server 停止的時候,我們要如何等待這些執行緒做完所有工作了呢? 在此舉3個不同解決方案的例子

  • Spring Boot 提供 ThreadPoolTaskExecutor 實作,我們可以設定 setWaitForTasksToCompleteOnShutdown(true)setAwaitTerminationSeconds(30) 阻塞等待 30 秒後,再繼續執行停止 server 的工作。

  • Akka Actor 提供 gracefulStop 函式,最久等待5秒處理剩餘的工作。

    import static akka.pattern.Patterns.gracefulStop;
    import akka.pattern.AskTimeoutException;
    import java.util.concurrent.CompletionStage;
    
    try {
      CompletionStage<Boolean> stopped =
          gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN);
      stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);
      // the actor has been stopped
    } catch (AskTimeoutException e) {
      // the actor wasn't stopped within 5 seconds
    }
    
  • Coroutine Channel 可以參考 Roman Elizarov 大大在 StackOverflow 回答的作法,透過呼叫 close() 傳送特別的 close token 到 queue 裡面,一旦 channel 接收到這種訊息就會立即停止接收新的訊息。另一邊 ReceiveChannel 處理完最後一個訊息之後,填入 CompletableDeferred 結果值通知 SendChannel。最後建議可以在 withTimeout 裡面呼叫 await() 設定最久等待時間

    val lastProcessed = CompletableDeferred<Message?>() 
    val actor = actor<Message> {
        var last: Message? = null
        try {
            for (msg in channel) {
                // process message
                last = msg
            }
        } finally {
            // report the last processed message via back channel
            lastProcessed.complete(last)
        }
    }
    
    actor.close()
    val last = lastProcessed.await() // receive last processed message
    

接下來說明我是如何實作 Ktor graceful shutdown

Ktor ShutDownUrl Plugin

首先我們必須先安裝 ShutDownUrl Plugin,設定 shutdown endpoint 讓我們呼叫觸發 shutdown

ktor {
    deployment {
        shutdown.url = "/my/shutdown/path"
    }
}

這個 endpoint 沒有辦法設定 authentication,為了安全上的考量,我在 url 加上一長串亂數當作 key,而這個亂數可以透過環境變數設定
shutdown.url = "/ops/server/shutdown/"${?SERVER_SHUTDOWN_KEY}

然後部署時準備 stop.sh 檔案執行 curl 指令呼叫 endpoint
curl "http://localhost:$PORT/ops/server/shutdown/$SERVER_SHUTDOWN_KEY"

Ktor ApplicationEvent

當 ShutDownUrl Plugin 執行 shutdown 時,會觸發 Ktor ApplicationEvent 的 ApplicationStopPreparing 事件,所以我們可以事先訂閱此事件,把想要執行的程式碼實作為 lambda 傳入

fun stopPreparing(application: Application) { log("Application stopPreparing: $it") }

application.environment.monitor.subscribe(ApplicationStopPreparing, ::stopPreparing) // subscribe

不過因為我有導入 Koin DI,而且 Koin 也有整合 Ktor ApplicationEvent,所以我改訂閱 Koin 的 KoinApplicationStopPreparing 事件

以下是 Shutdown 時的各個 ApplicationEvent 的執行順序
Ktor ApplicationStopPreparing → KoinApplicationStopPreparing → KoinApplicationStopped → Ktor ApplicationStopping → Ktor ApplicationStopped

現在我們已經知道可以透過 subscribe ApplicationEvent,讓 Ktor 執行我們的 shutdown 程式碼,那麼我們應該要在那裡呼叫 subscribe 呢?

Ktor Plugins Graceful Shutdown

我們是透過安裝 Ktor Plugin 來增加功能,然後在 Plugin 的 install function 裡面進行資源初始化動作,例如 Server 啟動時,Redis Plugin 會與 Redis 建立連線,然後停止 Server 時,我們要呼叫 RedisClient 的 quit() 方法中斷連線,那要怎麼呼叫 quit() 呢? 我們可以在建立 RedisClient 物件之後,就呼叫 subscribe 方法把呼叫 quit() 的 lambda 傳入 KoinApplicationStopPreparing event 的 EventHandler

override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
    //以上省略
   initClient(config)
   application.environment.monitor.subscribe(KoinApplicationStopPreparing) {
        runBlocking {
            logger.info("close redis connection...")
            client.quit()
            logger.info("redis connection closed")
        }
    }
}

同樣地,我們所有實作的 Plugin 也都採用此作法,例如 Database Plugin 關閉 database connection pool,Logging Plugin 呼叫 coroutine channel 的 close() 方法,等待剩下的 log 都寫入到 AWS Kinesis Stream 之後再停止。

當 Server 停止時,Ktor 會逐一呼叫 KoinApplicationStopPreparing 之中,所有在 Plugin 傳入的 EventHandler lambda。不過在這裡要注意呼叫的順序! 例如我希望系統能在一啟動 Server 就能開始寫入 log,然後停止 Server 時,LogWriter coroutine channel 是最後才被 close,確保系統運行時的所有重要 log 都能被記錄下來,所以此時要考慮多個 Plugin 的安裝順序,甚至 depenency 關係。

Ktor ApplicationEvent 的內部實作是把所有 EventHandler 儲存在一個 List 物件裡面,所以會按照 Plugin 的安裝順序逐一呼叫,但我希望是相反的順序呼叫,所以我實作了 KoinApplicationShutdownManager,在安裝所有 Plugin 之後呼叫 complete() 方法,以相反的順序 subscribe KoinApplicationStopPreparing

fun Application.main() {
    install(LoggingFeature)
    install(DatabaseFeature)
    install(RedisFeature)
    //省略... 
    KoinApplicationShutdownManager.complete(environment)
}

object KoinApplicationShutdownManager {

    private val tasks: MutableList<EventHandler<KoinApplication>> = mutableListOf()

    fun register(handler: EventHandler<KoinApplication>) {
        tasks += handler
    }

    fun complete(applicationEnvironment: ApplicationEnvironment) {
        tasks.asReversed().forEach {
            applicationEnvironment.monitor.subscribe(KoinApplicationStopPreparing, it)
        }
    }
}

到目前為止,我已經說明了 Ktor Plugin 的安裝到最後的 graceful shutdown,明天說明我如何為 Ktor 加上 i18n 機制


上一篇
[Day 9] 使用 Config4k 以 Typesafe 及 Validatable 的方式讀取 Ktor 設定檔
下一篇
[Day 11] 實作 Ktor i18n 機制
系列文
基於 Kotlin Ktor 建構支援模組化開發的 Web 框架30

尚未有邦友留言

立即登入留言