iT邦幫忙

2021 iThome 鐵人賽

DAY 27
0
Modern Web

基於 Kotlin Ktor 建構支援模組化開發的 Web 框架系列 第 27

[Day 27] 實作 Redis PubSub Keyspace Notification 訂閱 Session Key Expired 事件通知

session authentication 的機制是

  • 登入時建立 sessionId 儲存 session 資料至 Redis
  • 驗證時使用 sessionId 尋找 Redis 是否存在 session
  • 登出時使用 sessionId 刪除 Redis session 資料

以上操作都是使用 sessionId 操作某個 user 的 session 資料,如果系統允許使用者重複登入的話,我們怎麼反過來知道某個使用者擁有那些 sessionId?

直覺上的解決方式就是在登入時,除了記錄 sessionId 與 session 資料的關聯,也一併把 sessionId 加入至 userId 與 sessionId 的一對多 sessionId 清單。反之,登出時也一併從 user 的 sessionId 清單中刪除此 sessionId。但是這麼做還不夠,因為當 sessionId 逾期而被 Redis 刪除時,並不會連動更新 sessionId 清單,這必須由我們系統自行把過期的 sessionId 從 sessionId 清單刪除。不過問題來了,我們怎麼知道 sessionId 逾期被 Redis 刪除,系統總不能一直使用 sessionId 去詢問 Redis 是否還存在,然後再去更新 sessionId 清單,這種作法在使用者數量龎大時根本不可行。

Redis PubSub Keyspace Notification

還好 Redis 有提供 Redis Keyspace Notifications 機制,我們系統可以先訂閱 keyspace 的事件通知,當 sessionId 過期時 Redis 會通知我們。不過使用此功能前要注意以下事項,詳細說明請參考官方文件

  • 支援 Redis 2.8.0 以上版本
  • Redis 不會儲存事件,所以如果系統與 Redis 失去連線的話,就收不到通知了,可靠性較低
  • 預設不啟用避免消耗 CPU 資源,必須先執行以下指令開啟
redis-cli config set notify-keyspace-events KEA

Redis Keyspace Notifications 的流程上分成兩階段,下面以 Redis 指令及傳輸資料來說明

  1. 系統先向 Redis 訂閱 expired 事件
執行指令 psubscribe '__keyevent*__:expired'
然後收到成功的回應資料 "psubscribe","__keyevent*__:expired",1
  1. 後續當 session 過期時,系統會收到 expired 事件通知
"pmessage","__keyevent*__:expired","__keyevent@0__:expired","我是sessionId"

接下來我們系統再從過期的 sessionId 找出 userId,然後再根據 userId 去更新使用者的 sessionId 清單即可。本專案的 sessionId 清單資料也儲存於 Redis,但使用的是 Hash 資料結構,至於原始的 session 資料是使用 String 資料結構。

  • [String] session 資料: key => sessionId ; value => session data (json)
  • [Hash] sessionId 清單: key => userId ; field => sessionId ; value => expireTime (也可以儲存其它資料)

修改 ktor-client-redis 函式庫,實作訂閱 Keyspace Notification 功能

我使用的 Redis Client 是 ktor-client-redis,它是 JetBrains 開發的實驗性專案,只有實作基本的 PubSub 功能,所以我必須修改原始碼,在 PubSub 的基礎上實作 KeyspaceNotification 功能。更多 ktor-client-redis 的資訊,請看我先前寫的文章 => [Day 25] 實作 Redis Plugin 整合 Redis Coroutine Client

首先是修改 ktor-client-redis 的 PubSub.kt 原始碼,我在 RedisPubSub 裡面增加 Packet 子類別 KeyspaceNotification,同時也在 mapToPacket 方法裡,把從 Redis 收到資料轉為 KeyspaceNotification 物件。

點我連結至修改後的 PubSub.kt 程式碼

interface RedisPubSub {

    interface Packet

    data class Message(val channel: String, val message: String, val isPattern: Boolean = false) : Packet

    data class Subscription(val channel: String, val subscriptions: Long, val subscribe: Boolean, val isPattern: Boolean = false) : Packet

    data class KeyspaceNotification(val channel: String, val isKeyEvent: Boolean, val event: String, val key: String) :
        IdentifiableObject<String>(), Packet {

        override val id: String
            get() = "$event-$key"
    }
}

