iT邦幫忙

2021 iThome 鐵人賽

DAY 28
0
Modern Web

基於 Kotlin Ktor 建構支援模組化開發的 Web 框架系列 第 28

[Day 28] 實作 Multi-Channel Notifications

鐵人賽已逐漸進入尾聲,前面二十多天,我們一步步擴充加強 Ktor 功能,也整合了 ORM, Redis 許多框架函式庫,整個系統架構及基礎設施功能已逐漸成形。今天我想儘可能把先前累積實作的功能都串連起來,以此為基礎開發 Multi-Channel Notifications 功能來當作範例。

今天會應用先前已實作的功能及概念,讀者可參考以前的文章

實作目標

  • 支援 Multi-Project 架構,每個子專案只要定義 NotificationType 實作如何產生訊息內容及指定收件者即可,其餘底層寄送訊息的程式碼都是共用的
  • 發送頻道包含 Email, Push, SMS 這3個最基本的頻道,未來預計再加入 Line Notify
    • Email 串接 AWS SES
    • Push 串接 Firebase
    • SMS 預計串接三竹簡訊,但尚未實作,因為發簡訊要花錢…
  • 使用 Coroutine SendChannel 非同步發送通知,底層串接第三方服務也使用非同步 API,而且 ThreadPool 的參數都可以透過 Ktor 設定檔設定來調校效能
  • 發送過程要記錄 log,包含第三方服務的回應結果,方便 Debug
  • 訊息內容要支援多國語言 i18n 及模版 template
    • 整合 freemarker template engine
    • 根據使用者偏好語言發送訊息
  • 實作2個後台 API 範例
    • 管理者可以填寫訊息文字,並撰寫 QueryDSL 只傳送訊息給符合查詢條件的使用者
    • 管理者可以撰寫 QueryDSL 查詢資料庫資料,匯出成 Excel 檔案,寄送至指定 email

點我連結至 Github 查看實作程式碼

實作 NotificationChannelSender 寄送訊息到第三方服務

Notification Plugin 會根據以下的設定,初始化 email, push, sms 3個 NotificationChannelSender,每個 NotificationChannelSender 都有各自的細部設定可以調整。本機開發測試時,可以設定 mock = true 使用 MockNotificationChannelSender 只做寫入 log 的動作就好。

notification {
    channels {
        email {
            mock = false
            noReplyAddress = "no-reply@fanpoll.club"
            #marketingAddress = ""
            awsSES {
                nettyHttpClient {
                    http {
                        #maxConcurrency = 50 # => aws default value = 50
                        #maxPendingConnectionAcquires = 10000 => aws default value = 10000
                        #maxIdleConnectionTimeout = 60s => aws default value = 60s
                    }
                    threadPool {
                        fixedPoolSize = 3
                    }
                }
            }
        }
        push {
            mock = false
            fcm {
                # https://github.com/firebase/firebase-admin-java/issues/317
                # https://github.com/googleapis/google-auth-library-java/issues/356
                # By default failing requests are retried up to 10 times with exponential backoff.
                # connectTimeout = 60000
                readTimeout = 180000
                threadPool {
                    fixedPoolSize = 3
                }
            }
        }
        sms {
            mock = true
        }
    }
    // 其餘省略
}

目前 NotificationChannelSender 包含以下實作子類別

maxReceiversPerRequest => 可以控制每次發送到第三方服務的訊息數量上限
shutdown() => 當 Server 停止時會呼叫此方法關閉與第三方服務的連線物件或 thead pool,例如呼叫 AWS SesAsyncClient 的 close()方法

interface NotificationChannelSender {

    fun send(message: NotificationMessage)

    val maxReceiversPerRequest: Int

    fun shutdown() {}
}

class AwsSESSender(
    config: AwsSESConfig,
    private val loggingConfig: NotificationLogConfig,
    private val logWriter: LogWriter
) : NotificationChannelSender

class FCMSender(
    config: FCMConfig,
    private val pushTokenStorage: PushTokenStorage,
    private val loggingConfig: NotificationLogConfig,
    private val logWriter: LogWriter
) : NotificationChannelSender

