iT邦幫忙

2021 iThome 鐵人賽

DAY 21
0
Modern Web

基於 Kotlin Ktor 建構支援模組化開發的 Web 框架系列 第 21

[Day 21] 使用 Coroutine SendChannel 處理非同步工作

系統除了即時接受及回應使用者請求,也需要執行各種非同步工作,例如背景排程及寄送訊息通知…等。在實作上,雖然我可以直接使用 CompletableFuture, Akka Actor, Coroutine Channel 的 API,將工作切換到另一個執行緒處理,但其實背後有許多實作細節需要考慮,例如 ThreadPool 參數調校、錯誤處理…等,我希望各種不同類型的非同步工作,能採用一致的方式處理這些非商業邏輯的細節。

實作目標

  • 為每種特定功能的非同步工作建立 XXXCoroutineActorXXXCoroutineActor 內部實作使用 Coroutine SendChannel (在此也可稱為 Actor),而且不對外曝露 CoroutineDispatcher, CoroutineExceptionHandler, ThreadFactory …等實作細節,所以使用方式僅僅是呼叫 XXXCoroutineActor.send(message) 方法而已
  • ThreadPool 參數可由外部設定檔設定,方便對每種非同步工作進行效能調校
  • 停止 Server 時會等待 channel 內的工作結束,達到 Graceful Shutdown

點擊以下連結至實作程式碼

CoroutineActor Configuration

每一種非同步工作都要在 application.conf 設定檔設定 asyncExecutor,目前 asyncExecutor 的實作只有coroutineActorcoroutines 是想建立的 coroutine 數量,dispatcher 可設定 threadpool 參數,fixedPoolSizeminPoolSize, maxPoolSize, keepAliveTime 二擇一設定。

asyncExecutor {
    coroutineActor {
        coroutines = 1
        dispatcher {
            fixedPoolSize = 1
            minPoolSize = 1
            maxPoolSize = 3
            keepAliveTime = 1000000000
        }
    }
}

下面是對應的 configuration data class,其中 ThreadPoolConfig 實作 validate() 檢查設定值是否合法。

data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {

    fun validate() {
        dispatcher?.validate()
    }

    class Builder {

        var coroutines: Int = 1
        private var dispatcher: ThreadPoolConfig? = null

        fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
            dispatcher = ThreadPoolConfig.Builder().apply(block).build()
        }

        fun build(): CoroutineActorConfig {
            return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
        }
    }
}

data class ThreadPoolConfig(
    val fixedPoolSize: Int? = 1,
    val minPoolSize: Int? = null,
    val maxPoolSize: Int? = null,
    val keepAliveTime: Long? = null
) : ValidateableConfig {

    fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)

    override fun validate() {
        require(
            (minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
                    (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
        ) {
            "minPoolSize, maxPoolSize, keepAliveTime should be configured"
        }
    }

    class Builder {

        var fixedPoolSize: Int? = 1
        var minPoolSize: Int? = null
        var maxPoolSize: Int? = null
        var keepAliveTime: Long? = null

        fun build(): ThreadPoolConfig {
            return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
        }
    }
}

為每種特定功能的非同步工作建立 CoroutineActor,以 DBAsyncTaskCoroutineActor 為例

Database Plugin 初始化 DBAsyncTaskCoroutineActor

Database Plugin 先從 Ktor 設定檔讀取 CoroutineActorConfig,再建立 DBAsyncTaskCoroutineActor,並註冊至 Koin DI

override fun install(pipeline: Application, configure: Configuration.() -> Unit): DatabaseFeature {
    val logWriter = pipeline.get<LogWriter>()
    val config = appConfig.infra.database?.asyncExecutor ?: configuration.asyncExecutorConfig
    asyncExecutor = DBAsyncTaskCoroutineActor(config.coroutineActor, logWriter)
    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { asyncExecutor }
            }
        )
    }
    
    KoinApplicationShutdownManager.register { asyncExecutor?.shutdown() }
}

DBAsyncTaskCoroutineActor 非同步寫入資料至資料庫

當使用者成功登入之後,以非同步方式記錄使用者 App 裝置資訊於資料庫,所以我透過 Koin DI 拿到 DBAsyncTaskCoroutineActor 物件,然後執行 run 函式更新資料庫的 user_device 資料表

dbAsyncTaskCoroutineActor.run("createUserDevice") {
    UserDeviceTable.insert(form.toCreateUserDeviceDTO())
}

實作 DBAsyncTaskCoroutineActor