private fun CoroutineScope.mapToPacket(rawChannel: ReceiveChannel<Any>) = produce(capacity = Channel.UNLIMITED) {
    for (data in rawChannel) {
        logger.debug("data = $data")
        val list = data as List<Any>
        val kind = String(list[0] as ByteArray)
        val channel = String(list[1] as ByteArray)

        val isPattern = kind.startsWith("p")
        val isMessage = kind == "message"
        val isSubscription = kind.startsWith("psub") || kind.startsWith("sub")
        val isPMessage = kind == "pmessage"
        val isKeyspaceNotification = isPMessage && list.size == 4

        val packet = when {
            isMessage -> RedisPubSub.Message(channel, String(list[2] as ByteArray), isPattern)
            isSubscription -> RedisPubSub.Subscription(channel, list[2] as Long, isSubscription, isPattern)
            isKeyspaceNotification -> {
                val info = String(list[2] as ByteArray)
                val pMessage = String(list[3] as ByteArray)
                val isKeyEvent = info.contains("keyevent")
                val event = if (isKeyEvent) info.substringAfterLast(":") else pMessage
                val key = if (isKeyEvent) pMessage else info.substringAfterLast(":")
                RedisPubSub.KeyspaceNotification(channel, isKeyEvent, event, key)
            }
            else -> error("Undefined Redis PubSub raw data: $list")
        }
        logger.debug("packet = $packet")
        send(packet)
    }
}

因為 PubSub 收到的資料有多種格式,所以在此使用 coroutine channel pipeline 的方式實作,過濾掉其它格式的資料,只保留 KeyspaceNotification 物件,這樣就完成底層接收 Redis 事件通知資料的部分了。

private fun CoroutineScope.filterKeyspaceNotification(channel: ReceiveChannel<RedisPubSub.Packet>) = produce {
    for (packet in channel) {
        if (packet is RedisPubSub.KeyspaceNotification)
            send(packet)
    }
}

fun CoroutineScope.redisKeyspaceNotificationChannel(redisPubSub: RedisPubSub): ReceiveChannel<RedisPubSub.KeyspaceNotification> =
    filterKeyspaceNotification(mapToPacket(((redisPubSub as RedisPubSubImpl).rawChannel)))

實作 RedisKeyspaceNotificationListener 處理 Keyspace Notification

雖然我目前只需要訂閱 session key expired 事件而已,但考量到 Redis KeyspaceNotification 可以應用到任何 key 的事件通知,所以我決定先在底層實作泛用的 RedisKeyspaceNotificationListenerRedisKeyspaceNotificationListener 的內部實作上是利用我之前做好的泛用 CoroutineActor,我們可以設定處理事件的 coroutine 數量及 dispatcher,與底層收取事件通知的 PubSub 的 dispatcher 分開,藉此調校效能。CoroutineActor 內部一樣使用 coroutine channel pipeline 的方式實作,先從上游接收底層 PubSub 收到的資料 => val upstreamChannel = redisKeyspaceNotificationChannel(redisPubSub),然後再丟到 CoroutineActor 內部的 channel,最後再執行先前註冊到 RedisKeyspaceNotificationListener 的 lambda。

※ 詳細 CoroutineActor 的實作,請看我先前寫的文章 => [Day 21] 使用 Coroutine SendChannel 處理非同步工作

點我連結到完整的 RedisKeyspaceNotificationListener 程式碼