class MitakeSender(
    private val config: MitakeConfig,
    private val loggingConfig: NotificationLogConfig,
    private val logWriter: LogWriter
) : NotificationChannelSender

class MockNotificationChannelSender(
    private val loggingConfig: NotificationLogConfig,
    private val logWriter: LogWriter
) : NotificationChannelSender

寄送訊息 NotificationMessage

NotificationChannelSender 寄送訊息時會呼叫 send(message: NotificationMessage) 方法,NotificationMessage 除了訊息內容、收件者之外,還包含了許多額外資訊,方便未來查詢。

  • notificationMessageId => 每次寄送 NotificationMessage 都會產生一個 uuid 以供識別
  • notificationId => 記錄寄送通知的 id,如果此通知包含 email 及 push,那麼這2個 channel 的 NotificationMessage 的 notificationId 是相同的
  • eventId => 記錄觸發某個 domain event 而寄送的通知 Notification

所以這3個 id 的階層關係由上到下的順序是 eventId → notificationId → notificationMessageId,我們可以查詢某個事件寄送的所有 Notification 及 NotificationMessage

data class NotificationMessage(
    val notificationId: UUID,
    val eventId: UUID,
    val type: NotificationType,
    val version: String? = null,
    val channel: NotificationChannel,
    val lang: Lang,
    val sender: String? = null,
    val receivers: List<String>,
    val content: NotificationChannelContent,
    var sendAt: Instant? = null
) : IdentifiableObject<UUID>() {

    override val id: UUID = UUID.randomUUID()

    fun toNotificationMessageLog(): NotificationMessageLog = NotificationMessageLog(
        id, notificationId, eventId,
        type, version, channel, lang, receivers
    )
}

記錄訊息發送歷程 NotificationMessageLog

NotificationChannelSender 寄送訊息時,也會使用 LogWriter 記錄 log,包含第三方服務的回應碼及訊息…等,方便未來追蹤問題除錯。更多實作細節可參考以前的文章 [Day 20] 實作 Logging Plugin 建立系統 Logging 機制

notification {
    logging {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
        logSuccess = false
        logSuccessReqBody = false
        logSuccessRspBody = false
    }
    // 其餘省略
}
@Serializable
data class NotificationMessageLog(
    @Serializable(with = UUIDSerializer::class) override val id: UUID,
    @Serializable(with = UUIDSerializer::class) val notificationId: UUID,
    @Serializable(with = UUIDSerializer::class) val eventId: UUID,
    val type: NotificationType,
    val version: String? = null,
    val channel: NotificationChannel,
    val lang: Lang,
    val receivers: List<String>,
    @Serializable(with = InstantSerializer::class) var sendAt: Instant? = null,
    // result
    var successList: JsonArray? = null,
    var failureList: JsonArray? = null,
    var invalidRecipientIds: List<String>? = null,
    // response detail
    var rspCode: String? = null,
    var rspMsg: String? = null,
    @Serializable(with = InstantSerializer::class) var rspAt: Instant? = null,
    var rspTime: Long? = null,
    var rspBody: String? = null
) : LogMessage() {

    @Serializable(with = InstantSerializer::class)
    override val occurAt: Instant = Instant.now()

    var content: String? = null
    var success: Boolean = true
    var errorMsg: String? = null

    override val logLevel: LogLevel = if (success) LogLevel.INFO else LogLevel.ERROR

    override val logType: String = LOG_TYPE

    companion object {
        const val LOG_TYPE = "notification_message"
    }
}

實作 NotificationSender 寄送通知

實作完底層 NotificationChannelSender 串接第三方服務後,接下來是實作上層負責寄送通知的 NotificationSender,我希望寄送通知的程式寫法應該要非常簡單,當某個功能需要寄送通知時,只要先透過 Koin DI 取得 NotificationSender 物件,再呼叫 send(notification: Notification) 方法即可。

val notificationSender by inject<NotificationSender>()
notificationSender.send(notification)

interface NotificationSender {

    fun send(notification: Notification)

    fun shutdown() {}
}

定義 Notification, Recipient, NotificationContent 資料類別

