@Photon.handler(method="POST", path="/query")
當接收到 POST 訊號(例如向 /query 路徑發送的 POST 請求)後,這段程式碼會按照以下步驟進行操作:
函數定義與文檔字符串
def query_function(
self,
query: str,
search_uuid: str,
generate_related_questions: Optional[bool] = True,
) -> StreamingResponse:
"""
Query the search engine and returns the response.
The query can have the following fields:
- query: the user query.
- search_uuid: a uuid that is used to store or retrieve the search result. If
the uuid does not exist, generate and write to the kv. If the kv
fails, we generate regardless, in favor of availability. If the uuid
exists, return the stored result.
- generate_related_questions: if set to false, will not generate related
questions. Otherwise, will depend on the environment variable
RELATED_QUESTIONS. Default: true.
"""
def query_function(...) -> StreamingResponse:
:定義了一個名為 query_function
的方法,屬於某個類(從 self
參數可以看出)。該方法接收三個參數:
query: str
:用戶查詢的字符串。search_uuid: str
:用於存儲或檢索搜索結果的唯一標識符(UUID)。generate_related_questions: Optional[bool] = True
:一個可選的布爾值,指示是否生成相關問題。默認值為 True。-> StreamingResponse
:表示該方法返回一個 StreamingResponse
對象,這是一種特殊的 FastAPI 響應類型,允許以流的方式發送數據,而不是一次性將所有數據發送到客戶端。"""..."""
:這是一個文檔字符串,描述了函數的作用、參數和返回值。它對於理解代碼和生成文檔非常有用。處理 search_uuid
# Note that, if uuid exists, we don't check if the stored query is the same
# as the current query, and simply return the stored result. This is to enable
# the user to share a searched link to others and have others see the same result.
if search_uuid:
try:
result = self.kv.get(search_uuid)
def str_to_generator(result: str) -> Generator[str, None, None]:
yield result
return StreamingResponse(str_to_generator(result))
except KeyError:
logger.info(f"Key {search_uuid} not found, will generate again.")
except Exception as e:
logger.error(
f"KV error: {e}\n{traceback.format_exc()}, will generate again."
)
else:
raise HTTPException(status_code=400, detail="search_uuid must be provided.")
search_uuid
是否存在。如果存在,則嘗試從 self.kv
(一個鍵值存儲)中獲取對應的結果。try...except
塊用於處理可能出現的錯誤:
KeyError
:如果 search_uuid
在 KV 存儲中不存在,則記錄一條信息,表示將重新生成結果。Exception
:如果發生其他錯誤,則記錄錯誤信息,並同樣表示將重新生成結果。search_uuid
不存在,則拋出一個 HTTP 400 錯誤,提示必須提供 search_uuid
。str_to_generator
),並使用 StreamingResponse
返回。這樣做的好處是,可以逐步將結果發送給客戶端,而不是一次性將所有數據加載到內存中。處理 LEPTON 後端
if self.backend == "LEPTON":
# delegate to the lepton search api.
result = self.leptonsearch_client.query(
query=query,
search_uuid=search_uuid,
generate_related_questions=generate_related_questions,
)
return StreamingResponse(content=result, media_type="text/html")
self.backend
的值為 "LEPTON",則將查詢委託給 self.leptonsearch_client
處理。self.leptonsearch_client.query(...)
函數調用可能是一個遠程 API 調用,將查詢、UUID 和是否生成相關問題的信息發送給 Lepton 搜索 API。StreamingResponse
,其中包含 Lepton 搜索 API 返回的結果,並將媒體類型設置為 "text/html"。查詢處理與響應生成
# First, do a search query.
query = query or _default_query
# Basic attack protection: remove "[INST]" or "[/INST]" from the query
query = re.sub(r"\[/?INST\]", "", query)
contexts = self.search_function(query)
system_prompt = _rag_query_text.format(
context="\n\n".join(
[f"[[citation:{i+1}]] {c['snippet']}" for i, c in enumerate(contexts)]
)
)
try:
client = self.local_client()
llm_response = client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": query},
],
max_tokens=1024,
stop=stop_words,
stream=True,
temperature=0.9,
)
if self.should_do_related_questions and generate_related_questions:
# While the answer is being generated, we can start generating
# related questions as a future.
related_questions_future = self.executor.submit(
self.get_related_questions, query, contexts
)
else:
related_questions_future = None
except Exception as e:
logger.error(f"encountered error: {e}\n{traceback.format_exc()}")
return HTMLResponse("Internal server error.", 503)
return StreamingResponse(
self.stream_and_upload_to_kv(
contexts, llm_response, related_questions_future, search_uuid
),
media_type="text/html",
)
query
為空,則使用默認查詢 _default_query
。query
中移除可能存在的攻擊性內容(例如,[INST]
或 [/INST]
)。self.search_function
進行搜索,獲取上下文 contexts
。system_prompt
,將搜索得到的上下文信息格式化為 LLM 模型可以理解的形式。self.local_client().chat.completions.create
方法調用 LLM 模型,生成回答。
model=self.model
:指定要使用的 LLM 模型。messages=[...]
:提供系統提示和用戶查詢作為輸入。max_tokens=1024
:限制生成的最大 token 數量。stop=stop_words
:指定停止生成的條件。stream=True
:以流的方式生成響應。temperature=0.9
:控制生成的多樣性。self.should_do_related_questions
和 generate_related_questions
都為 True,則異步生成相關問題。StreamingResponse
返回結果,同時將結果上傳到 KV 存儲中。總結
query_function
函數的主要功能是處理用戶查詢,並返回搜索引擎和 LLM 模型生成的響應。它首先嘗試從 KV 存儲中獲取緩存的結果,如果不存在或後端為 LEPTON,則進行搜索並調用 LLM 模型生成回答。生成的響應以流的方式返回,同時異步上傳到 KV 存儲中。
目前大致的流程如此,明天大概會開始修改成符合我需求的版本。
目前項目的思考: 可能是一個能載入多個網站的網頁或桌面應用,中間使用 agent 跟 rag 技術 統整總結回答,桌面應用可能會試著用Electron 或 Flutter 去搭,感覺光是要在一個網頁裡面切成九宮格載入網站就要花很多的時間去做了,另外後端求快的話我大概就用python相關的框架去搭,或者找一個go 的後端去搭看看。
另外agent 跟 rag 明天看看能不能使用前端的PyScript 或相關的技術載入,這樣的話就有可能把這個網頁放在 類似github.io 這類的靜態網站上使用而不用太多的花費。