接著要開始新增一些功能了,今天我想先給他來個地震功能,原因很簡單,因為我怕死
我們使用的api依舊是由中央氣象署提供的api,我選擇的是顯著有感地震報告
https://opendata.cwa.gov.tw/dist/opendata-swagger.html#/
其實可以直接透過n8n的http request來做到獲得資料,並讓ai處理的,但是由於
再來看你之後是否還想要使用claude desktop來呼喊你的mcp server,如果不想要的話,這邊的.yaml規範可以直接跳過,想要的可以照著這個丟在你的.yaml,而且之前需要再開一個fastmcp檔案,因為之前那個被改成了透過位址連接的server,沒辦法透過claude desktop呼叫
/earthquake:
post:
operationId: getEarthquakes
summary: 取得地震資料
description: 可指定縣市或取得全部最新地震
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
target_areas:
type: string
description: "目標縣市,可以是字串或 JSON 陣列格式的字串"
example: "臺中市"
limit:
type: string
description: "限制回傳地震筆數"
example: "5"
default: "5"
responses:
'200':
description: 地震查詢結果
content:
application/json:
schema:
type: object
properties:
success:
type: boolean
count:
type: integer
earthquakes:
type: array
items:
type: object
properties:
earthquake_no:
type: string
origin_time:
type: string
location:
type: string
magnitude:
type: string
depth:
type: string
latitude:
type: string
longitude:
type: string
target_areas:
type: array
items:
type: string
description: "如果指定了特定縣市"
'400':
description: 請求錯誤
content:
application/json:
schema:
type: object
properties:
error:
type: string
接著是python的部分
import httpx
import os
from typing import Optional, List
from dotenv import load_dotenv
# 載入環境變數
load_dotenv()
# 從環境變數取得 API 金鑰
CWB_API_KEY = os.getenv("CWB_API_KEY")
class SimpleEarthquakeAPI:
def __init__(self):
self.api_key = CWB_API_KEY
self.base_url = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/E-A0015-001"
async def get_latest_earthquakes(self, limit: int = 5):
"""取得最新的地震報告(簡化版本)"""
if not self.api_key:
return {"error": "未設定 CWB_API_KEY 環境變數"}
params = {
"Authorization": self.api_key,
"format": "JSON",
"limit": str(limit),
"sort": "OriginTime"
}
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
response = await client.get(self.base_url, params=params)
response.raise_for_status()
data = response.json()
if data.get("success") != "true":
return {"error": f"API 回應錯誤: {data.get('result', {}).get('message', '未知錯誤')}"}
# 簡化資料格式
records = data.get("records", {})
earthquake_data = records.get("Earthquake", [])
simplified_data = []
for earthquake in earthquake_data:
earthquake_info = earthquake.get("EarthquakeInfo", {})
epicenter = earthquake_info.get("Epicenter", {})
magnitude = earthquake_info.get("EarthquakeMagnitude", {})
simplified_data.append({
"earthquake_no": earthquake_info.get("EarthquakeNo", "未知"),
"origin_time": earthquake_info.get("OriginTime", "未知時間"),
"location": epicenter.get("Location", "未知位置"),
"magnitude": f"{magnitude.get('MagnitudeType', '')} {magnitude.get('MagnitudeValue', '未知')}".strip(),
"depth": earthquake_info.get("FocalDepth", "未知"),
"latitude": epicenter.get("EpicenterLatitude", "未知"),
"longitude": epicenter.get("EpicenterLongitude", "未知")
})
return {
"success": True,
"count": len(simplified_data),
"earthquakes": simplified_data
}
except httpx.HTTPStatusError as e:
return {"error": f"HTTP 錯誤 {e.response.status_code}"}
except httpx.RequestError as e:
return {"error": f"連線錯誤: {str(e)}"}
except Exception as e:
return {"error": f"未預期的錯誤: {str(e)}"}
async def get_earthquake_by_areas(self, target_areas: List[str], limit: int = 5):
"""取得指定縣市的地震資料"""
if not self.api_key:
return {"error": "未設定 CWB_API_KEY 環境變數"}
params = {
"Authorization": self.api_key,
"format": "JSON",
"limit": str(limit * 2), # 取得更多資料以確保找到相關地震
"sort": "OriginTime"
}
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
response = await client.get(self.base_url, params=params)
response.raise_for_status()
data = response.json()
if data.get("success") != "true":
return {"error": f"API 回應錯誤: {data.get('result', {}).get('message', '未知錯誤')}"}
records = data.get("records", {})
earthquake_data = records.get("Earthquake", [])
filtered_earthquakes = []
for earthquake in earthquake_data:
earthquake_info = earthquake.get("EarthquakeInfo", {})
intensities = earthquake.get("Intensity", {})
shake_map = intensities.get("ShakingArea", [])
# 檢查是否影響目標區域
target_areas_data = []
found_target = False
for area_info in shake_map:
area_desc = area_info.get("AreaDesc", "")
area_intensity = area_info.get("AreaIntensity", "")
# 檢查是否包含目標縣市
for target_area in target_areas:
target_variations = [
target_area,
target_area.replace("臺", "台"),
target_area.replace("台", "臺")
]
if any(var in area_desc for var in target_variations):
target_areas_data.append({
"area": area_desc,
"intensity": area_intensity
})
found_target = True
break
# 只保留影響目標區域的地震
if found_target:
epicenter = earthquake_info.get("Epicenter", {})
magnitude = earthquake_info.get("EarthquakeMagnitude", {})
filtered_earthquakes.append({
"earthquake_no": earthquake_info.get("EarthquakeNo", "未知"),
"origin_time": earthquake_info.get("OriginTime", "未知時間"),
"location": epicenter.get("Location", "未知位置"),
"magnitude": f"{magnitude.get('MagnitudeType', '')} {magnitude.get('MagnitudeValue', '未知')}".strip(),
"depth": earthquake_info.get("FocalDepth", "未知"),
"latitude": epicenter.get("EpicenterLatitude", "未知"),
"longitude": epicenter.get("EpicenterLongitude", "未知"),
"target_areas": target_areas_data
})
if len(filtered_earthquakes) >= limit:
break
return {
"success": True,
"target_areas": target_areas,
"count": len(filtered_earthquakes),
"earthquakes": filtered_earthquakes
}
except httpx.HTTPStatusError as e:
return {"error": f"HTTP 錯誤 {e.response.status_code}"}
except httpx.RequestError as e:
return {"error": f"連線錯誤: {str(e)}"}
except Exception as e:
return {"error": f"未預期的錯誤: {str(e)}"}
def format_earthquake_data(self, data):
"""格式化地震資料顯示"""
if not data.get("success"):
return f"錯誤: {data.get('error', '未知錯誤')}"
earthquakes = data.get("earthquakes", [])
if not earthquakes:
return "未找到地震資料"
result = []
result.append("最新地震報告")
result.append("=" * 30)
for i, eq in enumerate(earthquakes, 1):
result.append(f"\n地震 #{i}")
result.append(f"編號: {eq['earthquake_no']}")
result.append(f"時間: {eq['origin_time']}")
result.append(f"位置: {eq['location']}")
result.append(f"規模: {eq['magnitude']}")
result.append(f"深度: {eq['depth']} 公里")
result.append(f"座標: {eq['latitude']}, {eq['longitude']}")
# 如果有目標區域震度資料
if 'target_areas' in eq:
result.append("影響區域:")
for area in eq['target_areas']:
result.append(f" • {area['area']}: 震度 {area['intensity']}")
return "\n".join(result)
rest_api的部分
earthquake_api = SimpleEarthquakeAPI()
class EarthquakeRequest(BaseModel):
target_areas: Optional[str] = None # 改成接受字串,可能是陣列格式的字串
limit: Optional[str] = "5" # 改成字串,之後轉換成數字
@app.post("/earthquake")
async def get_earthquakes(
request: EarthquakeRequest
):
"""取得地震資料 - 可指定縣市或取得全部最新地震"""
try:
# 處理 target_areas,支援字串格式(來自 n8n)
target_areas = None
if request.target_areas:
# 如果是 JSON 陣列字串格式,例如 "[\"臺中市\"]"
if request.target_areas.startswith('[') and request.target_areas.endswith(']'):
import json
try:
target_areas = json.loads(request.target_areas)
except json.JSONDecodeError:
# 如果 JSON 解析失敗,就當作單一縣市
target_areas = [request.target_areas.strip('[]"')]
else:
# 如果是單純的字串,就當作單一縣市
target_areas = [request.target_areas]
# 處理 limit 參數,轉換成數字
try:
limit_num = int(request.limit) if request.limit else 5
except (ValueError, TypeError):
limit_num = 5
if target_areas and len(target_areas) > 0:
print(f"收到指定地震查詢請求: target_areas={target_areas}, limit={limit_num}")
result = await earthquake_api.get_earthquake_by_areas(
target_areas=target_areas,
limit=limit_num
)
else:
print(f"收到最新地震查詢請求: limit={limit_num}")
result = await earthquake_api.get_latest_earthquakes(limit=limit_num)
return result
except Exception as e:
print(f"地震查詢錯誤: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
明天再做n8n的部分