當我們使用 NotificationSender 發送通知時,需要先建立 Notification 物件,填入訊息內容 NotificationContent 及收件者 Recipient…等資料。

  • type => 記錄這個 Notification 屬於那種 NotificationType,下面會再詳細說明
  • templateArgs => FreeMarker 用來替換樣版 template 裡面的變數
  • contentArgs => 例如推播時除了 title, body,還可以傳送額外的欄位資料
  • remote, remoteArg => 未來可以把 Notification 資料轉為 JSON 丟到 Message Queue,由另一個 NotificationService 微服務負責寄送通知
  • version => 記錄當下這個 Notification 的資料版本
  • lazyLoad => 如果 NotificationType 的 lazyLoad = true,那麼可以延遲到 NotificationSender 寄送通知時才載入 NotificationContent 及收件者 Recipient 資料。由於 NotificationSender 是由另一個執行緒以非同步的方式執行,所以就不會阻塞使用者請求,加快回應速度。
@Serializable
data class Notification(
    val type: NotificationType,
    val recipients: MutableSet<Recipient> = mutableSetOf(),
    val content: NotificationContent = NotificationContent(),
    val contentArgs: MutableMap<String, String> = mutableMapOf(),
    @Transient val templateArgs: MutableMap<String, Any> = mutableMapOf(), // templateArgs doesn't support i18n now
    @Transient val lazyLoadArg: Any? = null,
    val remote: Boolean = false,
    val remoteArg: JsonObject? = null,
    @Serializable(with = UUIDSerializer::class) override val id: UUID = UUID.randomUUID(),
    @Serializable(with = UUIDSerializer::class) val eventId: UUID = UUID.randomUUID(),
    @Serializable(with = InstantSerializer::class) val createAt: Instant = Instant.now(),
    var version: String? = null
) : IdentifiableObject<UUID>() {

    @Serializable(with = InstantSerializer::class)
    var sendAt: Instant? = null

    @Transient
    val lazyLoad = type.isLazy()

    fun load() = type.lazyLoad(this)
}

@Serializable
data class Recipient(
    override val id: String,
    val userType: UserType? = null,
    @Serializable(with = UUIDSerializer::class) val userId: UUID? = null,
    // val channels: Set<NotificationChannel>? = null, TODO => user notification preferences
    val name: String? = null,
    var lang: Lang? = null,
    val email: String? = null,
    val mobile: String? = null,
    val pushTokens: Set<String>? = null
) : IdentifiableObject<String>()

@Serializable
data class NotificationContent(
    val email: MutableMap<Lang, EmailContent> = mutableMapOf(),
    val push: MutableMap<Lang, PushContent> = mutableMapOf(),
    val sms: MutableMap<Lang, SMSContent> = mutableMapOf()
)

@Serializable
class EmailContent(
    var subject: String? = null,
    var body: String? = null,
    @Transient val attachments: List<Attachment>? = null
)

@Serializable
class PushContent(
    val title: String,
    val body: String,
    val data: Map<String, String>? = null
)

@Serializable
class SMSContent(
    val body: String
)

實作 NotificationType 控制寄送通知的行為

  • projectId => 屬於那個子專案
  • channels => 要寄送到那些 channels,可以複選
  • category => NotificationSender 可以依據通知的種類(系統通知或是行銷),決定要如何寄送
  • priority => NotificationSender 可以依據通知的優先度,決定要如何寄送
  • lang => 如果 Recipient 沒有設定偏好語言,那就使用此預設語言
  • lazyLoadBlock => 實作延遲載入的 lambda 產生訊息內容,如果需要載入收件者資料,那必須要實作 findRecipients(userFilters: Map<UserType, String>?) 方法
