到目前為止,我們的程式大多是「同步」的:一步一步往下執行,上一個任務沒完成,下一個任務就得等著。這種方式雖然直覺,但在需要等待外部回應(例如網路請求、檔案存取、資料庫查詢)時,效率會變得很低。
非同步程式設計(Asynchronous Programming) 的目的,就是讓程式在等待的時候,不必閒著,而是可以去做其他事。這樣可以大幅提升整體效能,特別是在 I/O 密集的情境下。
今天會學習:
async
/ await
撰寫非同步程式。await
、使用阻塞操作、錯誤的函式庫)。掌握非同步程式設計,能讓你寫出的程式不只是正確,更能高效處理多工需求,特別是在現代的網路應用與即時系統中。
非同步程式設計是一種讓程式「一邊等,一邊做別的事」的模式。
它特別適合 等待外部回應 的情境,例如:
術語 | 英文 | 說明 | 指令 |
---|---|---|---|
協程 | Coroutine | 可暫停和恢復的函數 | async def work() |
等待 | await | 把控制權交出去,等結果回來再繼續 | await task |
事件迴圈 | Event Loop | 負責排程與執行協程的「時間管理員」 | asyncio.run(main()) |
任務 | Task | 被排程執行的協程 | asyncio.create_task() |
同步(傳統餐廳)
非同步(快餐店)
同步(傳統排隊)
非同步(智慧醫院)
任務類型 | 同步執行 | 非同步執行 | 效能提升 |
---|---|---|---|
5 個網路請求(每個 3 秒) | 15 秒 | 3 秒 | 80% |
10 個檔案下載(每個 2 秒) | 20 秒 | 2 秒 | 90% |
100 次資料庫查詢 | 100 秒 | 10 秒 | 90% |
I/O 密集:網路請求、檔案、資料庫
需要等待外部服務:API、郵件發送
大量並發:聊天室、即時通訊
CPU 密集:AI 訓練、影像處理
簡單任務:一小段資料處理(同步更直覺)
import asyncio
# 定義協程函式(async 表示它能被暫停)
async def fetch_data(name):
print(f"開始處理 {name}")
await asyncio.sleep(2) # 模擬等待(非同步睡眠)
print(f"{name} 處理完成")
return f"結果: {name}"
# 主程式
async def main():
# 建立三個任務並同時執行
task1 = asyncio.create_task(fetch_data("任務A"))
task2 = asyncio.create_task(fetch_data("任務B"))
task3 = asyncio.create_task(fetch_data("任務C"))
# 等待所有任務完成
results = await asyncio.gather(task1, task2, task3)
print("所有結果:", results)
# 執行事件迴圈
if __name__ == "__main__":
asyncio.run(main())
async def
:定義一個可以「暫停」的函式。await asyncio.sleep(2)
:暫停 2 秒,這段時間讓出控制權給其他任務。asyncio.create_task()
:把協程變成可同時執行的任務。asyncio.gather()
:等待多個任務的結果。同步版(約 6 秒)
任務A → 任務B → 任務C
非同步版(約 2 秒)
任務A、B、C 同時開始 → 幾乎同時結束
import aiohttp, asyncio
async def fetch(session, url):
async with session.get(url) as resp:
text = await resp.text()
return {
"url": url,
"status": resp.status,
"size": len(text),
"preview": text[:120].replace("\n", " ")
}
async def main():
urls = [
"https://www.google.com"]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*(fetch(session, u) for u in urls))
for r in results:
print(f"{r['status']} {r['url']} size={r['size']}")
print(f"preview: {r['preview']}\n")
asyncio.run(main())
--執行結果如下所示,內容為網址、HTTP 狀態碼、回傳大小、前 120 字
import asyncio
async def jr_get_user(uid: int):
# 模擬從資料庫撈資料
await asyncio.sleep(0.1)
return {"id": uid, "name": f"jr-user-{uid}", "email": f"jr{uid}@gmail.com"}
async def jr_update_user(user: dict):
# 模擬更新使用者
await asyncio.sleep(0.05)
return {**user, "status": "updated"}
async def jr_process_users():
ids = [1, 2, 3]
users = await asyncio.gather(*(jr_get_user(uid) for uid in ids))
results = await asyncio.gather(*(jr_update_user(u) for u in users))
print(results)
asyncio.run(jr_process_users())
import aiofiles
import asyncio
async def read_file(name):
async with aiofiles.open(name, "r") as f:
return await f.read()
async def main():
files = ["test.txt", "good.txt"]
tasks = [read_file(f) for f in files]
contents = await asyncio.gather(*tasks)
print(contents)
asyncio.run(main())
# 錯誤
import time
async def bad():
time.sleep(1) # 阻塞事件迴圈
# 正確
async def good():
await asyncio.sleep(1)
# 錯誤
async def bad():
fetch_data() # 沒有等待
# 正確
async def good():
result = await fetch_data()
# 錯誤(阻塞)
import requests
async def bad():
requests.get("http://example.com")
# 正確(非同步)
import aiohttp
async def good():
async with aiohttp.ClientSession() as s:
async with s.get("http://example.com") as resp:
return await resp.text()