iT邦幫忙

2025 iThome 鐵人賽

DAY 23
0
Security

醫療數據的資安挑戰與創新解決方案系列 第 29

醫療資安整合實作

  • 分享至 

  • xImage
  •  

今日是整合前面幾周的重點寫出一個Demo,先整理前幾周的重點

一、理論重點

  1. 醫療數據資安挑戰
  • 敏感性高:病歷、檢查結果、個人資料皆屬敏感資訊,洩漏風險高。
  • 合規性要求:需遵守 HIPAA、GDPR 或本地醫療資安法規。
  • 資料傳輸安全:醫院內部系統、遠距醫療平台、API 通訊都需要加密(如 HTTPS、TLS)。
  • 資料完整性:確保上傳、儲存的數據不可被竄改,常用 Hash、簽章、區塊鏈驗證。
  1. 數據整合實作
  • 多端資料接收:不同來源的數據(病患端、醫生端、設備端)統一格式化。

  • 資料標準化:使用 JSON 或 HL7 FHIR 標準,方便跨系統交換。

  • 安全傳輸:
    HTTPS/SSL 保護資料傳輸。
    Token 認證確保使用者權限。

  • 資料驗證機制:
    使用 Hash 值檢驗資料完整性。
    伺服器端回傳成功狀態,避免重複或錯誤寫入。

  • 伺服器端整合:
    Flask 或 FastAPI 建立 API 接收與處理資料。
    伺服器端可同步更新醫生端與病患端資料,保持資料一致性。

  1. 安全設計重點
  • 雙向同步:確保醫生與病患看到的數據一致。
  • 不可否認性:上傳資料有 Hash 值,防止事後否認。
  • 日誌紀錄:每筆資料上傳、修改都有時間戳與使用者紀錄。

二、簡單程式範例


from flask import Flask, request, jsonify
from cryptography.fernet import Fernet
import jwt, json, sqlite3, hashlib, datetime
import threading, requests, time
import ssl, os

# -----------------------
# 設定
FERNET_KEY = b'nEkD-a1CJMwTjTRbfi8VFIb9Bo6hLUwthb652hWEIKw='
cipher = Fernet(FERNET_KEY)
JWT_SECRET = "supersecretkey"
JWT_ALGORITHM = "HS256"
DB_NAME = "medical_demo.db"

CERT_FILE = "cert.pem"
KEY_FILE = "key.pem"