@Serializable
open class NotificationType(
    val projectId: String,
    val name: String,
    val channels: Set<NotificationChannel>,
    val category: NotificationCategory,
    // val priority: NotificationPriority TODO => priority queues
    val version: String? = null,
    val lang: Lang? = null,
    @Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : IdentifiableObject<String>() {

    override val id: String = "${projectId}_${name}"

    fun isLazy(): Boolean = lazyLoadBlock != null

    fun lazyLoad(notification: Notification) {
        requireNotNull(lazyLoadBlock)
        lazyLoadBlock.invoke(this, notification)
    }

    open fun findRecipients(userFilters: Map<UserType, String>?): Set<Recipient> =
        error("NotificationType $id findRecipients is not yet implemented")
}

enum class NotificationChannel {
    Email, Push, SMS
}

enum class NotificationCategory {
    System, Marketing
}

enum class NotificationPriority {
    URGENT, HIGH, LOW
}

實作 NotificationDispatcher 根據 NotificationType 使用對應的 NotificationChannelSender 寄送訊息

class NotificationDispatcher(
    private val config: NotificationChannelConfig,
    private val envMode: EnvMode,
    private val availableLangs: AvailableLangs,
    private val i18nNotificationProjectMessages: I18nNotificationProjectMessages,
    private val emailSender: NotificationChannelSender? = null,
    private val pushSender: NotificationChannelSender? = null,
    private val smsSender: NotificationChannelSender? = null
) : NotificationSender

NotificationDispatcher 也實作 NotificationSender 介面,當呼叫 send(notification: Notification) 方法時,會根據 Notification 的 NotificationType 所定義的 channels: Set<NotificationChannel>,呼叫對應 NotificationChannelSender 的 send(message: NotificationMessage) 方法

實作 NotificationCoroutineActor 非同步寄送通知

這裡使用 decorator pattern 的手法,讓 NotificationDispatcher 及 NotificationCoroutineActor 都實作 NotificationSender 介面,所以呼叫 send(notification: Notification) 方法時,會 delegate 到內部的 NotificationSender,執行順序會是 NotificationCoroutineActor → NotificationDispatcher → NotificationChannelSender。更多 NotificationCoroutineActor 的實作細節,請參考 [Day 21] 使用 Coroutine SendChannel 處理非同步工作

class NotificationCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val notificationSender: NotificationSender,
    private val logWriter: LogWriter
) : NotificationSender {

    private val logger = KotlinLogging.logger {}

    private val actorName = "NotificationActor"

    private val actor: CoroutineActor<Notification> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute, null,
        logWriter
    )

    override fun send(notification: Notification) {
        actor.sendToUnlimitedChannel(notification, InfraResponseCode.NOTIFICATION_ERROR) // non-blocking by Channel.UNLIMITED
    }

    private fun execute(notification: Notification) {
        try {
            notificationSender.send(notification)
        } catch (e: Throwable) {
            val errorMsg = "$actorName execute error"
            logger.error("errorMsg => $notification", e)
            logWriter.write(
                ErrorLog.internal(
                    InternalServerException(
                        InfraResponseCode.NOTIFICATION_ERROR, errorMsg, e,
                        mapOf("id" to notification.id, "type" to notification.type, "eventId" to notification.eventId)
                    ),
                    actorName, notification.id.toString()
                )
            )
        }
    }

    override fun shutdown() {
        notificationSender.shutdown()
        actor.close()
    }
}

實作 Notification Plugin

最後實作 Notification Plugin 初始化 NotificationDispatcher、NotificationCoroutineActor 及所有 NotificationChannelSender,並且都註冊至 Koin DI。當某個功能需要寄送通知時,只要先透過 Koin DI 取得 NotificationSender 物件,再呼叫 send(notification: Notification) 方法即可。

install(NotificationFeature)

override fun install(pipeline: Application, configure: Configuration.() -> Unit): NotificationFeature {
    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                val notificationDispatcher = NotificationDispatcher(
                    channelConfig,
                    envMode,
                    availableLangs,
                    i18nNotificationProjectMessagesProviders,
                    emailSender,
                    pushSender,
                    smsSender
                )

                val notificationSender = config.asyncExecutor?.let {
                    NotificationCoroutineActor(it.coroutineActor, notificationDispatcher, logWriter)
                } ?: notificationDispatcher

                single { notificationSender }

                KoinApplicationShutdownManager.register { notificationSender.shutdown() }
                // 其餘省略
            }
        )
    }
}

整合 i18n 機制及 FreeMarker Template Engine

為了支援多國語言的訊息內容,我實作了 I18nNotificationProjectMessages 從語系檔 notification_${lang}.properties 讀取訊息文字,例如下面是 ops 子專案的 dataReport NotificationType 的 Email 主旨,詳細的實作細節可參考之前的文章 [Day 11] 實作 Ktor i18n 機制

