目前 Redis 幾乎已成為後端微服務架構的必備基礎設施,但是 Ktor 官方連 ORM 都沒有整合了,Redis Client 當然也不會有啦,所以第一步就先來挑選 Redis Client。
以往 Redis Java Client 大多是使用 Jedis,近年來大家逐漸改用 Lettuce,而且 Spring Boot 2.x 版本也預設採用了。這是因為 Lettuce 底層是 Netty,所以支援 non-blocking 操作及 asynchronous API,效能會比 Jedis 好,理所當然是寫新專案的最佳選擇。不過我秉持著做 side project 就是要學習新技術及自幹的精神,就是想找基於 Kotlin Coroutine 的 library 來用,結果真的讓我在 Github 找到 JetBrains 官方團隊開發的 ktor-client-redis。
不過不要高興的太早,因為 ktor-client-redis 只是實驗性專案,不建議用在 production 環境,而且最後一次 commit 已經是3年前。後來我在 twitter 上面詢問未來專案的計劃,Ktor 官方團隊成員回答 Unfortunately, it is dead at the moment
Orz...。但是我當然不會因此放棄,因為我只想拿 Redis 做為 User Session Storage,應該只會用到基本的 get, set, del 指令而已。事實上,到目前為止,我使用上都沒遇到什麼問題,但如果未來要應付大流量的話,那還是要改用 Lettuce 比較有保障。
因為 ktor-client-redis 沒有 publish 到 maven repository,所以必須要下載原始碼放到專案裡面。果不其然,馬上就遇到無法編譯的問題,畢竟我是用最新版的 coroutine library,有些 API 已經改了。不過還好都是些小調整,而且 ktor-client-redis 的 redis 指令實作方式是放在獨立的 kt 檔案,例如 Hashes.kt 包含 hset, hget...指令,所以沒用到的指令,我就不把對應的 kt 檔放到我的專案裡了。
我們先實作從外部設定檔 application.conf 讀取 Redis 連線設定。因為 ktor-client-redis 是基於 Coroutine,所以可以重複使用先前定義的 CoroutineActorConfig 物件來調校效能。
redis {
host = ${?REDIS_HOST}
port = ${?REDIS_PORT}
password = ${?REDIS_PASSWORD}
rootKeyPrefix = "fanpoll-"${app.server.env}
client {
coroutines = 10
dispatcher {
minPoolSize = 1
maxPoolSize = 3
}
}
}
data class RedisConfig(
val host: String, val port: Int = 6379, val password: String?, val rootKeyPrefix: String,
val client: CoroutineActorConfig
)
data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {
fun validate() {
dispatcher?.validate()
}
class Builder {
var coroutines: Int = 1
private var dispatcher: ThreadPoolConfig? = null
fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
dispatcher = ThreadPoolConfig.Builder().apply(block).build()
}
fun build(): CoroutineActorConfig {
return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
}
}
}
data class ThreadPoolConfig(
val fixedPoolSize: Int? = 1,
val minPoolSize: Int? = null,
val maxPoolSize: Int? = null,
val keepAliveTime: Long? = null
) : ValidateableConfig {
fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
override fun validate() {
require(
(minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
(minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
) {
"minPoolSize, maxPoolSize, keepAliveTime should be configured"
}
}
class Builder {
var fixedPoolSize: Int? = 1
var minPoolSize: Int? = null
var maxPoolSize: Int? = null
var keepAliveTime: Long? = null
fun build(): ThreadPoolConfig {
return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
}
}
}
Redis Plugin 讀取設定值後,就根據 RedisConfig 建立 RedisClient 物件。RedisClient 內部實作是透過 requestQueue: Channel<RedisRequest>
送出指令封包 ByteReadPacket
,再使用 CompletableDeferred<Any?>
接收結果。其中 RedisClient 的最大連線數量 maxConnections
就是對應到我們所建立的 coroutine 數量,預設的 coroutine dispatcher 是 Dispatchers.Default,不過我們可以替換掉它。
建立連線後,隨即執行 ping()
測試一下 latency。然後註冊至 Koin DI,後續要操作 Redis 時,就可以透過 Koin 取得 RedisClient 物件。
最後不要忘記加上 KoinApplicationShutdownManager.register { closeClient() }
,在停止 Server 時呼叫 quit()
關閉 Redis 連線。
private lateinit var config: RedisConfig
private lateinit var client: RedisClient
private const val dispatcherName = "Redis"
private lateinit var dispatcher: CoroutineDispatcher
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
val configuration = Configuration().apply(configure)
val feature = RedisFeature(configuration)
val appConfig = pipeline.get<MyApplicationConfig>()
val logWriter = pipeline.get<LogWriter>()
config = appConfig.infra.redis ?: configuration.build()
initClient(config)
pipeline.koin {
modules(
module(createdAtStart = true) {
single { client }
KoinApplicationShutdownManager.register { closeClient() }
}
)
}
return feature
}
private fun initClient(config: RedisConfig) {
logger.info("========== init Redis Client... ==========")
try {
dispatcher = if (config.client.dispatcher != null)
CoroutineUtils.createDispatcher(dispatcherName, config.client.dispatcher)
else Dispatchers.IO
logger.info("connect to redis => $config")
client = RedisClient(
address = InetSocketAddress(config.host, config.port),
password = config.password,
maxConnections = config.client.coroutines,
dispatcher = dispatcher, rootKeyPrefix = config.rootKeyPrefix
)
runBlocking {
logger.info("ping...")
val latency = measureTimeMillis {
client.ping()?.let {
logger.info(it)
}
}
logger.info("ping latency = $latency milliseconds")
}
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init redis client", e)
}
logger.info("========== init Redis Client completed ==========")
}
private fun closeClient() {
try {
runBlocking {
logger.info("close redis connection...")
client.quit()
logger.info("redis connection closed")
}
if (dispatcher is ExecutorCoroutineDispatcher) {
CoroutineUtils.closeDispatcher(dispatcherName, dispatcher as ExecutorCoroutineDispatcher)
}
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to close redis connection", e)
}
}