iT邦幫忙

2021 iThome 鐵人賽

DAY 20
0

系統的 Log 除了基本的 Request Log 及 Error Log 之外,特定的功能也會有記錄 Log 的需求,例如使用者登入時,需要記錄來源 IP、裝置及登入失敗的原因;寄送訊息通知時,記錄訊息類型、寄送方式(channel)及第三方服務的回應來判斷是否成功寄送。即使這些 Log 的資料欄位及寫入時機不相同,但我們仍需要有統一的方式處理。

實作目標

  • 不使用字串型態寫入 Log,而是定義自己的 LogMessage 資料類別,再根據寫入的目的地,轉換為特定的資料格式,例如 JSON
  • 寫入 Log 的目的地不限於檔案,可以自行實作 LogWriter 寫到任意目的地,例如資料庫,或是轉為 JSON 寫入到 AWS Kinesis Data Firehose
  • 根據 LogMessageLogType 決定對應的 LogWriter 寫入 log 到特定的目的地
  • 當實作新功能有記錄 Log 的需求時,可以沿用既有的 LogWriter,只要實作 LogMessage 資料類別就好
  • 支援非同步寫入 Log

那為什麼不使用 Logging Framework 就好呢? 例如 Logback

  • 我們在使用 Logging Library 寫 log 的時候都是傳入字串,寫到檔案之後再用 Logstash 剖析格式轉換為結構化資料,然後再傳送到 ElasticSearch。但是這部分我想要自己實作 LogMessage 資料類別,自定義 log 資料欄位,再自行轉換各種資料格式,比較有彈性滿足我的進階需求
  • 我手邊有 AWS EKK 的測試環境,但是 Logback 沒有 Appender 可以使用,所以只能自己實作 AwsKinesisLogWriter 使用 Kinesis Firehose SDK 傳送 JSON 到 ElasticaSearch (我沒有在 EC2 安裝 Kinesis Agent 轉一手處理 log 資料)。AWS EKK 服務介紹可以看這篇文章 From ELK Stack to EKK: Aggregating and Analyzing Apache Logs with Amazon Elasticsearch Service, Amazon Kinesis, and Kibana

定義各種 Log 的父類別 LogMessage

每種功能的 Log 資料類別都要繼承 LogMessage,至少包含以下3個欄位

  • occurAt => 事件發生時間
  • logType => 用來區分不同功能的 Log
  • logLevel => 可以做一些判斷,決定要如何處理 log

各種 Log 資料類別的程式碼在此

  • RequestLog (logType = request) => 系統收到 Http Request 的 log
  • ErrorLog (logType = error) => 系統錯誤 log
  • LoginLog (logType = login) => 使用者登入/登出 log
  • NotificationMessageLog (logType = notification_message) => 系統寄送訊息通知 log
abstract class LogMessage : IdentifiableObject<UUID>() {

    abstract val occurAt: Instant

    abstract val logType: String

    abstract val logLevel: LogLevel

    fun toJson(): JsonElement = json.encodeToJsonElement(this)
}

@Serializable
data class LoginLog(
    @Serializable(with = UUIDSerializer::class) val userId: UUID,
    val resultCode: LoginResultCode,
    @Serializable(with = InstantSerializer::class) override val occurAt: Instant,
    val project: String,
    val source: PrincipalSource,
    val tenantId: TenantId? = null,
    val clientId: String? = null,
    val clientVersion: String?,
    val ip: String? = null,
    val sid: String? = null
) : LogMessage() {

    @Serializable(with = UUIDSerializer::class)
    override val id: UUID = UUID.randomUUID()

    override val logType: String = LOG_TYPE

    override val logLevel: LogLevel = Log_Level

    companion object {
        const val LOG_TYPE = "login"
        private val Log_Level = LogLevel.INFO
    }
}

實作各種 LogWriter 寫入 LogMessage 到不同目的地

目前 LogMessage 可以使用以下 LogWriter 寫到不同目的地

interface LogWriter {

    fun write(message: LogMessage)

    fun shutdown() {}
}

實作 LogMessageDispatcher 根據 LogMessage 的 LogType 決定對應的 LogWriter

LogMessageDispatcher 也實作 LogWriter 介面,當呼叫 write(message: LogMessage) 方法時,會根據 LogMessage 的 LogType 找出事先已註冊的 LogWriter,再呼叫 write(message: LogMessage) 方法寫入 log

class LogMessageDispatcher(private val defaultLogWriter: LogWriter? = null) : LogWriter {