# format => ${type}.${channel}.${part}=""
ops_dataReport.Email.subject=[維運] 資料查詢報表: ${dataType} ${queryTime}

至於 Email 的 body 內容就需要準備每個語言的 html 樣板檔案

<!DOCTYPE html>
<html lang="zh-TW">
<head>
    <meta charset="UTF-8">
</head>
<body>
<div id="content">
    <ul>
        <li>
            查詢時間: ${queryTime}
        </li>
        <li>
            查詢資料類型: ${dataType}
        </li>
        <li>
            查詢條件: ${query}
        </li>
    </ul>
</div>
</body>
</html>

然後實作 NotificationTemplateProcessor 使用 FreeMarker 載入 html 樣板檔案,並替換裡面的變數值

class NotificationTemplateProcessor(
    private val availableLangs: AvailableLangs
) {

    private val cfg: Configuration = Configuration(Configuration.VERSION_2_3_31).apply {
        templateLoader = ClassTemplateLoader(NotificationTemplateProcessor::class.java, "/i18n/notification/templates")
        templateExceptionHandler = TemplateExceptionHandler.RETHROW_HANDLER
        logTemplateExceptions = false
        wrapUncheckedExceptions = true
        recognizeStandardFileExtensions = false
        defaultEncoding = "UTF-8"
        outputFormat = HTMLOutputFormat.INSTANCE
        locale = availableLangs.first().locale
        timeZone = TimeZone.getTimeZone(ZoneId.of("UTC"))
        dateFormat = "yyyy-MM-dd"
        dateTimeFormat = "yyyy-MM-dd HH:mm:ss"
    }
    
    fun processEmail(type: NotificationType, args: Map<String, Any>, lang: Lang): String =
        process(type.id, args, lang, "html")

    private fun process(templateName: String, args: Map<String, Any>, lang: Lang, ext: String): String {
        val templateFileName = buildTemplateFileName(templateName, lang, ext)
        val template: Template = try {
            cfg.getTemplate(templateFileName, null, null, null, true, true)
                ?: cfg.getTemplate(buildTemplateFileName(templateName, availableLangs.first(), ext))
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.DEV_ERROR, "template file $templateFileName parsing error or not found", e)
        }

        return try {
            val outputWriter = StringWriter()
            template.process(args, outputWriter)
            outputWriter.toString()
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.DEV_ERROR, "process template error: template = $templateName, args = $args", e)
        }
    }

    private fun buildTemplateFileName(templateName: String, lang: Lang, ext: String) = "${templateName}_${lang.code}.$ext"
}

Club 子專案實作後台填寫訊息文字寄送通知範例

實作的目標是 => 管理者可以填寫訊息文字,並撰寫 QueryDSL 只傳送訊息給符合查詢條件的使用者。以下面的 request body json 為例,系統只會寄送中文測試通知給符合查詢條件使用者 (1980年後出生的男性,而且為啟用狀態的使用者)

{
  "userFilters": {
    "club_user": "[enabled = true and gender = Male and birthYear >= 1980]"
  },
  "content": {
    "email": {
      "zh-TW": {
        "subject": "測試Email",
        "body": "我是Email內容"
      }
    },
    "push": {
      "zh-TW": {
        "title": "測試推播",
        "body": "我是推播內容"
      }
    }
  }
}

首先建立 POST /club/users/sendNotification API,只有 ClubAuth.Admin 角色的使用者可以呼叫。接下來是透過 Koin DI 取得 NotificationSender,然後建立 NotificationType 為 SendNotification 的 Notification 物件,最後再呼叫 send() 方法即可發送通知。

由於是非同步發送通知,所以 API 會回傳 notification 的 id,管理者可以稍後使用此 id 去 Kibana 搜尋 log 查詢寄送結果

fun Routing.clubUser() {

    val notificationSender by inject<NotificationSender>()

    route("${ClubConst.urlRootPath}/users") {
        authorize(ClubAuth.Admin) {
            post<SendNotificationForm, UUID>("/sendNotification", ClubOpenApi.SendNotification) { form ->
                val notification = form.toNotification(ClubNotification.SendNotification)
                notificationSender.send(notification)
                call.respond(DataResponseDTO.uuid(notification.id))
            }
        }
    }
}