以下是 DBAsyncTaskCoroutineActor 程式碼,當我們呼叫 run 函式時,內部實作是建立一個 DBAsyncTask 丟到一個無關功能邏輯的 CoroutineActor 裡面,然後從 coroutine channel 另一端取出後再執行 execute 函式,建立 DB Transaction 執行 DBAsyncTaskTransaction.() -> Any? lambda。

另一方面,當停止 Server 時,因為先前已經在 Database Plugin 註冊 shutdown() 函式,所以會呼叫 channel.close(),等所有的 DBAsyncTask 執行完畢後再停止。詳細實作 coroutine channel graceful stop 的作法,可參考我之前寫的文章 [Day 10] Ktor Graceful Shutdown

class DBAsyncTaskCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val logWriter: LogWriter
) {

    private val logger = KotlinLogging.logger {}

    private val actorName = "DBAsyncTaskActor"

    private val actor: CoroutineActor<DBAsyncTask> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute, null,
        logWriter
    )

    fun run(type: String, block: Transaction.() -> Any?): UUID {
        val task = DBAsyncTask(type, block)
        actor.sendToUnlimitedChannel(task, InfraResponseCode.DB_ASYNC_TASK_ERROR) // non-blocking by Channel.UNLIMITED
        return task.id
    }

    private fun execute(task: DBAsyncTask) {
        try {
            transaction {
                task.block(this)
            }
        } catch (e: Throwable) {
            val errorMsg = "$actorName execute error"
            logger.error("errorMsg => $task", e)
            logWriter.write(
                ErrorLog.internal(
                    InternalServerException(
                        InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
                        mapOf("taskId" to task.id, "taskType" to task.type)
                    ),
                    actorName, task.id.toString()
                )
            )
        }
    }

    internal fun shutdown() {
        actor.close()
    }
}

CoroutineActor 內部無關功能邏輯的實作細節

建立 CoroutineScope 包含 CorotuineDispatcher 及 CoroutineExceptionHandler

  • 根據外部設定檔的 CoroutineActorConfig 設定值建立 CorotuineDispatcher
  • 目前實作的 CoroutineExceptionHandler 只有印出錯誤訊息,更積極的作法是儲存失敗的工作資料,未來進行 Retry 動作

其它參數的意義

  • capacity coroutine channel 的容量大小,如果工作數量超過上限的話,呼叫 send() 方法時必須等待
  • processBlock 是執行工作的 lambda
  • produceBlock 是傳入工作的 lambda,會在 CoroutineActor 建立後立即呼叫
class CoroutineActor<T : IdentifiableObject<*>>(
    val name: String,
    capacity: Int,
    coroutineActorConfig: CoroutineActorConfig,
    defaultCoroutineDispatcher: CoroutineDispatcher,
    processBlock: suspend (T) -> Unit,
    produceBlock: (CoroutineScope.(SendChannel<T>) -> Unit)? = null,
    private val logWriter: LogWriter? = null
) {

    private val logger = KotlinLogging.logger {}

    private val dispatcher: CoroutineDispatcher
    private val channel: SendChannel<T>
    private val coroutineScope: CoroutineScope

    init {
        logger.info("========== init coroutine actor $name ... ==========")
        try {
            dispatcher = if (coroutineActorConfig.dispatcher != null)
                CoroutineUtils.createDispatcher(name, coroutineActorConfig.dispatcher)
            else defaultCoroutineDispatcher

            // Note: block should catch all exceptions in most cases
            val exceptionHandler = CoroutineExceptionHandler { ctx, e ->
                logger.error("coroutine actor uncaught exception => $ctx", e)
            }
            val context = dispatcher + exceptionHandler
            coroutineScope = CoroutineScope(context)
            channel = CoroutineUtils.createActor(
                name, capacity, coroutineActorConfig.coroutines,
                coroutineScope, processBlock
            )

            produceBlock?.invoke(coroutineScope, channel)
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.LOG_ERROR, "fail to init coroutine actor $name", e)
        }
        logger.info("========== init coroutine actor $name completed ==========")
    }

    // call channel.close() to wait task completed when shutdown server (don't call channel.cancel())
    fun close() {
        logger.info("coroutine actor $name close ...")
        CoroutineUtils.closeChannel(name, channel)
        coroutineScope.cancel(name)
        if (dispatcher is ExecutorCoroutineDispatcher) {
            CoroutineUtils.closeDispatcher(name, dispatcher)
        }
        logger.info("coroutine actor $name closed")
    }
}