# -----------------------
# 自簽名憑證產生(第一次執行會自動生成)
if not os.path.exists(CERT_FILE) or not os.path.exists(KEY_FILE):
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization, hashes
    from cryptography.hazmat.primitives.asymmetric import rsa
    import datetime as dt

    # 生成私鑰
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
    with open(KEY_FILE, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ))

    # 生成自簽名憑證
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"TW"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Taiwan"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Taipei"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Demo Org"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"localhost"),
    ])
    cert = x509.CertificateBuilder().subject_name(subject).issuer_name(issuer).public_key(
        key.public_key()
    ).serial_number(x509.random_serial_number()).not_valid_before(
        dt.datetime.utcnow()
    ).not_valid_after(
        dt.datetime.utcnow() + dt.timedelta(days=365)
    ).sign(key, hashes.SHA256(), default_backend())

    with open(CERT_FILE, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

# -----------------------
# 建立 SQLite DB + 表
def init_db():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS records(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id TEXT,
            encrypted_data TEXT,
            data_hash TEXT
        )
    ''')
    conn.commit()
    conn.close()

init_db()

# -----------------------
# Flask App
app = Flask(__name__)

# 產生 JWT Token
@app.route("/get_token", methods=["POST"])
def get_token():
    data = request.json
    device_id = data.get("device_id")
    role = data.get("role", "patient")
    exp = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=30)
    token = jwt.encode({"device_id": device_id, "role": role, "exp": exp}, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return jsonify({"token": token})

# 上傳病人資料 API
@app.route("/submit_record", methods=["POST"])
def submit_record():
    try:
        data = request.json
        token = data.get("token")
        encrypted_payload = data.get("payload")

        payload_jwt = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        role = payload_jwt.get("role")

        decrypted_bytes = cipher.decrypt(encrypted_payload.encode())
        record = json.loads(decrypted_bytes.decode())

        if role == "patient" and record.get("device_id") != payload_jwt.get("device_id"):
            return jsonify({"status":"error","message":"權限不足"}),403

        record_json = json.dumps(record, sort_keys=True)
        record_hash = hashlib.sha256(record_json.encode()).hexdigest()

        conn = sqlite3.connect(DB_NAME)
        cursor = conn.cursor()
        cursor.execute("INSERT INTO records(device_id, encrypted_data, data_hash) VALUES (?, ?, ?)",
                       (record["device_id"], encrypted_payload, record_hash))
        conn.commit()
        conn.close()

        print(f"[{role}] 收到資料: {record} | Hash: {record_hash}")
        return jsonify({"status":"success","record":record,"hash":record_hash}),200

    except jwt.ExpiredSignatureError:
        return jsonify({"status":"error","message":"Token 過期"}),401
    except Exception as e:
        return jsonify({"status":"error","message":str(e)}),500

# -----------------------
# IoT 裝置模擬
def get_token_retry(device_id, role):
    for _ in range(5):
        try:
            res = requests.post("https://127.0.0.1:8000/get_token", json={"device_id": device_id, "role": role}, verify=False)
            return res.json()["token"]
        except Exception:
            time.sleep(1)
    raise Exception("無法取得 token")

def iot_device(device_id, role):
    token = get_token_retry(device_id, role)
    payload = {
        "device_id": device_id,
        "heart_rate": 70 + int(device_id[-1]),
        "systolic": 110 + int(device_id[-1]),
        "diastolic": 70 + int(device_id[-1]),
        "note": "正常"
    }
    encrypted = cipher.encrypt(json.dumps(payload).encode())
    try:
        res2 = requests.post("https://127.0.0.1:8000/submit_record", json={
            "payload": encrypted.decode(),
            "token": token
        }, verify=False)
        print(f"[{device_id}] 上傳結果: {res2.json()}")
    except Exception as e:
        print(f"[{device_id}] 上傳失敗: {e}")

# -----------------------
# 啟動 Flask HTTPS 伺服器
def run_server():
    context = (CERT_FILE, KEY_FILE)
    app.run(host="127.0.0.1", port=8000, debug=False, ssl_context=context)

# -----------------------
if __name__ == "__main__":
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()

    time.sleep(3)

    for i in range(1,4):
        iot_device(f"device-00{i}", "doctor")
        iot_device(f"device-00{i}", "patient")

執行的結果經過整理後如下

程式區塊 應用知識
Flask 伺服器啟動 Python Flask 架構、開發/生產差異
POST /get_token API 認證、token 機制
POST /submit_record JSON  資料解析、雙端通知、模擬智慧裝置
Hash 生成 資料完整性、防篡改
上傳結果回傳 API 回應處理、日誌追蹤、HTTP 200
多裝置資料 迴圈測試、資料一致性
HTTPS 警告 HTTPS 測試安全性概念、urllib3 使用
裝置ID 上傳順序 heart_rate systolic diastolic note Hash 收到者
device-001 1 71 111 71 正常 427454b580abb25f0c1257f9226e7f7eaf110ae7d46a4f128567739f5d4430c2 doctor
device-001 1 71 111 71 正常 427454b580abb25f0c1257f9226e7f7eaf110ae7d46a4f128567739f5d4430c2 patient
device-002 2 72 112 72 正常 8748d4bf3092575ea6fae42d86e7b71ce85d5ad176929fe688211e8bee862497 doctor
device-002 2 72 112 72 正常 8748d4bf3092575ea6fae42d86e7b71ce85d5ad176929fe688211e8bee862497 patient
device-003 3 73 113 73 正常 abd2e19cb49822bf8241bbec1bab898aa86bf26a914937ccdb48c994e88f3e87 doctor
device-003 3 73 113 73 正常 abd2e19cb49822bf8241bbec1bab898aa86bf26a914937ccdb48c994e88f3e87 patient

每個裝置上傳的資料都對應一個 Hash,可用來驗證資料完整性。Doctor 與 Patient 端均能收到相同資料,符合先前的流程設計。裝置依序上傳順序為 device-001 → device-002 → device-003,且資料中的 heart_rate、systolic、diastolic 及 note 都正確被接收,確保了資料的一致性與可靠性。


上一篇
醫療 IoT 裝置的資安風險
系列文
醫療數據的資安挑戰與創新解決方案29
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言