@Serializable
data class SendNotificationForm(
    val recipients: MutableSet<Recipient>? = null, //直接填入 Recipient 資料
    val userFilters: Map<UserType, String>? = null, // 或是撰寫 QueryDSL 查詢使用者
    val content: NotificationContent, // 直接填寫通知訊息內容
    val contentArgs: MutableMap<String, String>? = null
): Form<SendNotificationForm>() {

    fun toNotification(type: NotificationType): Notification {
        content.email.values.forEach {
            it.body = buildEmailHtmlBody(it.body!!)
        }

        val recipients = recipients ?: type.findRecipients(userFilters)
        if (recipients.isEmpty()) {
            throw RequestException(InfraResponseCode.QUERY_RESULT_EMPTY, "recipients is empty")
        }

        return Notification(
            type, recipients = recipients.toMutableSet(),
            content, contentArgs = contentArgs ?: mutableMapOf(),
            remote = false
        )
    }
    // 其餘省略
}

我在 ClubNotification.kt 檔案定義了 Club 子專案所有的 NotificationType。每個 NotificationType 都是子類別 ClubNotificationType 物件,ClubNotificationType 已實作 findRecipients(userFilters: Map<UserType, String>?) 方法,可以從資料庫載入 Club 子專案中符合 QueryDSL 查詢條件的使用者資料,包括 email, lang…等欄位,建立 Recipient 物件。

object ClubNotification {

    val SendNotification = ClubNotificationType(
        "sendNotification",
        setOf(NotificationChannel.Push, NotificationChannel.Email), NotificationCategory.System
    )
}

