iT邦幫忙

2025 iThome 鐵人賽

DAY 17

⚡《AI Knowledge System Build Log》This is not a pure technical article, but an engineer's magical adventure. Code is the incantation, pipelines are the magic circles, and error messages are dark curses. Ready your wand (keyboard): today we step into the academy's foundational magic class and build a stable, extensible AI knowledge system.


Introduction

Yesterday we finished the RAG API. The functionality is complete, but testing with curl alone leaves you staring at cold, hard JSON.
So that the demo doesn't look like an archived exam paper, today's goals are:

  • Build a simple, interactive frontend with Gradio.
  • Hook it up live to yesterday's RAG Streaming API.
  • Support the advanced options: top_k and hybrid search.

In short, we're putting clothes on the API so it stops running around naked.


Environment Setup

pip install gradio httpx
python app.py

Don't forget the backend also needs to allow requests from the frontend, or you'll see a wall of CORS errors, as if the API were giving you the silent treatment.


origins = ["http://apiGateway:8000", "http://localhost:7861"]


# Configure the allowed origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Gradio Frontend Code Overview


The source code is organized in two layers:

  1. Async streaming response (stream_response)

    • Uses httpx.AsyncClient to fetch the RAG API response piece by piece via SSE / StreamingResponse
    • Handles errors and chunk concatenation, displaying output as it arrives
    • Supports top_k and hybrid search
  2. Gradio interface (create_interface)

    • Uses gr.Blocks to build the UI components (Textbox, Button, Slider, Checkbox, Markdown)
    • "Advanced Options" are tucked into an Accordion, keeping the layout clean
    • Binds events to stream_response, triggered by button click or Enter
    • An answer area that supports live updates

Technical Deep Dive

Async streaming (stream_response)

  • client.stream("POST", url, ...) opens the connection to the API and reads events line by line.

  • Each SSE data event is prefixed with "data: ", which must be stripped before parsing the JSON.

  • Incoming chunks are concatenated into current_answer and returned to the frontend Markdown in real time.

  • The completion event (done) carries the final answer, keeping the frontend consistent with the final result.

  • Error handling:

    • API returns non-200 → show an error
    • Connection failure or JSON decode error → report a connection or parsing problem
async with client.stream(
    "POST", url, json=payload, headers={"Accept": "text/event-stream"}
) as response:
    if response.status_code != 200:
        yield f"❌ Error: API returned status {response.status_code}"
        return

    current_answer = ""

    async for line in response.aiter_lines():
        if line.startswith("data: "):
            data_str = line[6:]  # Remove "data: " prefix
            try:
                ...

                # Handle streaming chunks
                if "chunk" in data or "response" in data:
                    current_answer += data.get("chunk", data.get("response", ""))
                    yield current_answer

                # Handle completion
                if data.get("done", False):
                    current_answer = data.get("answer", current_answer)
                    yield current_answer
                    break

            except json.JSONDecodeError:
                continue
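The chunk/done handling above boils down to a small, dependency-free parsing loop. A minimal sketch (the helper names `parse_sse_line` and `assemble` are illustrative, not part of the real code; the payload keys `chunk`, `answer`, and `done` mirror the snippet above):

```python
import json


def parse_sse_line(line: str):
    """Parse one SSE line into a payload dict; return None for non-data lines or bad JSON."""
    if not line.startswith("data: "):
        return None
    try:
        return json.loads(line[6:])  # strip the "data: " prefix
    except json.JSONDecodeError:
        return None


def assemble(lines):
    """Accumulate 'chunk' events; a 'done' event may override with the final 'answer'."""
    answer = ""
    for line in lines:
        data = parse_sse_line(line)
        if data is None:
            continue
        if "chunk" in data:
            answer += data["chunk"]
        if data.get("done"):
            answer = data.get("answer", answer)
            break
    return answer
```

Feeding it `['data: {"chunk": "Hel"}', 'data: {"chunk": "lo"}', 'data: {"done": true}']` returns `"Hello"`.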

On the backend we added /api/v1/gradio/stream, which is really just yesterday's stream API wrapped with a few parameters hard-coded, purely for the frontend demo.


@stream_router.post("/api/v1/gradio/stream")
async def ask_question_gradio_stream(
    request: GradioStreamRequest,
    ollama_client: OllamaDep,
    qdrant_client: QdrantDep,
    user_cache_client: UserCacheDep,
):
    logger.info(f"request {request}")

    settings = SystemSettings(
        user_language="Traditional Chinese",
        translate=True,
        system_prompt="",
        top_k=request.top_k,
        use_rag=True,
        subscribe_email=True,
        reranker_enabled=True,
        temperature=0.3,
        hybrid_search=request.use_hybrid,
    )

    return StreamingResponse(
        rag_stream(
            ollama_client=ollama_client,
            qdrant_client=qdrant_client,
            query=request.query,
            system_settings=settings,
            user_id="gradio user",
            categories=request.categories,
        ),
        media_type="text/event-stream",  # the frontend fetch reads this piece by piece
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
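What `rag_stream` actually yields is not shown here, but from the frontend parser we can infer that each event is a `data: <json>` line followed by a blank line. A hypothetical sketch of that producer side (the names `sse_event` and `fake_rag_stream` are made up for illustration, not the real implementation):

```python
import json


def sse_event(payload: dict) -> str:
    """Format one payload as a text/event-stream data event."""
    return f"data: {json.dumps(payload)}\n\n"


def fake_rag_stream(chunks, final_answer):
    """Stand-in for rag_stream: emit each partial chunk, then a done event with the full answer."""
    for chunk in chunks:
        yield sse_event({"chunk": chunk})
    yield sse_event({"done": True, "answer": final_answer})
```

Handing such a generator to `StreamingResponse(..., media_type="text/event-stream")` is exactly the pattern the endpoint above uses with the real `rag_stream`.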

Gradio Interface

  • gr.Row() and gr.Column() compose the layout, keeping it simple and extensible

  • "Advanced Options" let the user tune:

    • top_k → how many chunks to retrieve
    • use_hybrid → whether to enable hybrid search
  • Event binding: Enter / button click → streaming Q&A

    submit_btn.click(fn=stream_response, inputs=[query_input, top_k, use_hybrid], outputs=[response_output], show_progress=True)
    query_input.submit(fn=stream_response, inputs=[query_input, top_k, use_hybrid], outputs=[response_output], show_progress=True)


    → either one triggers the async Q&A

Entry point (main)

  • Build the Gradio interface
  • server_name="0.0.0.0" to allow external access
  • server_port=7861
  • show_error=True to keep debug messages visible
interface.launch(
    server_name="0.0.0.0",
    server_port=7861,
    share=False,
    show_error=True,
    quiet=False,
)


Frontend ↔ RAG API Interaction Flow

  1. The user enters a question in the Textbox
  2. Clicking the button or pressing Enter triggers stream_response
  3. It asynchronously calls /api/v1/gradio/stream
  4. The API returns chunks piece by piece → the Gradio Markdown updates live
  5. The response completes (done=True)

Technical highlights:

  • Full async SSE / StreamingResponse support for real-time output
  • Multiple retrieval modes and model choices keep it flexible
  • The Gradio UI is minimal and easy to extend (e.g. more models, categories, examples)

Wrap-up

  • Gradio serves as a rapid-prototyping frontend; combined with the RAG API it delivers live Q&A
  • stream_response encapsulates the API streaming logic and error handling
  • Frontend UI and backend API are cleanly layered and stay extensible
  • Next step: add more models and user preference settings, turning it into an "AI assistant" instead of a "JSON generator"

Appendix: Complete Frontend Code

app.py

import json
import logging
from typing import AsyncIterator

import gradio as gr
import httpx

logger = logging.getLogger(__name__)

# ==========================
# Configuration
# ==========================
API_BASE_URL = "http://localhost:8022/api/v1/gradio"


# ==========================
# Async streaming response
# ==========================
async def stream_response(
    query: str, top_k: int = 3, use_hybrid: bool = True
) -> AsyncIterator[str]:
    """Stream response from the RAG API."""
    if not query.strip():
        yield "⚠️ Please enter a question."
        return

    payload = {"query": query, "top_k": top_k, "use_hybrid": use_hybrid}

    try:
        url = f"{API_BASE_URL}/stream"
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream(
                "POST", url, json=payload, headers={"Accept": "text/event-stream"}
            ) as response:
                if response.status_code != 200:
                    yield f"❌ Error: API returned status {response.status_code}"
                    return

                current_answer = ""

                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data_str = line[6:]  # Remove "data: " prefix

                        try:
                            data = json.loads(data_str)
                            print(f"Received data chunk: {data}")

                            # Handle error
                            if "error" in data:
                                yield f"❌ Error: {data['error']}"
                                return

                            # Handle streaming chunks
                            if "chunk" in data or "response" in data:
                                current_answer += data.get("chunk", data.get("response", ""))
                                yield current_answer

                            # Handle completion
                            if data.get("done", False):
                                final_answer = data.get("answer", current_answer)
                                if final_answer != current_answer:
                                    current_answer = final_answer

                                yield current_answer
                                break

                        except json.JSONDecodeError:
                            continue

    except httpx.RequestError as e:
        yield f"⚠️ Connection error: {str(e)}\nMake sure the API server is running at {API_BASE_URL}"
    except Exception as e:
        yield f"❌ Unexpected error: {str(e)}"


# ==========================
# Gradio Interface
# ==========================
def create_interface():
    """Create and configure the Gradio interface."""
    with gr.Blocks(
        title="arXiv Paper Assistance - RAG Chat", theme=gr.themes.Soft()
    ) as interface:
        gr.Markdown(
            """
            # 🔬 arXiv Paper Assistance - RAG Chat

            Ask questions about machine learning and AI research papers from arXiv.
            The system will search through indexed papers and provide answers with sources.
            """
        )

        with gr.Row():
            with gr.Column(scale=3):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="What are transformers in machine learning?",
                    lines=2,
                    max_lines=5,
                )
            with gr.Column(scale=1):
                submit_btn = gr.Button("Ask Question", variant="primary", size="lg")

        with gr.Row():
            with gr.Column():
                with gr.Accordion("Advanced Options", open=False):
                    top_k = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=3,
                        step=1,
                        label="Number of chunks to retrieve",
                        info="More chunks = more context but slower generation",
                    )

                    use_hybrid = gr.Checkbox(
                        value=True,
                        label="Use hybrid search",
                        info="Vector embeddings + metadata filtering for better results",
                    )

        response_output = gr.Markdown(
            label="Answer",
            value="Ask a question to get started!",
            height=400,
            elem_classes=["response-markdown"],
        )

        # Event bindings
        submit_btn.click(
            fn=stream_response,
            inputs=[query_input, top_k, use_hybrid],
            outputs=[response_output],
            show_progress=True,
        )
        query_input.submit(
            fn=stream_response,
            inputs=[query_input, top_k, use_hybrid],
            outputs=[response_output],
            show_progress=True,
        )

        gr.Markdown(
            """
            ---
            **Note**: Make sure the RAG API server is running at `http://localhost:8022`.
            """
        )

    return interface


# ==========================
# Main
# ==========================
def main():
    print("🚀 Starting arXiv Paper Curator Gradio Interface...")
    print(f"📡 API Base URL: {API_BASE_URL}")
    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7861,
        share=False,
        show_error=True,
        quiet=False,
    )


if __name__ == "__main__":
    main()


Previous post
Day 16 | The Full RAG Pipeline (Part 1): Wrapping RAG Magic into a Backend API with FastAPI
Series
論文流浪記:我與AI 探索工具、組合流程、挑戰完整平台 (18 articles)