下面是 CoroutineUtils.createActor() 的程式碼,在這裡沒有使用 CoroutineScope.actor() 函式建立 SendChannel 是因為目前 actor 的實作有點簡單,無法滿足我的需求,而且官方 API 文件也有加注 This API will become obsolete in future updates with introduction of complex actors. See issue #87

此外,我為每個 coroutine 指定 CoroutineName 方便 Trace & Debug。記得要一併設定 JVM Option -Dkotlinx.coroutines.debug

fun <E> createActor(
    name: String, capacity: Int, coroutines: Int,
    scope: CoroutineScope,
    block: suspend (E) -> Unit
): SendChannel<E> {
    require(coroutines > 0)

    val channel = Channel<E>(capacity)
    repeat(coroutines) {
        scope.launch(CoroutineName("$name-(${it + 1})")) {
            for (e in channel) {
                logger.debug { coroutineContext }
                block(e)
            }
        }
    }
    return channel
}

實作 ThreadFactory 指定 Thread 名稱及 UncaughtExceptionHandler

與 coroutine 同樣的做法,建議 ThreadFactory 建立 thread 時也指定 thread name 及 uncaughtExceptionHandler。然後再使用 ExecutorService 的 asCoroutineDispatcher() 方法,把 Java ExecutorService 轉為 Coroutine Dispatcher

object ThreadPoolUtils {

    private val logger = KotlinLogging.logger {}

    fun createThreadPoolExecutor(
        threadNamePrefix: String,
        config: ThreadPoolConfig,
        handler: Thread.UncaughtExceptionHandler? = null
    ): ExecutorService {
        logger.info("init thread pool $threadNamePrefix ... $config")
        val factory = DefaultThreadFactory(threadNamePrefix, handler)

        return if (config.isFixedThreadPool())
            Executors.newFixedThreadPool(config.fixedPoolSize!!, factory)
        else
            ThreadPoolExecutor(
                config.minPoolSize!!, config.maxPoolSize!!,
                config.keepAliveTime!!, TimeUnit.SECONDS,
                LinkedBlockingQueue(), factory
            )
    }

    private class DefaultThreadFactory(
        private val namePrefix: String,
        val handler: Thread.UncaughtExceptionHandler? = null
    ) : ThreadFactory {

        private val backingThreadFactory: ThreadFactory = Executors.defaultThreadFactory()

        override fun newThread(r: Runnable): Thread {
            val thread = backingThreadFactory.newThread(r)
            thread.name = "$namePrefix-${thread.name}"
            if (handler != null)
                thread.uncaughtExceptionHandler = handler
            else
                thread.setUncaughtExceptionHandler { t, e ->
                    logger.error("Thread Uncaught Error => ${t.name}", e)
                }
            return thread
        }
    }
}

傳送工作給 CoroutineActor

建立及初始化 CoroutineActor 之後,接下來就是要傳送工作給 coroutine channel。實務上要依工作種類及情境決定 channel buffer 的大小,避免累積太多工作耗光記憶體,不過我在此假設 Server 記憶體很大,所以預設值給定 Channel.UNLIMITED,因此雖然是呼叫 trySendBlocking,但實際上 channel 不會滿,所以可視為 non-blocking 操作。

在這裡要注意 channel 是否已經被 close,如果已被 close,此時使用 LogWriter 先把未完成工作的 ErrorLog 記錄下來,未來再加入 Retry 機制。

// suspending
suspend fun send(message: T) {
    channel.send(message)
}
    
// not a suspend function and non-blocking due to Channel.UNLIMITED
fun sendToUnlimitedChannel(message: T, errorCode: ResponseCode) {
    val result = channel.trySendBlocking(message)
    if (result.isFailure) {
        var errorMsg = ""
        val e = result.exceptionOrNull()
        if (result.isClosed) {
            errorMsg = "$name is closed"
            logger.warn("$errorMsg => $message")
        } else {
            errorMsg = "$name unexpected error" // we never call channel.cancel()
            logger.error("$errorMsg => $message", e)
        }
        if (errorCode != InfraResponseCode.LOG_ERROR) {
            logWriter?.write(
                ErrorLog.internal(
                    InternalServerException(errorCode, errorMsg, e, mapOf("message" to message)),
                    name, message.id.toString()
                )
            )
            // TODO: persistence unDeliveredMessage and retry later
        }
    }
}

上一篇
[Day 20] 實作 Logging Plugin 建立系統 Logging 機制
下一篇
[Day 22] 實作 Database Plugin 整合 Exposed ORM, HikariCP 及 Flyway
系列文
基於 Kotlin Ktor 建構支援模組化開發的 Web 框架30

尚未有邦友留言

立即登入留言