本次把「驗 Key」與防重放前移到 MCP 閘道,前端只打 /work/entry
,MCP 先驗 timestamp(±60s)+ 一次性 nonce(TTL 120s)+ IP 限流,再把乾淨的 body轉發給 n8n。為避免在 n8n 介面看到任何敏感值,預設不再轉發任何密鑰/簽章類標頭。
MCP /work/entry
→(MCP 代打)→ n8n Webhook
MCP 完成:時間窗、nonce 去重、每 IP 限流、審計(_audit.from_ip/ts/nonce
),轉發時只帶 Content-Type
,因此 n8n 的執行畫面不會出現 x-api-key / x-mcp-trust / x-ts / x-nonce
等敏感欄位。
Webhook 後第一個 Code 節點改做「去敏+傳遞 body」:
const h = $json.headers || {};
['authorization','x-api-key','x-mcp-trust','x-sig','x-ts','x-nonce','cookie']
.forEach(k => delete h[k.toLowerCase()]);
return [{ json: ($json.body ?? {}) }]; // 後續依 userRole 分流
防濫用參數:TIME_WINDOW=60
、NONCE_TTL=120
、RATE_LIMIT_PER_MIN=120
參考demo:
# MCP Gateway snippet (Python / FastAPI)
import os, time, json, secrets, hashlib, hmac
from typing import Dict
import httpx
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL")
# 預設不轉發敏感標頭;如需雙層驗證再打開開關
FORWARD_TRUST = os.getenv("FORWARD_TRUST", "false").lower() == "true"
FORWARD_API_KEY = os.getenv("FORWARD_API_KEY", "false").lower() == "true"
FORWARD_TS_NONCE = os.getenv("FORWARD_TS_NONCE", "false").lower() == "true"
MCP_N8N_SHARED = os.getenv("MCP_N8N_SHARED")
N8N_API_KEY = os.getenv("N8N_API_KEY")
# 防重放 / 限流
TIME_WINDOW = int(os.getenv("TIME_WINDOW", "60"))
NONCE_TTL = int(os.getenv("NONCE_TTL", "120"))
RATE_LIMIT_PER_MIN = int(os.getenv("RATE_LIMIT_PER_MIN", "120"))
# (可選)簽章模式:打開後即會在 header 帶 x-sig;n8n 端需對應驗簽
ENABLE_HMAC_SIG = os.getenv("ENABLE_HMAC_SIG", "false").lower() == "true"
SHARED_SECRET = os.getenv("SHARED_SECRET", "").encode() if ENABLE_HMAC_SIG else b""
if not N8N_WEBHOOK_URL:
raise RuntimeError("Missing env: N8N_WEBHOOK_URL")
# 簡易儲存(nonce 去重、IP 限流)
_used_nonces: Dict[str, float] = {}
_ip_bucket: Dict[str, Dict[str, int]] = {}
def _cleanup_nonces():
now = time.time()
for k in list(_used_nonces.keys()):
if _used_nonces[k] < now:
del _used_nonces[k]
def _rate_limit_ip(ip: str):
now_min = time.strftime("%Y%m%d%H%M", time.gmtime())
rec = _ip_bucket.get(ip)
if not rec or rec.get("min") != now_min:
_ip_bucket[ip] = {"min": now_min, "cnt": 0}
_ip_bucket[ip]["cnt"] += 1
if _ip_bucket[ip]["cnt"] > RATE_LIMIT_PER_MIN:
raise HTTPException(status_code=429, detail="rate limited")
def _sha256(b: bytes) -> str:
import hashlib
return hashlib.sha256(b).hexdigest()
def _sign(method: str, path: str, ts: int, nonce: str, body_bytes: bytes) -> str:
msg = (method + path + str(ts) + nonce + _sha256(body_bytes)).encode()
return hmac.new(SHARED_SECRET, msg, hashlib.sha256).hexdigest()
gateway = FastAPI(title="MCP HTTP Gateway")
gateway.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True,
allow_methods=["POST", "OPTIONS"], allow_headers=["*"],
)
@gateway.post("/work/entry")
async def work_entry(req: Request):
"""
安全閘道(雙重驗證前的第一層)
- 驗 timestamp(±TIME_WINDOW)+ 一次性 nonce(TTL)
- 每 IP 限流
- 附審計欄位 _audit;預設只轉發乾淨 body,不帶敏感標頭
- 若要在 n8n 再做第二層驗證,開 FORWARD_* 或 HMAC 簽章
"""
client_ip = (req.headers.get("x-forwarded-for") or req.client.host).split(",")[0].strip()
_rate_limit_ip(client_ip)
try:
payload = await req.json()
except Exception:
raise HTTPException(status_code=400, detail="invalid json body")
ts = payload.get("timestamp")
nonce = payload.get("nonce")
try:
ts = int(ts)
except Exception:
raise HTTPException(status_code=400, detail="bad timestamp")
now = int(time.time())
if abs(now - ts) > TIME_WINDOW:
raise HTTPException(status_code=400, detail="timestamp out of window")
if not nonce or not (1 <= len(str(nonce)) <= 128):
raise HTTPException(status_code=400, detail="invalid nonce")
_cleanup_nonces()
if str(nonce) in _used_nonces:
raise HTTPException(status_code=400, detail="nonce used")
_used_nonces[str(nonce)] = time.time() + NONCE_TTL
# 2) 準備轉發:body + 審計(headers 預設只帶 Content-Type)
body_to_n8n = payload | {"_audit": {"from_ip": client_ip, "ts": ts, "nonce": str(nonce)}}
body_bytes = json.dumps(body_to_n8n, separators=(",", ":")).encode()
fwd_headers = {"Content-Type": "application/json"}
if FORWARD_TRUST:
if not MCP_N8N_SHARED:
raise HTTPException(status_code=500, detail="missing MCP_N8N_SHARED for FORWARD_TRUST")
fwd_headers["x-mcp-trust"] = MCP_N8N_SHARED
if FORWARD_API_KEY:
if not N8N_API_KEY:
raise HTTPException(status_code=500, detail="missing N8N_API_KEY for FORWARD_API_KEY")
fwd_headers["x-api-key"] = N8N_API_KEY
if FORWARD_TS_NONCE:
fwd_headers["x-ts"] = str(ts)
fwd_headers["x-nonce"] = str(nonce)
if ENABLE_HMAC_SIG:
# 取得 path(需與 n8n 驗簽端一致;也可直接寫死 '/webhook/yourpath')
try:
path = "/" + N8N_WEBHOOK_URL.split("/", 3)[-1]
except Exception:
path = "/webhook"
sig = _sign("POST", path, ts, str(nonce), body_bytes)
fwd_headers["x-sig"] = sig
try:
async with httpx.AsyncClient(timeout=15) as cli:
r = await cli.post(N8N_WEBHOOK_URL, headers=fwd_headers, content=body_bytes)
if r.status_code >= 300:
raise HTTPException(status_code=r.status_code, detail=f"n8n error: {r.text}")
return r.json() if "application/json" in (r.headers.get("content-type") or "") else {"ok": True, "data": r.text}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=502, detail=f"upstream error: {e}")
安全建議:在 n8n 主機防火牆白名單只允許 MCP IP 存取 <your_n8n_port>;這樣即使完全不帶 trust header,也無法從外部直打 n8n。
Demo(baseline)— 正常呼叫
內容:帶 timestamp=now、隨機 nonce
預期:200/2xx 成功
Demo 1 — 缺少 timestamp
內容:不帶 timestamp(或空值/非整數)
預期:400,訊息:bad timestamp
Demo 2 — 過期 timestamp
內容:帶 兩分鐘前的 timestamp(超過 ±60s)
預期:400,訊息:timestamp out of window
今天把驗證與限流前移到 MCP,預設不轉發任何敏感標頭;必要時再開 FORWARD_* 或 HMAC 給 n8n 做第二道驗證。
這樣即達成三件事:API Key 不外露、重放無效、n8n 介面不見敏感資訊;如果再配防火牆白名單,就算知道 URL 也打不進來。