到目前為止,我們的程式大多是「同步」的:一步一步往下執行,上一個任務沒完成,下一個任務就得等著。這種方式雖然直覺,但在需要等待外部回應(例如網路請求、檔案存取、資料庫查詢)時,效率會變得很低。
非同步程式設計(Asynchronous Programming) 的目的,就是讓程式在等待的時候,不必閒著,而是可以去做其他事。這樣可以大幅提升整體效能,特別是在 I/O 密集的情境下。
今天會學習:
async / await 撰寫非同步程式。await、使用阻塞操作、錯誤的函式庫)。掌握非同步程式設計,能讓你寫出的程式不只是正確,更能高效處理多工需求,特別是在現代的網路應用與即時系統中。
非同步程式設計是一種讓程式「一邊等,一邊做別的事」的模式。
它特別適合 等待外部回應 的情境,例如:
| 術語 | 英文 | 說明 | 指令 | 
|---|---|---|---|
| 協程 | Coroutine | 可暫停和恢復的函數 | async def work() | 
| 等待 | await | 把控制權交出去,等結果回來再繼續 | await task | 
| 事件迴圈 | Event Loop | 負責排程與執行協程的「時間管理員」 | asyncio.run(main()) | 
| 任務 | Task | 被排程執行的協程 | asyncio.create_task() | 
同步(傳統餐廳)
非同步(快餐店)
同步(傳統排隊)
非同步(智慧醫院)
| 任務類型 | 同步執行 | 非同步執行 | 效能提升 | 
|---|---|---|---|
| 5 個網路請求(每個 3 秒) | 15 秒 | 3 秒 | 80% | 
| 10 個檔案下載(每個 2 秒) | 20 秒 | 2 秒 | 90% | 
| 100 次資料庫查詢 | 100 秒 | 10 秒 | 90% | 
I/O 密集:網路請求、檔案、資料庫
需要等待外部服務:API、郵件發送
大量並發:聊天室、即時通訊
CPU 密集:AI 訓練、影像處理
簡單任務:一小段資料處理(同步更直覺)
import asyncio
# 定義協程函式(async 表示它能被暫停)
async def fetch_data(name):
    print(f"開始處理 {name}")
    await asyncio.sleep(2)  # 模擬等待(非同步睡眠)
    print(f"{name} 處理完成")
    return f"結果: {name}"
# 主程式
async def main():
    # 建立三個任務並同時執行
    task1 = asyncio.create_task(fetch_data("任務A"))
    task2 = asyncio.create_task(fetch_data("任務B"))
    task3 = asyncio.create_task(fetch_data("任務C"))
    # 等待所有任務完成
    results = await asyncio.gather(task1, task2, task3)
    print("所有結果:", results)
# 執行事件迴圈
if __name__ == "__main__":
    asyncio.run(main())
async def:定義一個可以「暫停」的函式。await asyncio.sleep(2):暫停 2 秒,這段時間讓出控制權給其他任務。asyncio.create_task():把協程變成可同時執行的任務。asyncio.gather():等待多個任務的結果。同步版(約 6 秒)
任務A → 任務B → 任務C
非同步版(約 2 秒)
任務A、B、C 同時開始 → 幾乎同時結束
import aiohttp, asyncio
async def fetch(session, url):
    async with session.get(url) as resp:
        text = await resp.text()
        return {
            "url": url,
            "status": resp.status,
            "size": len(text),
            "preview": text[:120].replace("\n", " ")
        }
async def main():
    urls = [
        "https://www.google.com"]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, u) for u in urls))
        for r in results:
            print(f"{r['status']} {r['url']} size={r['size']}")
            print(f"preview: {r['preview']}\n")
asyncio.run(main())
--執行結果如下所示,內容為網址、HTTP 狀態碼、回傳大小、前 120 字
import asyncio
async def jr_get_user(uid: int):
    # 模擬從資料庫撈資料
    await asyncio.sleep(0.1)
    return {"id": uid, "name": f"jr-user-{uid}", "email": f"jr{uid}@gmail.com"}
async def jr_update_user(user: dict):
    # 模擬更新使用者
    await asyncio.sleep(0.05)
    return {**user, "status": "updated"}
async def jr_process_users():
    ids = [1, 2, 3]
    users = await asyncio.gather(*(jr_get_user(uid) for uid in ids))
    results = await asyncio.gather(*(jr_update_user(u) for u in users))
    print(results)
asyncio.run(jr_process_users())
import aiofiles
import asyncio
async def read_file(name):
    async with aiofiles.open(name, "r") as f:
        return await f.read()
async def main():
    files = ["test.txt", "good.txt"]
    tasks = [read_file(f) for f in files]
    contents = await asyncio.gather(*tasks)
    print(contents)
asyncio.run(main())
#  錯誤
import time
async def bad():
    time.sleep(1)  # 阻塞事件迴圈
#  正確
async def good():
    await asyncio.sleep(1)
#  錯誤
async def bad():
    fetch_data()  # 沒有等待
#  正確
async def good():
    result = await fetch_data()
#  錯誤(阻塞)
import requests
async def bad():
    requests.get("http://example.com")
#  正確(非同步)
import aiohttp
async def good():
    async with aiohttp.ClientSession() as s:
        async with s.get("http://example.com") as resp:
            return await resp.text()