class RedisKeyspaceNotificationListener(
    config: KeyspaceNotificationConfig,
    redisPubSub: RedisPubSub,
    logWriter: LogWriter
) {

    private val logger = KotlinLogging.logger {}

    private val actorName = "RedisKeyspaceNotificationListener"

    private val keyspaceNotificationBlocks: MutableList<suspend (RedisPubSub.KeyspaceNotification) -> Unit> = mutableListOf()

    private val actor: CoroutineActor<RedisPubSub.KeyspaceNotification> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        config.processor, Dispatchers.IO,
        this::processKeyspaceNotification, { channel ->
            launch {
                val upstreamChannel = redisKeyspaceNotificationChannel(redisPubSub)
                for (message in upstreamChannel) {
                    try {
                        channel.send(message) // non-blocking if Channel.UNLIMITED
                    } catch (e: Throwable) { // ignore CancellationException because we don't call channel.cancel()
                        var errorMsg = ""
                        if (e is ClosedSendChannelException) {
                            errorMsg = "$actorName is closed"
                            logger.warn("$errorMsg => $message")
                        } else {
                            errorMsg = "$actorName unexpected error"
                            logger.error("$errorMsg => $message", e)
                        }
                        logWriter.write(
                            ErrorLog.internal(
                                InternalServerException(InfraResponseCode.REDIS_ERROR, errorMsg, e, mapOf("message" to message)),
                                actorName, message.id
                            )
                        )
                        // TODO: persistence unDeliveredMessage and retry later
                    }
                }
            }
        }
    )

    private suspend fun processKeyspaceNotification(data: RedisPubSub.KeyspaceNotification) {
        keyspaceNotificationBlocks.forEach { it(data) }
    }

    fun subscribeKeyspaceNotification(block: suspend (RedisPubSub.KeyspaceNotification) -> Unit) {
        keyspaceNotificationBlocks.add(block)
    }

    fun shutdown() {
        actor.close()
    }
}

Redis Plugin 初始化 RedisKeyspaceNotificationListener

以上功能都實作完了之後,最後就是整合至 Ktor Plugin 了。我先在 Redis Plugin 裡面,根據外部設定檔 application.conf 讀取 Redis 連線設定及 RedisKeyspaceNotificationListener 設定。然後建立 PubSub 專用的 RedisClient 物件,與原有的 RedisClient 物件分開,因為這裡我只想設定一個單執行緒處理事件通知就好。接下來再執行 psubscribe 指令向 Redis 訂閱事件通知,最後再初始化 RedisKeyspaceNotificationListener 就完成了。

點我連結到完整的 RedisPlugin 程式碼

redis {
    host = ${REDIS_HOST}
    port = ${REDIS_PORT}
    password = ${?REDIS_PASSWORD}
    rootKeyPrefix = "fanpoll-"${app.server.env}
    client {
        coroutines = 3
        dispatcher {
            fixedPoolSize = 1
        }
    }
    subscribe {
        patterns = ["__keyevent*__:expired"]
        channels = []
        keyspaceNotification {
            processor {
                coroutines = 3
                dispatcher {
                    fixedPoolSize = 1
                }
            }
        }
    }
}
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
    val configuration = Configuration().apply(configure)
    val feature = RedisFeature(configuration)

    val appConfig = pipeline.get<MyApplicationConfig>()
    val logWriter = pipeline.get<LogWriter>()
    config = appConfig.infra.redis ?: configuration.build()

    initClient(config)

    if (config.subscribe != null) {
        initSubscriber(config, logWriter)
    }

    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { client }
                if (keyspaceNotificationListener != null)
                    single { keyspaceNotificationListener }
                KoinApplicationShutdownManager.register { shutdown() }
            }
        )
    }

    return feature
}
        
private fun initSubscriber(config: RedisConfig, logWriter: LogWriter) {
    logger.info("========== init Redis PubSub subscriber... ==========")
    try {
        subscribeDispatcher = CoroutineUtils.createDispatcher(subscribeDispatcherName, ThreadPoolConfig(fixedPoolSize = 1))
        subscribeClient = RedisClient(
            address = InetSocketAddress(config.host, config.port),
            password = config.password,
            maxConnections = 1,
            dispatcher = subscribeDispatcher, rootKeyPrefix = config.rootKeyPrefix
        )

        with(config.subscribe!!) {
            val redisPubSub = runBlocking {
                val subscriber = if (!patterns.isNullOrEmpty())
                    subscribeClient.psubscribe(*patterns.toTypedArray())
                else null

                if (!channels.isNullOrEmpty()) {
                    subscriber?.subscribe(*channels.toTypedArray()) ?: subscribeClient.subscribe(*channels.toTypedArray())
                } else subscriber
            }!!

            if (keyspaceNotification != null) {
                keyspaceNotificationListener = RedisKeyspaceNotificationListener(keyspaceNotification, redisPubSub, logWriter)
            }
        }
    } catch (e: Throwable) {
        throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init Redis PubSub subscriber", e)
    }
    logger.info("========== init Redis PubSub subscriber completed ==========")
}

RedisSessionStorage 透過 RedisKeyspaceNotificationListener 訂閱 Session Key Expired 事件

