iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0
生成式 AI

別讓駭客拿走你的AI控制權:MCP x n8n 防禦實戰全攻略系列 第 17

# 防禦策略 - API Key 不再外露:timestamp/nonce 阻截重放

  • 分享至 

  • xImage
  •  

本次把「驗 Key」與防重放前移到 MCP 閘道,前端只打 /work/entry,MCP 先驗 timestamp(±60s)+ 一次性 nonce(TTL 120s)+ IP 限流,再把乾淨的 body轉發給 n8n。為避免在 n8n 介面看到任何敏感值,預設不再轉發任何密鑰/簽章類標頭

架構與流程

MCP /work/entry →(MCP 代打)→ n8n Webhook
MCP 完成:時間窗、nonce 去重、每 IP 限流、審計(_audit.from_ip/ts/nonce),轉發時只帶 Content-Type,因此 n8n 的執行畫面不會出現 x-api-key / x-mcp-trust / x-ts / x-nonce 等敏感欄位。

n8n 改動

Webhook 後第一個 Code 節點改做「去敏+傳遞 body」:

const h = $json.headers || {};
['authorization','x-api-key','x-mcp-trust','x-sig','x-ts','x-nonce','cookie']
  .forEach(k => delete h[k.toLowerCase()]);
return [{ json: ($json.body ?? {}) }]; // 後續依 userRole 分流

MCP 新增

防濫用參數:TIME_WINDOW=60NONCE_TTL=120RATE_LIMIT_PER_MIN=120

參考demo:

# MCP Gateway snippet (Python / FastAPI)
import os, time, json, secrets, hashlib, hmac
from typing import Dict
import httpx
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware

N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL")              
# 預設不轉發敏感標頭;如需雙層驗證再打開開關
FORWARD_TRUST    = os.getenv("FORWARD_TRUST", "false").lower() == "true"
FORWARD_API_KEY  = os.getenv("FORWARD_API_KEY", "false").lower() == "true"
FORWARD_TS_NONCE = os.getenv("FORWARD_TS_NONCE", "false").lower() == "true"

MCP_N8N_SHARED = os.getenv("MCP_N8N_SHARED")               
N8N_API_KEY    = os.getenv("N8N_API_KEY")                  

# 防重放 / 限流
TIME_WINDOW = int(os.getenv("TIME_WINDOW", "60"))
NONCE_TTL   = int(os.getenv("NONCE_TTL", "120"))
RATE_LIMIT_PER_MIN = int(os.getenv("RATE_LIMIT_PER_MIN", "120"))

# (可選)簽章模式:打開後即會在 header 帶 x-sig;n8n 端需對應驗簽
ENABLE_HMAC_SIG = os.getenv("ENABLE_HMAC_SIG", "false").lower() == "true"
SHARED_SECRET   = os.getenv("SHARED_SECRET", "").encode() if ENABLE_HMAC_SIG else b""

if not N8N_WEBHOOK_URL:
    raise RuntimeError("Missing env: N8N_WEBHOOK_URL")

# 簡易儲存(nonce 去重、IP 限流)
_used_nonces: Dict[str, float] = {}
_ip_bucket: Dict[str, Dict[str, int]] = {}

def _cleanup_nonces():
    now = time.time()
    for k in list(_used_nonces.keys()):
        if _used_nonces[k] < now:
            del _used_nonces[k]

def _rate_limit_ip(ip: str):
    now_min = time.strftime("%Y%m%d%H%M", time.gmtime())
    rec = _ip_bucket.get(ip)
    if not rec or rec.get("min") != now_min:
        _ip_bucket[ip] = {"min": now_min, "cnt": 0}
    _ip_bucket[ip]["cnt"] += 1
    if _ip_bucket[ip]["cnt"] > RATE_LIMIT_PER_MIN:
        raise HTTPException(status_code=429, detail="rate limited")

def _sha256(b: bytes) -> str:
    import hashlib
    return hashlib.sha256(b).hexdigest()

def _sign(method: str, path: str, ts: int, nonce: str, body_bytes: bytes) -> str:
    msg = (method + path + str(ts) + nonce + _sha256(body_bytes)).encode()
    return hmac.new(SHARED_SECRET, msg, hashlib.sha256).hexdigest()

gateway = FastAPI(title="MCP HTTP Gateway")
gateway.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_credentials=True,
    allow_methods=["POST", "OPTIONS"], allow_headers=["*"],
)