    private val logWriters: MutableMap<String, LogWriter> = mutableMapOf()

    fun register(logType: String, logWriter: LogWriter) {
        require(!logWriters.containsKey(logType))

        logWriters[logType] = logWriter
    }

    override fun write(message: LogMessage) {
        val logWriter = logWriters[message.logType] ?: defaultLogWriter ?: throw InternalServerException(
            InfraResponseCode.SERVER_CONFIG_ERROR, "logType ${message.logType} logWriter is not registered"
        )
        logWriter.write(message)
    }

    override fun shutdown() {
        logWriters.values.forEach { it.shutdown() }
    }
}

實作 LogMessageCoroutineActor 非同步寫入 Log

這裡使用 decorator pattern 的手法,讓 LogMessageDispatcherLogMessageCoroutineActor 都實作 LogWriter 介面,所以呼叫 write(message: LogMessage) 方法時,會 delegate 到內部的 LogWriter,執行順序會是 LogMessageCoroutineActor → LogMessageDispatcher → XXXLogWriterLogMessageCoroutineActor 內部是以 Coroutine SendChannel 達成非同步執行,更多 CoroutineActor 的實作細節,請參考 [Day 21] 使用 Coroutine SendChannel 處理非同步工作

class LogMessageCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val logWriter: LogWriter
) : LogWriter {

    private val logger = KotlinLogging.logger {}

    private val actorName = "LogWriterActor"

    private val actor: CoroutineActor<LogMessage> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute
    )

    override fun write(message: LogMessage) {
        actor.sendToUnlimitedChannel(message, InfraResponseCode.LOG_ERROR) // non-blocking by Channel.UNLIMITED
    }

    private fun execute(message: LogMessage) {
        try {
            logWriter.write(message)
        } catch (e: Throwable) {
            logger.error("$actorName execute error => $message", e)
        }
    }

    override fun shutdown() {
        actor.close()
        logWriter.shutdown()
    }
}

實作 Logging Plugin

Logging Configuration

Logging Plugin 會根據 application.conf 設定檔,初始化建立對應的 LogWriter,然後註冊至 Koin DI。因為 Request Log 及 Error Log 是系統最基本的 LogType,所以我會在 Logging Plugin 裡面進行設定。以下面的設定值為例,Request Log 會使用 AwsKinesisLogWriter,Error Log 則是使用 ErrorLogDBWriter

logging {
    request {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
        includeHeaders = false
        includeQueryString = false
        includeResponseBody = false
        includeGetMethod = false
        excludePaths = ["/ops/sysstat/healthCheck"]
        excludeRequestBodyPaths = ["/login", "/myPassword"]
    }
    error {
        enabled = true
        destination = "Database" # File(default), Database, AwsKinesis
        // TODO integrate notification feature to notify OpsTeam by email, sms...
    }
    writer {
        awsKinesis {
            streamName = "logging"
            nettyClient {
                #maxConcurrency = 50 # => aws default value = 50
                #maxPendingConnectionAcquires = 10000 => aws default value = 10000
                #maxIdleConnectionTimeout = 60s => aws default value = 60s
            }
            threadPool {
                fixedPoolSize = 3
            }
        }
    }
    asyncExecutor {
        coroutineActor {
            coroutines = 1
            dispatcher {
                fixedPoolSize = 1
            }
        }
    }
}

初始化 LogMessageDispatcher、LogMessageCoroutineActor 註冊至 Koin DI

下面是 Logging Plugin 的內部程式碼,我會先建立 LogMessageDispatcher 物件,後續每種 LogType 的 LogWriter 要註冊到 LogMessageDispatcher 裡面,這樣子寫入 log 時就可以根據 LogMessage 的 LogType,取出對應的 LogWriter 執行寫入動作。如果 LogType 沒有設定對應的 LogWriter,就會使用預設的 FileLogWriter 寫到檔案。