昨天的文章 [Day 26] 實作 Ktor Session Authentication with Redis 有提到 SessionAuth Plugin 會建立 RedisSessionStorage 物件用來操作 Session 資料。初始化RedisSessionStorage 的時候會呼叫 subscribeSessionKeyExpired() 方法,把處理 session key expired 事件的 lambda 註冊到 RedisKeyspaceNotificationListener 裡面。

一旦收到 expired 事件通知,會從 key 也就是 sessionId 字串取出 userId,然後再執行 redis HDEL 指令刪除 userId 清單中的 sessionId。這裡為了簡化實作把 userId 夾帶在 sessionId 之中,如果不想這麼做,那就要自己產生另一個隨機 id,並儲存對應表於資料庫。

點我連結到完整的 RedisSessionStorage 程式碼

class RedisSessionStorage(
    private val sessionConfig: SessionConfig,
    private val redisClient: RedisClient,
    private val redisKeyspaceNotificationListener: RedisKeyspaceNotificationListener? = null,
    private val logWriter: LogWriter
) : SessionService {

    // Redis Key => sessionId -> session data
    private val sessionKeyPrefix: String = redisClient.rootKeyPrefix + ":session:"
    
    // Redis Key => userId -> sessionId List
    private val sessionIdsKeyPrefix: String = redisClient.rootKeyPrefix + ":sessionIds:"
    
    init {
        if (redisKeyspaceNotificationListener != null) {
            subscribeSessionKeyExpired()
        }
    }
    
    override suspend fun deleteSession(session: UserSession) {
        logger.debug("logout session: ${session.id}")

        val sessionKey = buildSessionKey(session)
        val sessionIdsKey = buildSessionIdKey(session.id.userId)

        redisClient.del(sessionKey)
        redisClient.hdel(sessionIdsKey, sessionKey)
    }

    private suspend fun setSession(session: UserSession, startTime: Instant, expireDuration: Duration?) {
        session.value.expireTime = expireDuration?.let { startTime.plus(it) }

        val sessionKey = buildSessionKey(session)
        val sessionIdsKey = buildSessionIdKey(session.id.userId)

        redisClient.set(
            sessionKey,
            json.encodeToString(UserSession.Value.serializer(), session.value),
            expireDuration?.toMillis()
        )
        redisClient.hset(
            sessionIdsKey, sessionKey,
            session.value.expireTime?.let { DateTimeUtils.UTC_DATE_TIME_FORMATTER.format(it) } ?: ""
        )
    }
    
    private fun subscribeSessionKeyExpired() {
        redisKeyspaceNotificationListener!!.subscribeKeyspaceNotification { notification ->
            if (notification.isKeyEvent && notification.event == "expired" && notification.key.startsWith(sessionKeyPrefix)) {
                logger.debug { "session key expired: ${notification.key}" }
                try {
                    val segments = notification.key.split(":")
                    val userId = UUID.fromString(segments[5])
                    val sessionKey = notification.key
                    redisClient.hdel(buildSessionIdKey(userId), sessionKey)
                } catch (e: Throwable) {
                    val errorMsg = "subscribeSessionKeyExpired error"
                    logger.error("$errorMsg => $notification", e)
                    logWriter.write(
                        ErrorLog.internal(
                            InternalServerException(
                                InfraResponseCode.REDIS_KEY_NOTIFICATION_ERROR, errorMsg, e,
                                mapOf("notification" to notification)
                            ),
                            "SessionService", "subscribeSessionKeyExpired"
                        )
                    )
                }
            }
        }
    }
}   

成果展示

下圖是 Server 啟動時的 log,最後一行是系統執行指令 psubscribe '__keyevent*__:expired' 向 Redis 訂閱 Keyspace notifications 的回應資料 "psubscribe","__keyevent*__:expired",1

下圖是當 session 過期時,收到 Redis 通知的 log

收到的原始資料是

"pmessage","__keyevent*__:expired","__keyevent@0__:expired","fanpoll-dev:session:ops:user:user:5711da3a-6096-46fc-bce9-2bf70af27d85:631ad1a0926235174f3b7601476cfff3"
``

上一篇
[Day 26] 實作 Ktor Session Authentication with Redis
下一篇
[Day 28] 實作 Multi-Channel Notifications
系列文
基於 Kotlin Ktor 建構支援模組化開發的 Web 框架30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言