@gateway.post("/work/entry")
async def work_entry(req: Request):
    """
    安全閘道(雙重驗證前的第一層)
    - 驗 timestamp(±TIME_WINDOW)+ 一次性 nonce(TTL)
    - 每 IP 限流
    - 附審計欄位 _audit;預設只轉發乾淨 body,不帶敏感標頭
    - 若要在 n8n 再做第二層驗證,開 FORWARD_* 或 HMAC 簽章
    """
    client_ip = (req.headers.get("x-forwarded-for") or req.client.host).split(",")[0].strip()
    _rate_limit_ip(client_ip)

    try:
        payload = await req.json()
    except Exception:
        raise HTTPException(status_code=400, detail="invalid json body")

    ts = payload.get("timestamp")
    nonce = payload.get("nonce")
    try:
        ts = int(ts)
    except Exception:
        raise HTTPException(status_code=400, detail="bad timestamp")
    now = int(time.time())
    if abs(now - ts) > TIME_WINDOW:
        raise HTTPException(status_code=400, detail="timestamp out of window")

    if not nonce or not (1 <= len(str(nonce)) <= 128):
        raise HTTPException(status_code=400, detail="invalid nonce")
    _cleanup_nonces()
    if str(nonce) in _used_nonces:
        raise HTTPException(status_code=400, detail="nonce used")
    _used_nonces[str(nonce)] = time.time() + NONCE_TTL

    # 2) 準備轉發:body + 審計(headers 預設只帶 Content-Type)
    body_to_n8n = payload | {"_audit": {"from_ip": client_ip, "ts": ts, "nonce": str(nonce)}}
    body_bytes = json.dumps(body_to_n8n, separators=(",", ":")).encode()

    fwd_headers = {"Content-Type": "application/json"}
    if FORWARD_TRUST:
        if not MCP_N8N_SHARED:
            raise HTTPException(status_code=500, detail="missing MCP_N8N_SHARED for FORWARD_TRUST")
        fwd_headers["x-mcp-trust"] = MCP_N8N_SHARED
    if FORWARD_API_KEY:
        if not N8N_API_KEY:
            raise HTTPException(status_code=500, detail="missing N8N_API_KEY for FORWARD_API_KEY")
        fwd_headers["x-api-key"] = N8N_API_KEY
    if FORWARD_TS_NONCE:
        fwd_headers["x-ts"] = str(ts)
        fwd_headers["x-nonce"] = str(nonce)

    if ENABLE_HMAC_SIG:
        # 取得 path(需與 n8n 驗簽端一致;也可直接寫死 '/webhook/yourpath')
        try:
            path = "/" + N8N_WEBHOOK_URL.split("/", 3)[-1]
        except Exception:
            path = "/webhook"
        sig = _sign("POST", path, ts, str(nonce), body_bytes)
        fwd_headers["x-sig"] = sig

    try:
        async with httpx.AsyncClient(timeout=15) as cli:
            r = await cli.post(N8N_WEBHOOK_URL, headers=fwd_headers, content=body_bytes)
        if r.status_code >= 300:
            raise HTTPException(status_code=r.status_code, detail=f"n8n error: {r.text}")
        return r.json() if "application/json" in (r.headers.get("content-type") or "") else {"ok": True, "data": r.text}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"upstream error: {e}")

安全建議:在 n8n 主機防火牆白名單只允許 MCP IP 存取 <your_n8n_port>;這樣即使完全不帶 trust header,也無法從外部直打 n8n。

為什麼更安全

  • 金鑰不外露:API Key/Trust/Sig 不進入 n8n 執行資料,UI 與日誌皆不可見。
  • 重放無效:MCP 先擋舊時間戳與重複 nonce。
  • 攻擊面縮小:n8n 僅接受來自 MCP 的請求(再配白名單/內網更穩)。
  • 職責分離:MCP 管驗證與流量;n8n 專注業務路由。

測試

Demo(baseline)— 正常呼叫
內容:帶 timestamp=now、隨機 nonce
預期:200/2xx 成功

Demo 1 — 缺少 timestamp
內容:不帶 timestamp(或空值/非整數)
預期:400,訊息:bad timestamp
https://ithelp.ithome.com.tw/upload/images/20251001/201686874PLeqWoX4R.png
Demo 2 — 過期 timestamp
內容:帶 兩分鐘前的 timestamp(超過 ±60s)
預期:400,訊息:timestamp out of window
https://ithelp.ithome.com.tw/upload/images/20251001/20168687PGWwIBkcJM.png

總結

今天把驗證與限流前移到 MCP,預設不轉發任何敏感標頭;必要時再開 FORWARD_* 或 HMAC 給 n8n 做第二道驗證。
這樣即達成三件事:API Key 不外露、重放無效、n8n 介面不見敏感資訊;如果再配防火牆白名單,就算知道 URL 也打不進來。


上一篇
# 模擬攻擊:API Key 被竊取並重放
下一篇
# 硬編碼金鑰外洩
系列文
別讓駭客拿走你的AI控制權:MCP x n8n 防禦實戰全攻略21
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言