⚡《AI Knowledge System Build Log》This is not a purely technical article but an engineer's magical adventure. Code is the spell, the workflow is the magic circle, and error messages are dark curses. Ready your wand (your keyboard): today we step into the academy's foundational magic class and build a stable, extensible AI knowledge system.
Yesterday we finished the RAG API. It is functionally complete, but testing it with curl leaves you staring at cold, hard JSON.
To keep the demo from looking like a relic dug out of an old exam archive, today's goal is to put a Gradio UI in front of the API.
Simply put: we are giving the API some clothes so it stops running around naked.
pip install gradio
python app.py
Don't forget that the backend must also allow requests from the frontend, otherwise you'll be greeted by a wall of CORS errors, as if the API were giving you the silent treatment.
from fastapi.middleware.cors import CORSMiddleware

# Origins allowed to call the API (the Gradio app runs on port 7861)
origins = ["http://apiGateway:8000", "http://localhost:7861"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
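To sanity-check the setup, you can simulate the browser's preflight request yourself and look for the `access-control-allow-origin` header in the reply. A minimal sketch with httpx, assuming the API gateway listens on localhost:8022 as in the rest of this post:

import httpx

# Sketch: simulate the browser's CORS preflight against the stream endpoint.
resp = httpx.options(
    "http://localhost:8022/api/v1/gradio/stream",
    headers={
        "Origin": "http://localhost:7861",  # where the Gradio app runs
        "Access-Control-Request-Method": "POST",
    },
)
# CORSMiddleware should echo the allowed origin back; None means CORS is misconfigured
print(resp.headers.get("access-control-allow-origin"))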
The source code is organized into two layers:

- Async streaming response (`stream_response`): uses `httpx.AsyncClient` to fetch the RAG API response chunk by chunk over SSE / `StreamingResponse`, with `top_k` and `hybrid search` as tunable parameters.
- Gradio interface (`create_interface`): uses `gr.Blocks` to build the UI components (Textbox, Button, Slider, Checkbox, Markdown) and wires them to `stream_response`, triggered by a button click or by pressing Enter.

How the streaming works:

- It opens the connection with `client.stream("POST", url, ...)` and reads events line by line.
- Each SSE data event is prefixed with `"data: "`, which must be stripped before the JSON can be parsed (see the sketch below).
- Received chunks are concatenated into `current_answer` and yielded to the frontend Markdown component in real time.
- The completion event (`done`) carries the final answer, keeping the frontend consistent with the final result.
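For illustration only (the exact field layout depends on the backend), a single streamed event and the prefix-stripping step might look like this:

import json

# Hypothetical SSE event; the real payload carries "chunk"/"response"
# fragments during streaming, then a final "done" event with "answer".
line = 'data: {"chunk": "Transformers are a neural architecture", "done": false}'
if line.startswith("data: "):
    data = json.loads(line[len("data: "):])  # strip the "data: " prefix
    print(data["chunk"])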
Error handling:
async with client.stream(
    "POST", url, json=payload, headers={"Accept": "text/event-stream"}
) as response:
    if response.status_code != 200:
        yield f"❌ Error: API returned status {response.status_code}"
        return

    current_answer = ""
    async for line in response.aiter_lines():
        if line.startswith("data: "):
            data_str = line[6:]  # Remove "data: " prefix
            try:
                data = json.loads(data_str)
                ...  # error-event handling elided; see the full listing below
                # Handle streaming chunks
                if "chunk" in data or "response" in data:
                    current_answer += data.get("chunk") or data.get("response") or ""
                    yield current_answer
                # Handle completion
                if data.get("done", False):
                    current_answer = data.get("answer", current_answer)
                    yield current_answer
                    break
            except json.JSONDecodeError:
                continue
On the backend we added a `/api/v1/gradio/stream` endpoint. It is really just yesterday's stream API in a thin wrapper, with a few parameters hardcoded, purely for the frontend demo.
@stream_router.post("/api/v1/gradio/stream")
async def ask_question_gradio_stream(
    request: GradioStreamRequest,
    ollama_client: OllamaDep,
    qdrant_client: QdrantDep,
    user_cache_client: UserCacheDep,
):
    logger.info(f"request {request}")
    settings = SystemSettings(
        user_language="Traditional Chinese",
        translate=True,
        system_prompt="",
        top_k=request.top_k,
        use_rag=True,
        subscribe_email=True,
        reranker_enabled=True,
        temperature=0.3,
        hybrid_search=request.use_hybrid,
    )
    return StreamingResponse(
        rag_stream(
            ollama_client=ollama_client,
            qdrant_client=qdrant_client,
            query=request.query,
            system_settings=settings,
            user_id="gradio user",
            categories=request.categories,
        ),
        media_type="text/event-stream",  # the frontend fetch reads this chunk by chunk
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
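The `GradioStreamRequest` model is not shown above; a plausible sketch, with fields inferred from how the endpoint reads the request object, would be:

from typing import List, Optional

from pydantic import BaseModel, Field

# Sketch only: field names inferred from request.query / request.top_k /
# request.use_hybrid / request.categories used in the endpoint above.
class GradioStreamRequest(BaseModel):
    query: str
    top_k: int = Field(default=3, ge=1, le=10)  # matches the UI slider range
    use_hybrid: bool = True
    categories: Optional[List[str]] = None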
Gradio interface

- The layout is composed with `gr.Row()` and `gr.Column()`, keeping it simple and extensible.
- An "Advanced Options" accordion lets users tweak retrieval settings.
- Event binding: Enter / button click → streamed Q&A; both paths trigger the same async handler (`stream_response`):

submit_btn.click(fn=stream_response, inputs=[query_input, top_k, use_hybrid], outputs=[response_output], show_progress=True)
query_input.submit(fn=stream_response, inputs=[query_input, top_k, use_hybrid], outputs=[response_output], show_progress=True)
Launch entry point (`main`)

- `server_name="0.0.0.0"` to allow external access
- `server_port=7861`
- `show_error=True` to keep debug messages visible

interface.launch(
    server_name="0.0.0.0",
    server_port=7861,
    share=False,
    show_error=True,
    quiet=False,
)
Technical highlights:

- `stream_response` hits the `/api/v1/gradio` stream endpoint and keeps accumulating chunks until the completion event (`done=True`).
- `stream_response` encapsulates the API streaming logic and error handling in one place.

The full `app.py`:
import json
import logging
from typing import AsyncIterator

import gradio as gr
import httpx

logger = logging.getLogger(__name__)

# ==========================
# Configuration
# ==========================
API_BASE_URL = "http://localhost:8022/api/v1/gradio"


# ==========================
# Async streaming response
# ==========================
async def stream_response(
    query: str, top_k: int = 3, use_hybrid: bool = True
) -> AsyncIterator[str]:
    """Stream response from the RAG API."""
    if not query.strip():
        yield "⚠️ Please enter a question."
        return

    payload = {"query": query, "top_k": top_k, "use_hybrid": use_hybrid}
    try:
        url = f"{API_BASE_URL}/stream"
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream(
                "POST", url, json=payload, headers={"Accept": "text/event-stream"}
            ) as response:
                if response.status_code != 200:
                    yield f"❌ Error: API returned status {response.status_code}"
                    return

                current_answer = ""
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data_str = line[6:]  # Remove "data: " prefix
                        try:
                            data = json.loads(data_str)
                            logger.debug(f"Received data chunk: {data}")
                            # Handle error
                            if "error" in data:
                                yield f"❌ Error: {data['error']}"
                                return
                            # Handle streaming chunks
                            if "chunk" in data or "response" in data:
                                current_answer += data.get("chunk") or data.get("response") or ""
                                yield current_answer
                            # Handle completion
                            if data.get("done", False):
                                final_answer = data.get("answer", current_answer)
                                if final_answer != current_answer:
                                    current_answer = final_answer
                                    yield current_answer
                                break
                        except json.JSONDecodeError:
                            continue
    except httpx.RequestError as e:
        yield f"⚠️ Connection error: {str(e)}\nMake sure the API server is running at {API_BASE_URL}"
    except Exception as e:
        yield f"❌ Unexpected error: {str(e)}"


# ==========================
# Gradio Interface
# ==========================
def create_interface():
    """Create and configure the Gradio interface."""
    with gr.Blocks(
        title="arXiv Paper Assistance - RAG Chat", theme=gr.themes.Soft()
    ) as interface:
        gr.Markdown(
            """
            # 🔬 arXiv Paper Assistance - RAG Chat

            Ask questions about machine learning and AI research papers from arXiv.
            The system will search through indexed papers and provide answers with sources.
            """
        )
        with gr.Row():
            with gr.Column(scale=3):
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="What are transformers in machine learning?",
                    lines=2,
                    max_lines=5,
                )
            with gr.Column(scale=1):
                submit_btn = gr.Button("Ask Question", variant="primary", size="lg")
        with gr.Row():
            with gr.Column():
                with gr.Accordion("Advanced Options", open=False):
                    top_k = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=3,
                        step=1,
                        label="Number of chunks to retrieve",
                        info="More chunks = more context but slower generation",
                    )
                    use_hybrid = gr.Checkbox(
                        value=True,
                        label="Use hybrid search",
                        info="Vector embeddings + metadata filtering for better results",
                    )
        response_output = gr.Markdown(
            label="Answer",
            value="Ask a question to get started!",
            height=400,
            elem_classes=["response-markdown"],
        )

        # Event bindings
        submit_btn.click(
            fn=stream_response,
            inputs=[query_input, top_k, use_hybrid],
            outputs=[response_output],
            show_progress=True,
        )
        query_input.submit(
            fn=stream_response,
            inputs=[query_input, top_k, use_hybrid],
            outputs=[response_output],
            show_progress=True,
        )
        gr.Markdown(
            """
            ---
            **Note**: Make sure the RAG API server is running at `http://localhost:8022`.
            """
        )
    return interface


# ==========================
# Main
# ==========================
def main():
    print("🚀 Starting arXiv Paper Curator Gradio Interface...")
    print(f"📡 API Base URL: {API_BASE_URL}")
    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7861,
        share=False,
        show_error=True,
        quiet=False,
    )


if __name__ == "__main__":
    main()
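Outside of Gradio, `stream_response` can also be exercised from a small async script, which is handy for debugging the stream without a browser. A minimal sketch, assuming the module above is saved as app.py and the API is up:

import asyncio

from app import stream_response  # assumes the file above is named app.py

async def main() -> None:
    # Each yield is the accumulated answer so far, so later prints supersede earlier ones.
    async for partial in stream_response("What are transformers in machine learning?"):
        print(partial)

if __name__ == "__main__":
    asyncio.run(main())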