session authentication 的機制是
以上操作都是使用 sessionId 操作某個 user 的 session 資料,如果系統允許使用者重複登入的話,我們怎麼反過來知道某個使用者擁有那些 sessionId?
直覺上的解決方式就是在登入時,除了記錄 sessionId 與 session 資料的關聯,也一併把 sessionId 加入至 userId 與 sessionId 的一對多 sessionId 清單。反之,登出時也一併從 user 的 sessionId 清單中刪除此 sessionId。但是這麼做還不夠,因為當 sessionId 逾期而被 Redis 刪除時,並不會連動更新 sessionId 清單,這必須由我們系統自行把過期的 sessionId 從 sessionId 清單刪除。不過問題來了,我們怎麼知道 sessionId 逾期被 Redis 刪除,系統總不能一直使用 sessionId 去詢問 Redis 是否還存在,然後再去更新 sessionId 清單,這種作法在使用者數量龎大時根本不可行。
還好 Redis 有提供 Redis Keyspace Notifications 機制,我們系統可以先訂閱 keyspace 的事件通知,當 sessionId 過期時 Redis 會通知我們。不過使用此功能前要注意以下事項,詳細說明請參考官方文件
redis-cli config set notify-keyspace-events KEA
Redis Keyspace Notifications 的流程上分成兩階段,下面以 Redis 指令及傳輸資料來說明
執行指令 psubscribe '__keyevent*__:expired'
然後收到成功的回應資料 "psubscribe","__keyevent*__:expired",1
"pmessage","__keyevent*__:expired","__keyevent@0__:expired","我是sessionId"
接下來我們系統再從過期的 sessionId 找出 userId,然後再根據 userId 去更新使用者的 sessionId 清單即可。本專案的 sessionId 清單資料也儲存於 Redis,但使用的是 Hash 資料結構,至於原始的 session 資料是使用 String 資料結構。
我使用的 Redis Client 是 ktor-client-redis,它是 JetBrains 開發的實驗性專案,只有實作基本的 PubSub 功能,所以我必須修改原始碼,在 PubSub 的基礎上實作 KeyspaceNotification 功能。更多 ktor-client-redis 的資訊,請看我先前寫的文章 => [Day 25] 實作 Redis Plugin 整合 Redis Coroutine Client
首先是修改 ktor-client-redis 的 PubSub.kt 原始碼,我在 RedisPubSub 裡面增加 Packet
子類別 KeyspaceNotification
,同時也在 mapToPacket
方法裡,把從 Redis 收到資料轉為 KeyspaceNotification
物件。
interface RedisPubSub {
interface Packet
data class Message(val channel: String, val message: String, val isPattern: Boolean = false) : Packet
data class Subscription(val channel: String, val subscriptions: Long, val subscribe: Boolean, val isPattern: Boolean = false) : Packet
data class KeyspaceNotification(val channel: String, val isKeyEvent: Boolean, val event: String, val key: String) :
IdentifiableObject<String>(), Packet {
override val id: String
get() = "$event-$key"
}
}
private fun CoroutineScope.mapToPacket(rawChannel: ReceiveChannel<Any>) = produce(capacity = Channel.UNLIMITED) {
for (data in rawChannel) {
logger.debug("data = $data")
val list = data as List<Any>
val kind = String(list[0] as ByteArray)
val channel = String(list[1] as ByteArray)
val isPattern = kind.startsWith("p")
val isMessage = kind == "message"
val isSubscription = kind.startsWith("psub") || kind.startsWith("sub")
val isPMessage = kind == "pmessage"
val isKeyspaceNotification = isPMessage && list.size == 4
val packet = when {
isMessage -> RedisPubSub.Message(channel, String(list[2] as ByteArray), isPattern)
isSubscription -> RedisPubSub.Subscription(channel, list[2] as Long, isSubscription, isPattern)
isKeyspaceNotification -> {
val info = String(list[2] as ByteArray)
val pMessage = String(list[3] as ByteArray)
val isKeyEvent = info.contains("keyevent")
val event = if (isKeyEvent) info.substringAfterLast(":") else pMessage
val key = if (isKeyEvent) pMessage else info.substringAfterLast(":")
RedisPubSub.KeyspaceNotification(channel, isKeyEvent, event, key)
}
else -> error("Undefined Redis PubSub raw data: $list")
}
logger.debug("packet = $packet")
send(packet)
}
}
因為 PubSub 收到的資料有多種格式,所以在此使用 coroutine channel pipeline 的方式實作,過濾掉其它格式的資料,只保留 KeyspaceNotification
物件,這樣就完成底層接收 Redis 事件通知資料的部分了。
private fun CoroutineScope.filterKeyspaceNotification(channel: ReceiveChannel<RedisPubSub.Packet>) = produce {
for (packet in channel) {
if (packet is RedisPubSub.KeyspaceNotification)
send(packet)
}
}
fun CoroutineScope.redisKeyspaceNotificationChannel(redisPubSub: RedisPubSub): ReceiveChannel<RedisPubSub.KeyspaceNotification> =
filterKeyspaceNotification(mapToPacket(((redisPubSub as RedisPubSubImpl).rawChannel)))
雖然我目前只需要訂閱 session key expired 事件而已,但考量到 Redis KeyspaceNotification 可以應用到任何 key 的事件通知,所以我決定先在底層實作泛用的 RedisKeyspaceNotificationListener
。RedisKeyspaceNotificationListener
的內部實作上是利用我之前做好的泛用 CoroutineActor
,我們可以設定處理事件的 coroutine 數量及 dispatcher,與底層收取事件通知的 PubSub 的 dispatcher 分開,藉此調校效能。CoroutineActor
內部一樣使用 coroutine channel pipeline 的方式實作,先從上游接收底層 PubSub 收到的資料 => val upstreamChannel = redisKeyspaceNotificationChannel(redisPubSub)
,然後再丟到 CoroutineActor
內部的 channel,最後再執行先前註冊到 RedisKeyspaceNotificationListener
的 lambda。
※ 詳細 CoroutineActor
的實作,請看我先前寫的文章 => [Day 21] 使用 Coroutine SendChannel 處理非同步工作
點我連結到完整的 RedisKeyspaceNotificationListener 程式碼
class RedisKeyspaceNotificationListener(
config: KeyspaceNotificationConfig,
redisPubSub: RedisPubSub,
logWriter: LogWriter
) {
private val logger = KotlinLogging.logger {}
private val actorName = "RedisKeyspaceNotificationListener"
private val keyspaceNotificationBlocks: MutableList<suspend (RedisPubSub.KeyspaceNotification) -> Unit> = mutableListOf()
private val actor: CoroutineActor<RedisPubSub.KeyspaceNotification> = CoroutineActor(
actorName, Channel.UNLIMITED,
config.processor, Dispatchers.IO,
this::processKeyspaceNotification, { channel ->
launch {
val upstreamChannel = redisKeyspaceNotificationChannel(redisPubSub)
for (message in upstreamChannel) {
try {
channel.send(message) // non-blocking if Channel.UNLIMITED
} catch (e: Throwable) { // ignore CancellationException because we don't call channel.cancel()
var errorMsg = ""
if (e is ClosedSendChannelException) {
errorMsg = "$actorName is closed"
logger.warn("$errorMsg => $message")
} else {
errorMsg = "$actorName unexpected error"
logger.error("$errorMsg => $message", e)
}
logWriter.write(
ErrorLog.internal(
InternalServerException(InfraResponseCode.REDIS_ERROR, errorMsg, e, mapOf("message" to message)),
actorName, message.id
)
)
// TODO: persistence unDeliveredMessage and retry later
}
}
}
}
)
private suspend fun processKeyspaceNotification(data: RedisPubSub.KeyspaceNotification) {
keyspaceNotificationBlocks.forEach { it(data) }
}
fun subscribeKeyspaceNotification(block: suspend (RedisPubSub.KeyspaceNotification) -> Unit) {
keyspaceNotificationBlocks.add(block)
}
fun shutdown() {
actor.close()
}
}
以上功能都實作完了之後,最後就是整合至 Ktor Plugin 了。我先在 Redis Plugin 裡面,根據外部設定檔 application.conf
讀取 Redis 連線設定及 RedisKeyspaceNotificationListener
設定。然後建立 PubSub 專用的 RedisClient 物件,與原有的 RedisClient 物件分開,因為這裡我只想設定一個單執行緒處理事件通知就好。接下來再執行 psubscribe
指令向 Redis 訂閱事件通知,最後再初始化 RedisKeyspaceNotificationListener
就完成了。
redis {
host = ${REDIS_HOST}
port = ${REDIS_PORT}
password = ${?REDIS_PASSWORD}
rootKeyPrefix = "fanpoll-"${app.server.env}
client {
coroutines = 3
dispatcher {
fixedPoolSize = 1
}
}
subscribe {
patterns = ["__keyevent*__:expired"]
channels = []
keyspaceNotification {
processor {
coroutines = 3
dispatcher {
fixedPoolSize = 1
}
}
}
}
}
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
val configuration = Configuration().apply(configure)
val feature = RedisFeature(configuration)
val appConfig = pipeline.get<MyApplicationConfig>()
val logWriter = pipeline.get<LogWriter>()
config = appConfig.infra.redis ?: configuration.build()
initClient(config)
if (config.subscribe != null) {
initSubscriber(config, logWriter)
}
pipeline.koin {
modules(
module(createdAtStart = true) {
single { client }
if (keyspaceNotificationListener != null)
single { keyspaceNotificationListener }
KoinApplicationShutdownManager.register { shutdown() }
}
)
}
return feature
}
private fun initSubscriber(config: RedisConfig, logWriter: LogWriter) {
logger.info("========== init Redis PubSub subscriber... ==========")
try {
subscribeDispatcher = CoroutineUtils.createDispatcher(subscribeDispatcherName, ThreadPoolConfig(fixedPoolSize = 1))
subscribeClient = RedisClient(
address = InetSocketAddress(config.host, config.port),
password = config.password,
maxConnections = 1,
dispatcher = subscribeDispatcher, rootKeyPrefix = config.rootKeyPrefix
)
with(config.subscribe!!) {
val redisPubSub = runBlocking {
val subscriber = if (!patterns.isNullOrEmpty())
subscribeClient.psubscribe(*patterns.toTypedArray())
else null
if (!channels.isNullOrEmpty()) {
subscriber?.subscribe(*channels.toTypedArray()) ?: subscribeClient.subscribe(*channels.toTypedArray())
} else subscriber
}!!
if (keyspaceNotification != null) {
keyspaceNotificationListener = RedisKeyspaceNotificationListener(keyspaceNotification, redisPubSub, logWriter)
}
}
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init Redis PubSub subscriber", e)
}
logger.info("========== init Redis PubSub subscriber completed ==========")
}
昨天的文章 [Day 26] 實作 Ktor Session Authentication with Redis 有提到 SessionAuth Plugin 會建立 RedisSessionStorage
物件用來操作 Session 資料。初始化RedisSessionStorage
的時候會呼叫 subscribeSessionKeyExpired()
方法,把處理 session key expired 事件的 lambda 註冊到 RedisKeyspaceNotificationListener
裡面。
一旦收到 expired 事件通知,會從 key 也就是 sessionId 字串取出 userId,然後再執行 redis HDEL
指令刪除 userId 清單中的 sessionId。這裡為了簡化實作把 userId 夾帶在 sessionId 之中,如果不想這麼做,那就要自己產生另一個隨機 id,並儲存對應表於資料庫。
點我連結到完整的 RedisSessionStorage 程式碼
class RedisSessionStorage(
private val sessionConfig: SessionConfig,
private val redisClient: RedisClient,
private val redisKeyspaceNotificationListener: RedisKeyspaceNotificationListener? = null,
private val logWriter: LogWriter
) : SessionService {
// Redis Key => sessionId -> session data
private val sessionKeyPrefix: String = redisClient.rootKeyPrefix + ":session:"
// Redis Key => userId -> sessionId List
private val sessionIdsKeyPrefix: String = redisClient.rootKeyPrefix + ":sessionIds:"
init {
if (redisKeyspaceNotificationListener != null) {
subscribeSessionKeyExpired()
}
}
override suspend fun deleteSession(session: UserSession) {
logger.debug("logout session: ${session.id}")
val sessionKey = buildSessionKey(session)
val sessionIdsKey = buildSessionIdKey(session.id.userId)
redisClient.del(sessionKey)
redisClient.hdel(sessionIdsKey, sessionKey)
}
private suspend fun setSession(session: UserSession, startTime: Instant, expireDuration: Duration?) {
session.value.expireTime = expireDuration?.let { startTime.plus(it) }
val sessionKey = buildSessionKey(session)
val sessionIdsKey = buildSessionIdKey(session.id.userId)
redisClient.set(
sessionKey,
json.encodeToString(UserSession.Value.serializer(), session.value),
expireDuration?.toMillis()
)
redisClient.hset(
sessionIdsKey, sessionKey,
session.value.expireTime?.let { DateTimeUtils.UTC_DATE_TIME_FORMATTER.format(it) } ?: ""
)
}
private fun subscribeSessionKeyExpired() {
redisKeyspaceNotificationListener!!.subscribeKeyspaceNotification { notification ->
if (notification.isKeyEvent && notification.event == "expired" && notification.key.startsWith(sessionKeyPrefix)) {
logger.debug { "session key expired: ${notification.key}" }
try {
val segments = notification.key.split(":")
val userId = UUID.fromString(segments[5])
val sessionKey = notification.key
redisClient.hdel(buildSessionIdKey(userId), sessionKey)
} catch (e: Throwable) {
val errorMsg = "subscribeSessionKeyExpired error"
logger.error("$errorMsg => $notification", e)
logWriter.write(
ErrorLog.internal(
InternalServerException(
InfraResponseCode.REDIS_KEY_NOTIFICATION_ERROR, errorMsg, e,
mapOf("notification" to notification)
),
"SessionService", "subscribeSessionKeyExpired"
)
)
}
}
}
}
}
下圖是 Server 啟動時的 log,最後一行是系統執行指令 psubscribe '__keyevent*__:expired'
向 Redis 訂閱 Keyspace notifications 的回應資料 "psubscribe","__keyevent*__:expired",1
下圖是當 session 過期時,收到 Redis 通知的 log
收到的原始資料是
"pmessage","__keyevent*__:expired","__keyevent@0__:expired","fanpoll-dev:session:ops:user:user:5711da3a-6096-46fc-bce9-2bf70af27d85:631ad1a0926235174f3b7601476cfff3"
``