iT邦幫忙

2021 iThome 鐵人賽

DAY 25
0
Modern Web

基於 Kotlin Ktor 建構支援模組化開發的 Web 框架系列 第 25

[Day 25] 實作 Redis Plugin 整合 Redis Coroutine Client

目前 Redis 幾乎已成為後端微服務架構的必備基礎設施,但是 Ktor 官方連 ORM 都沒有整合了,Redis Client 當然也不會有啦,所以第一步就先來挑選 Redis Client。

選擇 Redis Client 函式庫 ktor-client-redis

以往 Redis Java Client 大多是使用 Jedis,近年來大家逐漸改用 Lettuce,而且 Spring Boot 2.x 版本也預設採用了。這是因為 Lettuce 底層是 Netty,所以支援 non-blocking 操作及 asynchronous API,效能會比 Jedis 好,理所當然是寫新專案的最佳選擇。不過我秉持著做 side project 就是要學習新技術及自幹的精神,就是想找基於 Kotlin Coroutine 的 library 來用,結果真的讓我在 Github 找到 JetBrains 官方團隊開發的 ktor-client-redis

不過不要高興的太早,因為 ktor-client-redis 只是實驗性專案,不建議用在 production 環境,而且最後一次 commit 已經是3年前。後來我在 twitter 上面詢問未來專案的計劃,Ktor 官方團隊成員回答 Unfortunately, it is dead at the moment Orz...。但是我當然不會因此放棄,因為我只想拿 Redis 做為 User Session Storage,應該只會用到基本的 get, set, del 指令而已。事實上,到目前為止,我使用上都沒遇到什麼問題,但如果未來要應付大流量的話,那還是要改用 Lettuce 比較有保障。

整合 ktor-client-redis 原始碼

因為 ktor-client-redis 沒有 publish 到 maven repository,所以必須要下載原始碼放到專案裡面。果不其然,馬上就遇到無法編譯的問題,畢竟我是用最新版的 coroutine library,有些 API 已經改了。不過還好都是些小調整,而且 ktor-client-redis 的 redis 指令實作方式是放在獨立的 kt 檔案,例如 Hashes.kt 包含 hset, hget...指令,所以沒用到的指令,我就不把對應的 kt 檔放到我的專案裡了。

實作 RedisPlugin

點我連結到完整的 Redis Plugin 程式碼

定義 RedisConfig

我們先實作從外部設定檔 application.conf 讀取 Redis 連線設定。因為 ktor-client-redis 是基於 Coroutine,所以可以重複使用先前定義的 CoroutineActorConfig 物件來調校效能。

redis {
    host = ${?REDIS_HOST}
    port = ${?REDIS_PORT}
    password = ${?REDIS_PASSWORD}
    rootKeyPrefix = "fanpoll-"${app.server.env}
    client {
        coroutines = 10
        dispatcher {
            minPoolSize = 1
            maxPoolSize = 3
        }
    }
}
data class RedisConfig(
    val host: String, val port: Int = 6379, val password: String?, val rootKeyPrefix: String,
    val client: CoroutineActorConfig
)

data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {

    fun validate() {
        dispatcher?.validate()
    }

    class Builder {

        var coroutines: Int = 1
        private var dispatcher: ThreadPoolConfig? = null

        fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
            dispatcher = ThreadPoolConfig.Builder().apply(block).build()
        }

        fun build(): CoroutineActorConfig {
            return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
        }
    }
}

data class ThreadPoolConfig(
    val fixedPoolSize: Int? = 1,
    val minPoolSize: Int? = null,
    val maxPoolSize: Int? = null,
    val keepAliveTime: Long? = null
) : ValidateableConfig {

    fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)

    override fun validate() {
        require(
            (minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
                    (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
        ) {
            "minPoolSize, maxPoolSize, keepAliveTime should be configured"
        }
    }

    class Builder {

        var fixedPoolSize: Int? = 1
        var minPoolSize: Int? = null
        var maxPoolSize: Int? = null
        var keepAliveTime: Long? = null

        fun build(): ThreadPoolConfig {
            return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
        }
    }
}

建立 RedisClient 物件,註冊至 Koin DI

Redis Plugin 讀取設定值後,就根據 RedisConfig 建立 RedisClient 物件。RedisClient 內部實作是透過 requestQueue: Channel<RedisRequest> 送出指令封包 ByteReadPacket,再使用 CompletableDeferred<Any?> 接收結果。其中 RedisClient 的最大連線數量 maxConnections 就是對應到我們所建立的 coroutine 數量,預設的 coroutine dispatcher 是 Dispatchers.Default,不過我們可以替換掉它。

建立連線後,隨即執行 ping() 測試一下 latency。然後註冊至 Koin DI,後續要操作 Redis 時,就可以透過 Koin 取得 RedisClient 物件。

最後不要忘記加上 KoinApplicationShutdownManager.register { closeClient() },在停止 Server 時呼叫 quit() 關閉 Redis 連線。

private lateinit var config: RedisConfig
private lateinit var client: RedisClient
private const val dispatcherName = "Redis"
private lateinit var dispatcher: CoroutineDispatcher
        
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
    val configuration = Configuration().apply(configure)
    val feature = RedisFeature(configuration)

    val appConfig = pipeline.get<MyApplicationConfig>()
    val logWriter = pipeline.get<LogWriter>()
    config = appConfig.infra.redis ?: configuration.build()

    initClient(config)

    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { client }
                KoinApplicationShutdownManager.register { closeClient() }
            }
        )
    }

    return feature
}
        
private fun initClient(config: RedisConfig) {
    logger.info("========== init Redis Client... ==========")
    try {
        dispatcher = if (config.client.dispatcher != null)
            CoroutineUtils.createDispatcher(dispatcherName, config.client.dispatcher)
        else Dispatchers.IO

        logger.info("connect to redis => $config")
        client = RedisClient(
            address = InetSocketAddress(config.host, config.port),
            password = config.password,
            maxConnections = config.client.coroutines,
            dispatcher = dispatcher, rootKeyPrefix = config.rootKeyPrefix
        )

        runBlocking {
            logger.info("ping...")
            val latency = measureTimeMillis {
                client.ping()?.let {
                    logger.info(it)
                }
            }
            logger.info("ping latency = $latency milliseconds")
        }
    } catch (e: Throwable) {
        throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init redis client", e)
    }
    logger.info("========== init Redis Client completed ==========")
}

private fun closeClient() {
    try {
        runBlocking {
            logger.info("close redis connection...")
            client.quit()
            logger.info("redis connection closed")
        }

        if (dispatcher is ExecutorCoroutineDispatcher) {
            CoroutineUtils.closeDispatcher(dispatcherName, dispatcher as ExecutorCoroutineDispatcher)
        }
    } catch (e: Throwable) {
        throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to close redis connection", e)
    }
}

完成結果


上一篇
[Day 24] 自定義 REST QueryDSL 實現動態查詢資料庫
下一篇
[Day 26] 實作 Ktor Session Authentication with Redis
系列文
基於 Kotlin Ktor 建構支援模組化開發的 Web 框架30

尚未有邦友留言

立即登入留言