pipeline.koin {
    modules(
        module(createdAtStart = true) {
            single { loggingConfig }

            val fileLogWriter = FileLogWriter()
            single { fileLogWriter }

            val logMessageDispatcher = LogMessageDispatcher(FileLogWriter())
            single { logMessageDispatcher }

            val awsKinesisLogWriter = loggingConfig.writer?.awsKinesis?.let {
                AwsKinesisLogWriter(it, serverConfig)
            }
            if (awsKinesisLogWriter != null)
                single { awsKinesisLogWriter }

            if (loggingConfig.request.enabled) {
                val requestLogWriter = when (loggingConfig.request.destination) {
                    LogDestination.File -> fileLogWriter
                    LogDestination.Database -> RequestLogDBWriter()
                    LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
                        InfraResponseCode.SERVER_CONFIG_ERROR, "AwsKinesisLogWriter is not configured"
                    )
                }
                logMessageDispatcher.register(RequestLog.LOG_TYPE, requestLogWriter)
            }
            if (loggingConfig.error.enabled) {
                val errorLogWriter = when (loggingConfig.error.destination) {
                    LogDestination.File -> fileLogWriter
                    LogDestination.Database -> ErrorLogDBWriter()
                    LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
                        InfraResponseCode.SERVER_CONFIG_ERROR, "kinesisLogWriter is not configured"
                    )
                }
                logMessageDispatcher.register(ErrorLog.LOG_TYPE, errorLogWriter)
            }

            val logWriter = loggingConfig.asyncExecutor?.let {
                LogMessageCoroutineActor(it.coroutineActor, logMessageDispatcher)
            } ?: logMessageDispatcher

            single { logWriter }

            KoinApplicationShutdownManager.register { logWriter.shutdown() }
        }
    )
}

SessionAuth Plugin 註冊 LoginLog 的 LogWriter 至 LogMessageDispatcher 範例

先安裝 Logging Plugin 初始化 LogMessageDispatcher 之後,SessionAuth Plugin 就可以透過 Koin DI 取出 LogMessageDispatcher 物件,再註冊 LoginLog 使用 AwsKinesisLogWriter。當使用者登入、登出時,就可以使用 AwsKinesisLogWriter 寫入 LoginLog。

sessionAuth {
    storageType = "Redis" # Redis
    redisKeyExpiredNotification = true
    session {
        expireDuration = 1d
        extendDuration = 15m
    }
    logging {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
    }
}
val loginLogWriter = when (authConfig.logging.destination) {
    LogDestination.File -> pipeline.get<FileLogWriter>()
    LogDestination.Database -> LoginLogDBWriter()
    LogDestination.AwsKinesis -> pipeline.get<AwsKinesisLogWriter>()
}
val logMessageDispatcher = pipeline.get<LogMessageDispatcher>()
logMessageDispatcher.register(LoginLog.LOG_TYPE, loginLogWriter)

Global Exception Handler 寫入 ErrorLog 範例

系統錯誤可分為2種,一種是處理外部請求的錯誤,我們可以在 Global Exception Handler ,先透過 Koin DI 取得 LogWriter,再把 Exception 轉為 ErrorLog 物件寫入。

install(StatusPages) {
    val loggingConfig = get<LoggingConfig>()
    val logWriter = get<LogWriter>()
    val responseCreator = get<I18nResponseCreator>()

    exception<Throwable> { cause ->
        val e = ExceptionUtils.wrapException(cause)
        ExceptionUtils.writeLogToFile(e, call)
        
        if (loggingConfig.error.enabled) {
            logWriter.write(ErrorLog.request(e, call))
        }
        
        val errorResponse = responseCreator.createErrorResponse(e, call)
        call.respond(errorResponse)
    }
}

另一種情況是系統內部背景排程、執行非同步工作的錯誤。例如 DBAsyncTaskCoroutineActor 非同步寫入資料庫時,我們要 catch Exception 轉為 ErrorLog 物件再寫入,差別在於這裡沒有外部請求的 ApplicationCall 物件

private fun execute(task: DBAsyncTask) {
    try {
        transaction {
            task.block(this)
        }
    } catch (e: Throwable) {
        val errorMsg = "$actorName execute error"
        logger.error("errorMsg => $task", e)
        logWriter.write(
            ErrorLog.internal(
                InternalServerException(
                    InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
                    mapOf("taskId" to task.id, "taskType" to task.type)
                ),
                actorName, task.id.toString()
            )
        )
    }
}

結語

今天我說明了如何建立系統的 Logging 機制處理不同類型的 log,另一種類似的需求是系統也需要對各種非同步工作採用一致的方式處理。例如今天提到的 LogMessageCoroutineActor 非同步寫入 log,還有 DBAsyncTaskCoroutineActor 非同步寫入資料庫。明天我會說明如何把 Coroutine SendChannel 包裝成不同功能的 CoroutineActor 執行非同步工作。


上一篇
[Day 19] 實作 Ktor Request Logging
下一篇
[Day 21] 使用 Coroutine SendChannel 處理非同步工作
系列文
基於 Kotlin Ktor 建構支援模組化開發的 Web 框架30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言