class ClubNotificationType(
    name: String,
    channels: Set<NotificationChannel>,
    category: NotificationCategory,
    @Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : NotificationType(ClubConst.projectId, name, channels, category, null, null, lazyLoadBlock) {

    override fun findRecipients(userFilters: Map<UserType, String>?): Set<Recipient> {
        val userFilter = userFilters?.get(ClubUserType.User.value)?.let {
            if (it.isBlank()) null
            else DynamicDBQuery.convertPredicate(DynamicQuery.parseFilter(it), UserDTO.mapper)
        }

        return transaction {
            val query = ClubUserTable.join(UserDeviceTable, JoinType.LEFT, ClubUserTable.id, UserDeviceTable.userId) {
                UserDeviceTable.enabled eq true
            }.slice(
                ClubUserTable.id, ClubUserTable.account, ClubUserTable.name,
                ClubUserTable.email, ClubUserTable.mobile, ClubUserTable.lang,
                UserDeviceTable.id, UserDeviceTable.pushToken
            ).select { ClubUserTable.enabled eq true }
            userFilter?.let { query.adjustWhere { it } }
            query.toList().toDTO(UserDTO::class).map { user ->
                with(user) {
                    Recipient(
                        account!!, ClubUserType.User.value, id, name, lang, email, mobile,
                        devices?.mapNotNull { it.pushToken }?.toSet()
                    )
                }
            }.toSet()
        }
    }
}

Ops 子專案實作後台匯出 Excel 報表寄送 Email 範例

實作的目標是 => 管理者可以撰寫 QueryDSL 查詢 User 資料表,把資料匯出成 Excel 檔案,寄送至指定 email。以下面的 request body json 為例,系統會查詢「角色為 AppTeam 而且為啟用狀態」的使用者資料 account, name 欄位,匯出成 Excel 檔案,寄送 email 至 admin@test.abc.com

{
  "dataType": "OpsUser",
  "email": "admin@test.abc.com",
  "query": "q_fields=account,name&q_filter=[role = AppTeam and enabled = true]&q_orderBy=createdAt"
}

查詢 SQL => SELECT ops_user.id, ops_user.account, ops_user."name" FROM ops_user WHERE (ops_user."role" = 'AppTeam') AND (ops_user.enabled = true) ORDER BY ops_user.created_at ASC

首先建立 POST /ops/data/report API,只有 OpsAuth.Admin 角色的使用者可以呼叫。接下來是透過 Koin DI 取得 NotificationSender,然後建立 NotificationType 為 DataReport 的 Notification 物件,最後再呼叫 send() 方法即可發送通知。

由於是非同步發送通知,所以 API 會回傳 notification 的 id,管理者可以稍後使用此 id 去 Kibana 搜尋 log 查詢寄送結果

fun Routing.opsDataReport() {

    val notificationSender by inject<NotificationSender>()

    route("${OpsConst.urlRootPath}/data/report") {
        authorize(OpsAuth.OpsTeam) {
            post<DataReportForm, UUID>(OpsOpenApi.DataReport) { form ->
                val notification = Notification(OpsNotification.DataReport, lazyLoadArg = form)
                notificationSender.send(notification)
                call.respond(DataResponseDTO.uuid(notification.id))
            }
        }
    }
}

@Serializable
class DataReportForm(
    val dataType: DataReportDataType,
    val query: String,
    var email: String? = null
) : Form<DataReportForm>()

enum class DataReportDataType(val entityDTOType: KType) {
    OpsUser(typeOf<UserDTO>())
}

我在 OpsNotification.kt 檔案定義了 Ops 子專案所有的 NotificationType。每個 NotificationType 都是子類別 OpsNotificationType 物件。DataReport 的 lazyLoadBlock lambda 會根據 QueryDSL 查詢得到 List 物件,再匯出成 Excel 檔案。

object OpsNotification {

    val DataReport = OpsNotificationType("dataReport") { notification ->
        val form = notification.lazyLoadArg as DataReportForm
        requireNotNull(form.email)
        notification.recipients.add(Recipient(form.email!!, email = form.email))

        val dtoClass = form.dataType.entityDTOType.classifier as KClass<EntityDTO<*>>
        val dtoList = transaction {
            DynamicQuery.from(form.query).toDBQuery(dtoClass).toList(dtoClass)
        }
        val columnIds = ReportDataUtils.getColumnIds(dtoClass)
        val table = Table(form.dataType.name, columnIds)
        dtoList.forEach { table.addRow(ReportDataUtils.toMap(it, columnIds)) }
        val report = ReportData(id, name, mutableListOf(table))

        val queryTime = Instant.now()
        val args = mapOf(
            "dataType" to form.dataType.name,
            "queryTime" to DateTimeUtils.TAIWAN_DATE_TIME_FORMATTER.format(queryTime),
            "query" to form.query
        )
        notification.templateArgs.putAll(args)

        val fileName = "${this.name}_${args["dataType"]}_${args["queryTime"]}"
        val attachment = report.toExcelAttachment(fileName)
        notification.content.email[lang!!] = EmailContent(attachments = listOf(attachment))
    }

    private val notificationType = typeOf<OpsNotificationType>()

    val AllTypes = OpsNotification::class.memberProperties
        .filter { it.returnType == notificationType }
        .map { it.getter.call(this) as OpsNotificationType }
}

class OpsNotificationType(
    name: String,
    @Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : NotificationType(
    OpsConst.projectId, name, setOf(NotificationChannel.Email),
    NotificationCategory.System, null, Lang.SystemDefault, lazyLoadBlock
)

要填入 dataType, queryTime, query 3個 templateArgs 給 FreeMarker 替換 email template 變數

ops_dataReport.Email.subject=[維運] 資料查詢報表: ${dataType} ${queryTime}
<!DOCTYPE html>
<html lang="zh-TW">
<head>
    <meta charset="UTF-8">
</head>
<body>
<div id="content">
    <ul>
        <li>
            查詢時間: ${queryTime}
        </li>
        <li>
            查詢資料類型: ${dataType}
        </li>
        <li>
            查詢條件: ${query}
        </li>
    </ul>
</div>
</body>
</html>

上一篇
[Day 27] 實作 Redis PubSub Keyspace Notification 訂閱 Session Key Expired 事件通知
下一篇
[Day 29] 建立子專案來監控管理系統
系列文
基於 Kotlin Ktor 建構支援模組化開發的 Web 框架30

尚未有邦友留言

立即登入留言