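Besides accepting and responding to user requests in real time, a system also has to run all kinds of asynchronous work, such as background scheduling and sending notifications. I could call the CompletableFuture, Akka Actor, or Coroutine Channel APIs directly to hand work off to another thread, but there are many implementation details to consider behind that, such as ThreadPool tuning and error handling. I want every type of asynchronous work to handle these non-business-logic details in a consistent way.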
XXXCoroutineActor is implemented internally with a Coroutine SendChannel (which can also be called an Actor here), and it does not expose implementation details such as CoroutineDispatcher, CoroutineExceptionHandler, or ThreadFactory. Using it therefore takes nothing more than a call to XXXCoroutineActor.send(message).
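Every type of asynchronous work must configure asyncExecutor in the application.conf file. Currently the only asyncExecutor implementation is coroutineActor. coroutines is the number of coroutines to create, and dispatcher sets the thread pool parameters. For the dispatcher, choose one of two options: either fixedPoolSize, or minPoolSize, maxPoolSize, and keepAliveTime together.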
asyncExecutor {
    coroutineActor {
        coroutines = 1
        dispatcher {
            fixedPoolSize = 1
            minPoolSize = 1
            maxPoolSize = 3
            keepAliveTime = 1000000000
        }
    }
}
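Below are the corresponding configuration data classes. ThreadPoolConfig implements validate() to check that the settings are valid. Note that isFixedThreadPool() returns true only when minPoolSize, maxPoolSize, and keepAliveTime are all absent, so a configuration that sets all four keys produces a non-fixed pool.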
data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {

    fun validate() {
        dispatcher?.validate()
    }

    class Builder {
        var coroutines: Int = 1
        private var dispatcher: ThreadPoolConfig? = null

        fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
            dispatcher = ThreadPoolConfig.Builder().apply(block).build()
        }

        fun build(): CoroutineActorConfig {
            return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
        }
    }
}

data class ThreadPoolConfig(
    val fixedPoolSize: Int? = 1,
    val minPoolSize: Int? = null,
    val maxPoolSize: Int? = null,
    val keepAliveTime: Long? = null
) : ValidateableConfig {

    fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)

    override fun validate() {
        require(
            (minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
                (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
        ) {
            "minPoolSize, maxPoolSize, keepAliveTime should be configured"
        }
    }

    class Builder {
        var fixedPoolSize: Int? = 1
        var minPoolSize: Int? = null
        var maxPoolSize: Int? = null
        var keepAliveTime: Long? = null

        fun build(): ThreadPoolConfig {
            return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
        }
    }
}
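To make the either/or rule for the dispatcher settings concrete, here is a minimal stand-alone sketch. PoolSettings is a hypothetical stand-in that only mirrors the rule in ThreadPoolConfig; it drops the ValidateableConfig interface and the Builder, so it is an illustration under those assumptions rather than the project's class.

```kotlin
// Hypothetical stand-in for ThreadPoolConfig; the name is for illustration only.
data class PoolSettings(
    val fixedPoolSize: Int? = 1,
    val minPoolSize: Int? = null,
    val maxPoolSize: Int? = null,
    val keepAliveTime: Long? = null
) {
    // The three min/max/keepAlive values must be set together or omitted together.
    private val elastic: List<Any?>
        get() = listOf(minPoolSize, maxPoolSize, keepAliveTime)

    // A fixed pool is chosen only when none of the three values is present.
    fun isFixed(): Boolean = fixedPoolSize != null && elastic.all { it == null }

    fun validate() {
        require(elastic.all { it == null } || elastic.all { it != null }) {
            "minPoolSize, maxPoolSize, keepAliveTime must be configured together"
        }
    }
}

fun main() {
    // Only fixedPoolSize is set: treated as a fixed pool.
    println(PoolSettings(fixedPoolSize = 2).isFixed()) // true

    // All three min/max/keepAlive values are set: treated as a non-fixed pool.
    println(PoolSettings(minPoolSize = 1, maxPoolSize = 3, keepAliveTime = 60L).isFixed()) // false

    // Only one of the three is set: validate() throws IllegalArgumentException.
    println(runCatching { PoolSettings(minPoolSize = 1).validate() }.isFailure) // true
}
```

Because fixedPoolSize defaults to 1, a partially filled min/max group is the only combination the rule rejects.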
The Database Plugin first reads CoroutineActorConfig from the Ktor configuration file, then creates DBAsyncTaskCoroutineActor and registers it with Koin DI.
override fun install(pipeline: Application, configure: Configuration.() -> Unit): DatabaseFeature {
    val logWriter = pipeline.get<LogWriter>()
    val config = appConfig.infra.database?.asyncExecutor ?: configuration.asyncExecutorConfig
    asyncExecutor = DBAsyncTaskCoroutineActor(config.coroutineActor, logWriter)

    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { asyncExecutor }
            }
        )
    }

    KoinApplicationShutdownManager.register { asyncExecutor?.shutdown() }
}
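After a user logs in successfully, the user's app device information is recorded in the database asynchronously. I obtain the DBAsyncTaskCoroutineActor object through Koin DI and call its run function to update the user_device table.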
dbAsyncTaskCoroutineActor.run("createUserDevice") {
    UserDeviceTable.insert(form.toCreateUserDeviceDTO())
}
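Below is the DBAsyncTaskCoroutineActor code. When run is called, it creates a DBAsyncTask and sends it to a CoroutineActor that knows nothing about the feature logic. The task is then taken from the other end of the coroutine channel and passed to the execute function, which opens a DB transaction and runs the task's Transaction.() -> Any? lambda.
When the server stops, the shutdown() function registered earlier in the Database Plugin is called. It calls channel.close() and waits for all DBAsyncTask instances to finish before stopping. For details on gracefully stopping a coroutine channel, see my earlier article [Day 10] Ktor Graceful Shutdown.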
class DBAsyncTaskCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val logWriter: LogWriter
) {

    private val logger = KotlinLogging.logger {}

    private val actorName = "DBAsyncTaskActor"

    private val actor: CoroutineActor<DBAsyncTask> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute, null,
        logWriter
    )

    fun run(type: String, block: Transaction.() -> Any?): UUID {
        val task = DBAsyncTask(type, block)
        actor.sendToUnlimitedChannel(task, InfraResponseCode.DB_ASYNC_TASK_ERROR) // non-blocking by Channel.UNLIMITED
        return task.id
    }

    private fun execute(task: DBAsyncTask) {
        try {
            transaction {
                task.block(this)
            }
        } catch (e: Throwable) {
            val errorMsg = "$actorName execute error"
            // interpolate errorMsg (the original string was missing the `$`)
            logger.error("$errorMsg => $task", e)
            logWriter.write(
                ErrorLog.internal(
                    InternalServerException(
                        InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
                        mapOf("taskId" to task.id, "taskType" to task.type)
                    ),
                    actorName, task.id.toString()
                )
            )
        }
    }

    internal fun shutdown() {
        actor.close()
    }
}
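CoroutineActor creates a CoroutineScope that contains a CoroutineDispatcher and a CoroutineExceptionHandler.
The other parameters mean the following:
capacity is the capacity of the coroutine channel. If the number of queued tasks exceeds this limit, a call to send() has to wait.
processBlock is the lambda that executes a task.
produceBlock is the lambda that feeds tasks in. It is called immediately after the CoroutineActor is created.
class CoroutineActor<T : IdentifiableObject<*>>(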
    val name: String,
    capacity: Int,
    coroutineActorConfig: CoroutineActorConfig,
    defaultCoroutineDispatcher: CoroutineDispatcher,
    processBlock: suspend (T) -> Unit,
    produceBlock: (CoroutineScope.(SendChannel<T>) -> Unit)? = null,
    private val logWriter: LogWriter? = null
) {

    private val logger = KotlinLogging.logger {}

    private val dispatcher: CoroutineDispatcher
    private val channel: SendChannel<T>
    private val coroutineScope: CoroutineScope

    init {
        logger.info("========== init coroutine actor $name ... ==========")
        try {
            dispatcher = if (coroutineActorConfig.dispatcher != null)
                CoroutineUtils.createDispatcher(name, coroutineActorConfig.dispatcher)
            else defaultCoroutineDispatcher

            // Note: block should catch all exceptions in most cases
            val exceptionHandler = CoroutineExceptionHandler { ctx, e ->
                logger.error("coroutine actor uncaught exception => $ctx", e)
            }

            val context = dispatcher + exceptionHandler
            coroutineScope = CoroutineScope(context)
            channel = CoroutineUtils.createActor(
                name, capacity, coroutineActorConfig.coroutines,
                coroutineScope, processBlock
            )

            produceBlock?.invoke(coroutineScope, channel)
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.LOG_ERROR, "fail to init coroutine actor $name", e)
        }
        logger.info("========== init coroutine actor $name completed ==========")
    }

    // call channel.close() to wait task completed when shutdown server (don't call channel.cancel())
    fun close() {
        logger.info("coroutine actor $name close ...")
        CoroutineUtils.closeChannel(name, channel)
        coroutineScope.cancel(name)
        if (dispatcher is ExecutorCoroutineDispatcher) {
            CoroutineUtils.closeDispatcher(name, dispatcher)
        }
        logger.info("coroutine actor $name closed")
    }
}
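Below is the code for CoroutineUtils.createActor(). I don't use the CoroutineScope.actor() function to create the SendChannel here because the current actor implementation is rather simple and doesn't meet my needs. The official API documentation also notes: "This API will become obsolete in future updates with introduction of complex actors. See issue #87."
In addition, I assign a CoroutineName to each coroutine to make tracing and debugging easier. Remember to also set the JVM option -Dkotlinx.coroutines.debug.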
fun <E> createActor(
    name: String, capacity: Int, coroutines: Int,
    scope: CoroutineScope,
    block: suspend (E) -> Unit
): SendChannel<E> {
    require(coroutines > 0)

    val channel = Channel<E>(capacity)
    repeat(coroutines) {
        scope.launch(CoroutineName("$name-(${it + 1})")) {
            for (e in channel) {
                logger.debug { coroutineContext }
                block(e)
            }
        }
    }
    return channel
}
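The pattern used by createActor(), several named coroutines iterating over one shared channel, can be tried in isolation with the sketch below. It assumes kotlinx-coroutines-core is on the classpath. drainWithWorkers and the "demo-actor" name are hypothetical, and joining the worker jobs is simply how this sketch waits for the queue to drain; the article's CoroutineUtils.closeChannel() is not shown, so it may wait differently.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: queues `messages` items, closes the channel, and
// returns how many items the worker coroutines processed.
fun drainWithWorkers(messages: Int, workers: Int): Int = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val processed = AtomicInteger(0)

    // Each worker loops until the channel is closed AND empty.
    val jobs = List(workers) { index ->
        launch(Dispatchers.Default + CoroutineName("demo-actor-(${index + 1})")) {
            for (message in channel) {
                processed.incrementAndGet()
            }
        }
    }

    repeat(messages) { channel.trySend(it) } // never fails on an open UNLIMITED channel
    channel.close()                          // no new items; queued items are still delivered
    jobs.joinAll()                           // wait until the workers have drained the queue
    processed.get()
}

fun main() {
    println(drainWithWorkers(messages = 10, workers = 3)) // 10
}
```

close() only stops new sends, so items already in the buffer still reach the workers. cancel() would discard them, which is why the shutdown path calls close().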
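Following the same approach as for coroutines, I recommend that the ThreadFactory also assign a thread name and an uncaughtExceptionHandler when it creates a thread. Then use the ExecutorService asCoroutineDispatcher() method to convert the Java ExecutorService into a Coroutine Dispatcher.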
object ThreadPoolUtils {

    private val logger = KotlinLogging.logger {}

    fun createThreadPoolExecutor(
        threadNamePrefix: String,
        config: ThreadPoolConfig,
        handler: Thread.UncaughtExceptionHandler? = null
    ): ExecutorService {
        logger.info("init thread pool $threadNamePrefix ... $config")
        val factory = DefaultThreadFactory(threadNamePrefix, handler)
        return if (config.isFixedThreadPool())
            Executors.newFixedThreadPool(config.fixedPoolSize!!, factory)
        else
            // keepAliveTime is passed with TimeUnit.SECONDS, so the configured value is read as seconds.
            // Note: with an unbounded LinkedBlockingQueue, ThreadPoolExecutor queues tasks instead of
            // creating threads beyond the core (min) pool size.
            ThreadPoolExecutor(
                config.minPoolSize!!, config.maxPoolSize!!,
                config.keepAliveTime!!, TimeUnit.SECONDS,
                LinkedBlockingQueue(), factory
            )
    }

    private class DefaultThreadFactory(
        private val namePrefix: String,
        val handler: Thread.UncaughtExceptionHandler? = null
    ) : ThreadFactory {

        private val backingThreadFactory: ThreadFactory = Executors.defaultThreadFactory()

        override fun newThread(r: Runnable): Thread {
            val thread = backingThreadFactory.newThread(r)
            thread.name = "$namePrefix-${thread.name}"
            if (handler != null)
                thread.uncaughtExceptionHandler = handler
            else
                thread.setUncaughtExceptionHandler { t, e ->
                    logger.error("Thread Uncaught Error => ${t.name}", e)
                }
            return thread
        }
    }
}
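As a small illustration of converting a named thread pool into a dispatcher, the sketch below runs one coroutine on a pool built with a custom ThreadFactory and reports the thread it ran on. NamedThreadFactory and threadNameOnPool are hypothetical names, and the sketch assumes kotlinx-coroutines-core is available; it is not the ThreadPoolUtils code.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical factory: names each thread and reports uncaught errors.
class NamedThreadFactory(private val prefix: String) : ThreadFactory {
    private val counter = AtomicInteger(0)

    override fun newThread(r: Runnable): Thread {
        val thread = Thread(r, "$prefix-${counter.incrementAndGet()}")
        thread.setUncaughtExceptionHandler { t, e ->
            System.err.println("uncaught error in ${t.name}: $e")
        }
        return thread
    }
}

// Runs one coroutine on the pool and returns the name of the thread that executed it.
fun threadNameOnPool(prefix: String): String {
    val dispatcher = Executors.newFixedThreadPool(2, NamedThreadFactory(prefix)).asCoroutineDispatcher()
    try {
        return runBlocking { withContext(dispatcher) { Thread.currentThread().name } }
    } finally {
        dispatcher.close() // shuts down the underlying ExecutorService
    }
}

fun main() {
    // The name starts with "demo-pool-"; debug mode may append the coroutine name.
    println(threadNameOnPool("demo-pool"))
}
```

A dispatcher created this way owns its threads, so it has to be closed explicitly, which matches the ExecutorCoroutineDispatcher check in CoroutineActor.close().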
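After the CoroutineActor is created and initialized, the next step is sending work to the coroutine channel. In practice, the channel buffer size should be chosen according to the kind of work and the scenario, so that accumulated tasks don't exhaust memory. Here I assume the server has plenty of memory, so the default is Channel.UNLIMITED. As a result, although the code calls trySendBlocking, the channel never fills up, so the call can be treated as a non-blocking operation.
Pay attention to whether the channel has already been closed. If it has, LogWriter first records an ErrorLog for the unfinished work; a retry mechanism will be added later.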
// suspending
suspend fun send(message: T) {
    channel.send(message)
}

// not a suspend function and non-blocking due to Channel.UNLIMITED
fun sendToUnlimitedChannel(message: T, errorCode: ResponseCode) {
    val result = channel.trySendBlocking(message)
    if (result.isFailure) {
        var errorMsg = ""
        val e = result.exceptionOrNull()
        if (result.isClosed) {
            errorMsg = "$name is closed"
            logger.warn("$errorMsg => $message")
        } else {
            errorMsg = "$name unexpected error" // we never call channel.cancel()
            logger.error("$errorMsg => $message", e)
        }
        if (errorCode != InfraResponseCode.LOG_ERROR) {
            logWriter?.write(
                ErrorLog.internal(
                    InternalServerException(errorCode, errorMsg, e, mapOf("message" to message)),
                    name, message.id.toString()
                )
            )
            // TODO: persistence unDeliveredMessage and retry later
        }
    }
}
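The closed-channel branch can be observed with a minimal sketch, assuming kotlinx-coroutines-core is on the classpath. sendAroundClose is a hypothetical helper that sends one message before close() and one after, then reports how trySendBlocking answered each time.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking

// Hypothetical helper: returns (accepted before close, rejected as closed after close).
fun sendAroundClose(): Pair<Boolean, Boolean> {
    val channel = Channel<String>(Channel.UNLIMITED)

    // The buffer is unlimited, so this returns immediately with a success result.
    val before = channel.trySendBlocking("task-1")

    channel.close()

    // After close() the send is rejected; the result is a failure flagged as closed.
    val after = channel.trySendBlocking("task-2")

    return before.isSuccess to after.isClosed
}

fun main() {
    val (accepted, rejectedAsClosed) = sendAroundClose()
    println("accepted=$accepted, rejectedAsClosed=$rejectedAsClosed") // accepted=true, rejectedAsClosed=true
}
```

The rejected message is returned to the caller as a result value rather than thrown, which is what lets sendToUnlimitedChannel log it and, later, hand it to a retry mechanism.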