
在實際的用戶互動中,等待 AI 完整回應往往會讓用戶感到焦急。今天我們要學習 Koog 框架的流式處理功能,實現即時的 AI 回應,大幅提升用戶體驗。透過 executeStreaming API,我們將讓智慧客服助手能夠像真人對話一樣,逐字逐句地即時回應用戶
在傳統的 AI 應用中,我們習慣等待模型完全生成回應後再顯示給用戶
// 傳統批量處理方式
val response = executor.execute(prompt, model)
println(response.content) // 等待完整回應後一次性顯示
這種方式雖然簡單,但存在明顯的用戶體驗問題
流式處理則能夠
Koog 框架通過 Kotlin 的 Flow API 提供了優雅的流式處理支援
interface LLMClient {
    // 傳統同步執行
    abstract suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response>
    // 流式執行 - 返回 Flow<String>
    abstract fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String>
}
讓我們從最基本的流式處理開始
suspend fun main() {
    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }
    // 流式執行
    println("AI 正在回應...")
    executor.executeStreaming(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
        .collect { token ->
            // 即時輸出每個文字片段
            print(token)
        }
    println("\n回應完成!")
}
直接從結果是看不太出來 streaming 的效果 XD,它會幾個字幾個字地輸出,讓使用者可以及時看到 AI 的回應進度
AI 正在回應...
Kotlin 的協程(Coroutine)是一種用來進行非同步程式設計的輕量級工具。它可以讓你用同步的寫法寫出非同步的程式碼,讓程式更簡潔且易於閱讀。協程可以暫停執行(掛起),等候某些操作完成後再繼續,而不會阻塞整個線程,提升效能並減少資源浪費。簡單來說,協程讓異步任務的撰寫變得像寫同步程式一樣直覺方便。
回應完成!
在網路環境中,流式處理更容易遇到中斷問題,因此需要完善的錯誤處理
class RobustStreamingChat(private val executor: PromptExecutor) {
    fun streamWithRetry(
        prompt: Prompt,
        model: LLModel,
        maxRetries: Int = 3
    ): Flow<String> = flow {
        var attempt = 0
        var success = false
        while (attempt <= maxRetries && !success) {
            try {
                executor.executeStreaming(prompt, model).collect { token ->
                    emit(token)
                    // 模擬出錯的情況
                    delay(100L)
                    throw SocketTimeoutException("Dummy timeout")
                }
                success = true
            } catch (e: Exception) {
                attempt++
                if (attempt < maxRetries) {
                    emit("\n[連線中斷,正在重新嘗試... ($attempt/$maxRetries)]\n")
                    // 指數退避
                    delay(1000L * attempt)
                } else {
                    emit("\n[連線失敗,請稍後再試... ($attempt/$maxRetries)]\n")
                    throw e
                }
            }
        }
    }
}
suspend fun main() {
    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }
    // 流式執行
    println("AI 正在回應...")
    val robustStreamingChat = RobustStreamingChat(executor)
    robustStreamingChat.streamWithRetry(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
        .collect { token ->
            // 即時輸出每個文字片段
            print(token)
        }
    println("\n回應完成!")
}
AI 正在回應...
[連線中斷,正在重新嘗試... (1/3)]
[連線中斷,正在重新嘗試... (2/3)]
Exception in thread "main" [連線失敗,請稍後再試... (3/3)]
java.lang.IllegalStateException: Error from OpenAI API: 200 OK: Dummy timeout
我們可以建立簡單的效能監控來追蹤流式處理的表現
class SimpleStreamingMonitor {
    data class StreamingStats(
        val totalTokens: Int,
        val duration: Duration,
        val firstTokenDelay: Duration
    )
    // 監控流式處理效能
    fun Flow<String>.withPerformanceTracking(): Flow<String> = flow {
        val startTime = TimeSource.Monotonic.markNow()
        var firstTokenTime: Duration? = null
        var tokenCount = 0
        collect { token ->
            tokenCount++
            // 記錄第一個 token 的時間
            if (firstTokenTime == null) {
                firstTokenTime = startTime.elapsedNow()
            }
            emit(token)
        }
        // 輸出統計資訊
        val totalDuration = startTime.elapsedNow()
        val stats = StreamingStats(
            totalTokens = tokenCount,
            duration = totalDuration,
            firstTokenDelay = firstTokenTime ?: Duration.ZERO
        )
        println("\n=== 效能統計 ===")
        println("總 Token 數:${stats.totalTokens}")
        println("總耗時:${stats.duration}")
        println("首 Token 延遲:${stats.firstTokenDelay}")
    }
}
suspend fun main() {
    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }
    // 流式執行
    println("AI 正在回應...")
    with(SimpleStreamingMonitor()) {
        executor.executeStreaming(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
            .withPerformanceTracking()
            .collect { token ->
                // 即時輸出每個文字片段
                print(token)
            }
    }
    println("\n回應完成!")
}
AI 正在回應...
Kotlin 的協程(Coroutine)是一種輕量級的非同步程式設計工具,讓你可以用類似同步的方式撰寫非同步或並行的程式碼。它可以在不阻塞主執行緒的情況下,進行耗時操作(例如網路請求、檔案讀寫),提升效能和使用者體驗。協程能夠暫停和恢復執行,讓程式更容易閱讀和維護。簡單來說,協程就是用來簡化非同步程式碼寫法的技術。
=== 效能統計 ===
總 Token 數:130
總耗時:3.520779625s
首 Token 延遲:1.451959625s
回應完成!
確保流式處理在合理時間內完成
class StreamingWithTimeout(private val executor: PromptExecutor) {
    suspend fun execute(
        prompt: Prompt,
        model: LLModel,
        timeoutSeconds: Long = 5
    ): Flow<String> = flow {
        try {
            withTimeout(timeoutSeconds * 1000) {
                executor.executeStreaming(prompt, model).collect { token ->
                    // 模擬超過時間
                    delay(6000)
                    emit(token)
                }
            }
        } catch (e: TimeoutCancellationException) {
            emit("\n[回應超時,請稍後再試]")
        } catch (e: IllegalStateException) {
            emit("\n[回應超時,請稍後再試]")
        }
    }
}
suspend fun main() {
    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }
    // 流式執行
    println("AI 正在回應...")
    val streamingWithTimeout = StreamingWithTimeout(executor)
    streamingWithTimeout.execute(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
        .collect { token ->
            // 即時輸出每個文字片段
            print(token)
        }
    println("\n回應完成!")
}
6 秒後會呈現超時的訊息
AI 正在回應...
[回應超時,請稍後再試]
回應完成!
今天我們學習了 Koog 框架的流式處理基礎功能
流式處理大幅提升了 AI 應用的用戶體驗,讓互動更加自然流暢。透過 Kotlin 的 Flow API,我們可以輕鬆實現複雜的即時回應功能。在下一篇文章中,我們將學習多 LLM 整合與智慧切換,探討如何在不同場景下選擇最適合的語言模型
圖片來源:AI 產生