
2025 iThome 鐵人賽


Sensing Your Motions and Emotions: Applications of Deep Learning in Human-Computer Interaction — Part 17

Day 17 | OpenBCI EEG Data × Deep Learning: A Quick Example from Training to Application


Introduction

EEG turns brain activity into observable electrical potentials; OpenBCI makes capturing it affordable and hackable. When EEG meets deep learning, you are no longer limited to academic research: attention detection, assisted input (a BCI cursor), relaxation training, and emotion-aware interfaces can all be prototyped quickly. Today we walk through a practical, implementation-oriented pipeline: data acquisition → preprocessing → windowing & labeling → model training → real-time application.


Two things you must know

Signal characteristics: EEG is noisy and varies greatly across individuals; single-subject (intra-subject) tasks are tractable, cross-subject generalization is hard.
Data quality > model complexity: good electrode contact, interference control, and correct labels beat adding another convolutional layer.


Pipeline

  1. Acquisition: OpenBCI (Cyton/Ganglion) → stream via the OpenBCI GUI / BrainFlow.
  2. Preprocessing: remove DC, band-pass 1–40 Hz, 50/60 Hz notch, (optionally) ICA to remove blinks/EMG.
  3. Windowing & labeling: cut 2–4 s windows around event times; assign each window a task label (center-sample labeling or majority vote).
  4. Features or end-to-end:
    • Classic: band powers (δ/θ/α/β/γ), relative power, band ratios.
    • End-to-end: a 1D CNN / CNN-LSTM consuming raw waveforms or time-frequency maps (STFT/CWT).
  5. Training: 5-fold cross-validation; pass the intra-subject test first, then talk about inter-subject generalization.
  6. Real-time: sliding window + confidence smoothing (EMA), so one misclassified frame cannot flip the state (see the sketch right after this list).
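
Step 6 is easy to describe and easy to get wrong, so here is a minimal sketch of EMA confidence smoothing. It is not from the original post; alpha (the memory weight) and the demo probabilities are made-up values:

class EMASmoother:
    """Exponentially smooth per-window probabilities so a single
    low-confidence window cannot flip the predicted state."""
    def __init__(self, alpha=0.8, thresh=0.5):
        self.alpha, self.thresh, self.p = alpha, thresh, None
    def update(self, prob):
        # p <- alpha * old + (1 - alpha) * new; higher alpha = more inertia
        self.p = prob if self.p is None else self.alpha*self.p + (1-self.alpha)*prob
        return int(self.p >= self.thresh)

smoother = EMASmoother(alpha=0.8)
for prob in [0.9, 0.1, 0.8, 0.85]:    # one spurious low-confidence window
    print(smoother.update(prob))       # prints 1, 1, 1, 1 — the 0.1 dip is absorbed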

Example task: eyes open vs. eyes closed

With eyes closed, alpha power (8–12 Hz) rises markedly over the occipital lobe.

  • Electrode placement: O1/O2 are ideal; if you are constrained, an 8-channel setup still works.
  • Protocol: 10 s eyes open / 10 s eyes closed × 10 rounds, about 200 s total.
  • Labeling: drop a marker at the start of each block, then align by time offline (remember to subtract the GUI/Bluetooth latency; see the sketch below).
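
The offline alignment step deserves a minimal standalone sketch. LATENCY_SEC and marker_times are made-up values for illustration; measure the actual delay of your own GUI/Bluetooth chain:

import numpy as np

fs = 250                                         # sampling rate (Hz), assumed
LATENCY_SEC = 0.12                               # assumed stream delay; measure yours
marker_times = [(0.0, 0), (10.0, 1), (20.0, 0)]  # (onset_s, label) per block

marker = -np.ones(30 * fs, dtype=int)            # -1 = unlabeled samples
bounds = marker_times + [(30.0, -1)]             # sentinel for the last block's end
for (onset, label), (nxt, _) in zip(bounds, bounds[1:]):
    s = int((onset - LATENCY_SEC) * fs)          # shift earlier to undo the delay
    e = int((nxt - LATENCY_SEC) * fs)
    marker[max(s, 0):max(e, 0)] = label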

Required packages: pip install mne numpy scipy scikit-learn torch pandas
To use your own recording, prepare eeg.csv (columns ch1..chN + marker). marker is 0/1 (already labeled) or all -1 (unknown; the script then falls back to weak supervision automatically). A sketch that fabricates such a file follows.
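
If you want a dry run before touching real hardware, a minimal sketch like the following writes a synthetic eeg.csv in the expected format. The injected 10 Hz "alpha" and all constants are fabricated for illustration; point CSV_PATH at the file it produces:

import numpy as np, pandas as pd

fs, secs, n_ch = 250, 20, 8
t = np.arange(fs * secs) / fs
rng = np.random.default_rng(0)
X = rng.normal(0, 1, size=(len(t), n_ch)).astype("float32")
X += (np.sin(2 * np.pi * 10 * t) * (t > secs / 2))[:, None]  # fake alpha in 2nd half
df = pd.DataFrame({f"ch{i+1}": X[:, i] for i in range(n_ch)})
df["marker"] = (t > secs / 2).astype(int)   # 0 = eyes open, 1 = eyes closed
df.to_csv("eeg.csv", index=False)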

Imports

import warnings, os
from pathlib import Path
import numpy as np, pandas as pd
from scipy.signal import butter, filtfilt, iirnotch, welch
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_curve
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.exceptions import ConvergenceWarning

import torch, torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

warnings.filterwarnings("ignore", category=ConvergenceWarning)

Parameters / flags

USE_EEGBCI  = True            # True = download from EEGBCI; False = read CSV_PATH directly
CSV_PATH    = Path.home()/ "eeg_demo" / "eeg.csv"
OUT_DIR     = Path.home()/ "eeg_demo"; OUT_DIR.mkdir(parents=True, exist_ok=True)

MODEL_TYPE  = "speccnn"       # "cnn" | "eegnet" | "speccnn"
THRESH_CAL  = True            # adaptive threshold calibrated on the validation set
USE_KD      = True            # knowledge distillation (teacher = bandpower LogReg)
KD_T        = 3.0
ALPHA_CE    = 0.6             # loss = ALPHA_CE*CE + (1-ALPHA_CE)*KD
BETA_FUSE   = 0.8             # inference fusion: P = (1-BETA_FUSE)*model + BETA_FUSE*baseline

FS_TARGET   = 250
NOTCH_FREQ  = 60              # 50 or 60 depending on your region's mains frequency
BANDPASS    = (1, 40)

WIN_SEC     = 6.0             # window length (s)
STEP_SEC    = 2.0             # step size (s)
HI_CONF_Q   = (0.30, 0.70)    # weak-supervision quantiles
MIN_N_WIN   = 60

BATCH_TR    = 32
BATCH_TE    = 64
EPOCHS      = 50
LR          = 3e-4
WEIGHT_DECAY= 1e-4
PATIENCE    = 7

SEED        = 42
np.random.seed(SEED); torch.manual_seed(SEED)
device = torch.device("cuda" if torch.cuda.is_available()
                      else "mps" if torch.backends.mps.is_available()
                      else "cpu")

Utilities: filtering

def bandpass(x, fs, lo=1, hi=40):
    # 4th-order Butterworth band-pass, zero-phase (filtfilt), filtering along time axis 0
    b,a = butter(4, [lo/(fs/2), hi/(fs/2)], btype="band")
    return filtfilt(b,a,x,axis=0)

def notch(x, fs, f0=60, Q=30):
    # IIR notch to suppress mains interference at f0 Hz
    b,a = iirnotch(f0, Q, fs)
    return filtfilt(b,a,x,axis=0)
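
A quick sanity check (a sketch, not in the original post): the notch should strongly attenuate a synthetic 60 Hz sine while leaving a 10 Hz component mostly intact.

t = np.arange(0, 2, 1/FS_TARGET)
test = (np.sin(2*np.pi*10*t) + np.sin(2*np.pi*60*t))[:, None]   # [T, 1]
f, P = welch(notch(test, FS_TARGET, NOTCH_FREQ)[:, 0], fs=FS_TARGET, nperseg=256)
print("10 Hz / 60 Hz power after notch:", P[np.argmin(abs(f-10))] / P[np.argmin(abs(f-60))])  # expect >> 1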

Loading: EEGBCI or CSV

def load_eegbci_to_csv(csv_path):
    import mne
    from mne.datasets import eegbci
    from mne.io import read_raw_edf, concatenate_raws

    print("[STEP] 下載/讀取 EEGBCI...")
    files = eegbci.load_data(1, [3, 7])  # subject=1
    raws  = [read_raw_edf(f, preload=True, verbose="ERROR") for f in files]
    raw   = concatenate_raws(raws, verbose="ERROR")

    def clean(s):  # normalize channel names
        return (s.replace("EEG ","").replace("-REF","").replace("-Ref","")
                .replace(".","").replace("Z","z"))
    raw.rename_channels(clean)
    try: raw.set_montage("standard_1020", match_case=False, on_missing="ignore")
    except Exception: pass

    raw.filter(BANDPASS[0], BANDPASS[1], fir_design="firwin", verbose="ERROR")
    raw.notch_filter(freqs=[NOTCH_FREQ], verbose="ERROR")
    raw.resample(FS_TARGET, npad="auto", verbose="ERROR")
    FS = int(raw.info["sfreq"]); print("[INFO] Resampled to:", FS, "Hz")

    want = ["C3","Cz","C4"]
    present = [w for w in want if w in raw.ch_names]
    if present:
        picks = mne.pick_channels(raw.ch_names, include=present, ordered=False)
    else:
        picks = [i for i,ch in enumerate(raw.ch_names) if ch.upper().startswith("C")]
        if len(picks)==0:
            picks = mne.pick_types(raw.info, eeg=True, exclude=[], verbose="ERROR")[:8]

    X = raw.get_data(picks=picks).T.astype("float32")  # [T,C]
    T,C = X.shape; MAX_CH=8
    if C<MAX_CH: X = np.hstack([X, np.zeros((T,MAX_CH-C),dtype=np.float32)])
    elif C>MAX_CH: X = X[:, :MAX_CH]

    y = -np.ones((T,), dtype=int)  # unknown; to be filled in by weak supervision
    df = pd.DataFrame({f"ch{i+1}": X[:,i] for i in range(8)}); df["marker"]=y
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(csv_path, index=False)
    print(f"[INFO] 已輸出 {csv_path}: shape={df.shape}")
    return FS

def load_csv(csv_path):
    df = pd.read_csv(csv_path)
    ch_cols = [c for c in df.columns if c.lower().startswith("ch")]
    X = df[ch_cols].values.astype(np.float32)
    y = df["marker"].values.astype(int)
    return X,y

FS = load_eegbci_to_csv(CSV_PATH) if USE_EEGBCI else FS_TARGET
X,y = load_csv(CSV_PATH)
print(f"[INFO] X: {X.shape}, y: {y.shape}, pos_rate={ (y==1).mean():.3f}")
print("[DEBUG] 原始 marker 取值:", np.unique(y))

# Safety-net filtering (kept even if you bring your own CSV)
X = notch(bandpass(X, FS, *BANDPASS), FS, NOTCH_FREQ)

切窗 & 初標

WIN, STEP = int(WIN_SEC*FS), int(STEP_SEC*FS)
segments, labels = [], []
for s in range(0, len(X)-WIN+1, STEP):
    e = s + WIN
    seg = X[s:e].T                    # [C, T]
    lab = int(y[s:e][WIN//2])         # center-sample labeling; stays -1 if the center is unlabeled
    # per-channel z-normalization within the window
    seg = (seg - seg.mean(axis=1, keepdims=True)) / (seg.std(axis=1, keepdims=True)+1e-6)
    segments.append(seg.astype("float32")); labels.append(lab)

Xw = np.stack(segments)
yw = np.array(labels, dtype=int)
print(f"[INFO] Initial windows: Xw=({len(Xw)}, C, {Xw.shape[-1]}), unlabeled={(yw==-1).sum()}")

Weak supervision: μ/β quantile double threshold

def mu_beta_ratio(seg, fs, mu=(8,12), beta=(16,28)):
    # difference of mean log band powers: high μ relative to β → high score
    f, Pxx = welch(seg, fs=fs, nperseg=min(512, seg.shape[-1]))
    def bp(lo,hi):
        m=(f>=lo)&(f<hi)
        return np.log(Pxx[:,m].mean(axis=1)+1e-10).mean()
    return bp(*mu) - bp(*beta)

unk_idx = np.where(yw==-1)[0]
if len(unk_idx)>0:
    scores = np.array([mu_beta_ratio(Xw[i], FS) for i in unk_idx])
    lo,hi  = np.quantile(scores, HI_CONF_Q[0]), np.quantile(scores, HI_CONF_Q[1])
    sel_hi = unk_idx[scores>=hi]; sel_lo = unk_idx[scores<=lo]
    yw_new = yw.copy(); yw_new[sel_hi]=1; yw_new[sel_lo]=0
    keep   = np.r_[np.where(yw>=0)[0], sel_hi, sel_lo]  # keep real labels plus high-confidence pseudo-labels
    Xw, yw = Xw[keep], yw_new[keep]

print(f"[INFO] 切窗後:Xw={Xw.shape}, yw={yw.shape}, 分佈={np.bincount(yw)}")
if len(Xw) < MIN_N_WIN:
    raise RuntimeError("高置信窗太少:調整 HI_CONF_Q、拉長 WIN、或收集更多資料。")

Baseline: bandpower + LR

def bandpower_feat(seg, fs, bands=[(4,8),(8,12),(12,16),(16,20),(20,28)]):
    C,T = seg.shape
    f,P = welch(seg, fs=fs, nperseg=min(512,T), axis=1)  # [C,F]
    out=[]
    for lo,hi in bands:
        m=(f>=lo)&(f<hi)
        bp=np.log(P[:,m].mean(axis=1)+1e-10)
        out.append(bp)
    M=np.stack(out,axis=1)  # [C, n_bands]
    M=(M-M.mean(axis=1,keepdims=True))/(M.std(axis=1,keepdims=True)+1e-6)
    return M.reshape(-1).astype("float32")

BF = np.stack([bandpower_feat(x, FS) for x in Xw])
skf=StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
cv=[]
for tr,te in skf.split(BF,yw):
    clf=Pipeline([("sc",StandardScaler()),
                  ("lr",LogisticRegression(max_iter=1000,class_weight="balanced"))])
    clf.fit(BF[tr], yw[tr]); cv.append(clf.score(BF[te], yw[te]))
print(f"[BASELINE] Bandpower(LogReg) CV acc={np.mean(cv):.3f}  (>=0.60 代表訊號可用)")

Xtr, Xte, ytr, yte, BFtr, BFte = train_test_split(Xw, yw, BF, test_size=0.25,
                                                  random_state=SEED, stratify=yw)
teacher = Pipeline([("sc",StandardScaler()),
                    ("lr",LogisticRegression(max_iter=1000,class_weight="balanced"))]).fit(BFtr,ytr)

Models

class EEGCNN(nn.Module):
    def __init__(self, n_ch, n_cls=2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(n_ch,32,7,padding=3), nn.BatchNorm1d(32), nn.ReLU(), nn.MaxPool1d(2),
            nn.Conv1d(32,64,7,padding=3),  nn.BatchNorm1d(64), nn.ReLU(), nn.MaxPool1d(2),
            nn.Conv1d(64,128,7,padding=3), nn.BatchNorm1d(128), nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        ); self.fc = nn.Linear(128,n_cls)
    def forward(self,x): return self.fc(self.net(x).squeeze(-1))

class EEGNet(nn.Module):
    def __init__(self, n_ch, T, n_cls=2, F1=8, D=2, dropout=0.5):
        super().__init__(); F2=F1*D
        self.first = nn.Sequential(
            nn.Conv2d(1,F1,(1,64),padding=(0,32),bias=False), nn.BatchNorm2d(F1),
            nn.Conv2d(F1,F1*D,(n_ch,1),groups=F1,bias=False), nn.BatchNorm2d(F1*D),
            nn.ELU(), nn.AvgPool2d((1,4)), nn.Dropout(dropout),
        )
        self.sep = nn.Sequential(
            nn.Conv2d(F1*D,F1*D,(1,16),padding=(0,8),groups=F1*D,bias=False),
            nn.Conv2d(F1*D,F2,(1,1),bias=False), nn.BatchNorm2d(F2),
            nn.ELU(), nn.AvgPool2d((1,8)), nn.Dropout(dropout),
        )
        with torch.no_grad():
            flat = self.sep(self.first(torch.zeros(1,1,n_ch,T))).numel()
        self.fc = nn.Linear(flat, n_cls)
    def forward(self,x):
        x=x.unsqueeze(1); x=self.first(x); x=self.sep(x)
        return self.fc(torch.flatten(x,1))

class SpecCNN(nn.Module):
    def __init__(self, C, F, n_cls=2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1,16,(3,3),padding=1), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d((2,2)),
            nn.Conv2d(16,32,(3,3),padding=1), nn.BatchNorm2d(32), nn.ReLU(),
            nn.AdaptiveAvgPool2d((1,1)),
        ); self.fc = nn.Linear(32,n_cls)
    def forward(self,x): return self.fc(self.net(x).squeeze(-1).squeeze(-1))

def bandpower_map_batch(Xbatch, fs, bands=[(4,8),(8,12),(12,16),(16,20),(20,28)]):
    out=[]
    for seg in Xbatch:
        f,P = welch(seg, fs=fs, nperseg=min(512, seg.shape[-1]), axis=1)
        feats=[]
        for lo,hi in bands:
            m=(f>=lo)&(f<hi); bp=np.log(P[:,m].mean(axis=1)+1e-10); feats.append(bp)
        M=np.stack(feats,axis=1); M=(M-M.mean(axis=1,keepdims=True))/(M.std(axis=1,keepdims=True)+1e-6)
        out.append(M.astype("float32"))
    return np.stack(out)  # [N,C,F]
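
Before wiring up the DataLoaders, a quick shape sanity check can save a debugging session. A minimal sketch (not in the original post) that pushes two windows through each architecture:

with torch.no_grad():
    dummy = torch.tensor(Xw[:2])                                       # [2, C, T] raw windows
    print("EEGCNN :", EEGCNN(n_ch=Xw.shape[1])(dummy).shape)           # -> [2, 2] logits
    print("EEGNet :", EEGNet(n_ch=Xw.shape[1], T=Xw.shape[2])(dummy).shape)
    spec = torch.tensor(bandpower_map_batch(Xw[:2], FS)).unsqueeze(1)  # [2, 1, C, n_bands]
    print("SpecCNN:", SpecCNN(*spec.shape[2:])(spec).shape)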

# DataLoaders (the training set also carries sample indices so that KD can pair
# each window with the teacher's probabilities for that same window)
if MODEL_TYPE=="speccnn":
    BFimg_tr = bandpower_map_batch(Xtr, FS); BFimg_te = bandpower_map_batch(Xte, FS)
    tr_ds = TensorDataset(torch.tensor(BFimg_tr).unsqueeze(1), torch.tensor(ytr),
                          torch.arange(len(ytr)))
    te_ds = TensorDataset(torch.tensor(BFimg_te).unsqueeze(1), torch.tensor(yte))
else:
    tr_ds = TensorDataset(torch.tensor(Xtr), torch.tensor(ytr), torch.arange(len(ytr)))
    te_ds = TensorDataset(torch.tensor(Xte), torch.tensor(yte))

tr_dl = DataLoader(tr_ds, batch_size=BATCH_TR, shuffle=True)
te_dl = DataLoader(te_ds, batch_size=BATCH_TE)

if MODEL_TYPE=="cnn":
    model = EEGCNN(n_ch=Xw.shape[1]).to(device)
elif MODEL_TYPE=="eegnet":
    model = EEGNet(n_ch=Xw.shape[1], T=Xw.shape[2]).to(device)
elif MODEL_TYPE=="speccnn":
    C,F = BFimg_tr.shape[1], BFimg_tr.shape[2]; model = SpecCNN(C,F).to(device)
else:
    raise ValueError("MODEL_TYPE 只能是 cnn/eegnet/speccnn")

opt  = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
crit = nn.CrossEntropyLoss()

def kd_loss(student_logits, teacher_probs, T=3.0):
    # KL(student || teacher) at temperature T; the T*T factor keeps gradient
    # magnitudes comparable across temperatures (standard distillation scaling)
    S = torch.log_softmax(student_logits/T, dim=-1)
    Tt= torch.tensor(teacher_probs, dtype=torch.float32, device=student_logits.device)
    return nn.KLDivLoss(reduction="batchmean")(S, Tt)*(T*T)

Training / validation (early stopping)

best_loss=float("inf"); best_state=None; bad=0

def eval_model(m):
    m.eval(); tot=0; y_true=[]; y_pred=[]
    with torch.no_grad():
        for xb,yb in te_dl:
            xb=xb.to(device); yb=yb.to(device)
            out=m(xb); tot += crit(out,yb).item()
            y_true += yb.cpu().tolist(); y_pred += out.argmax(1).cpu().tolist()
    return tot/len(te_dl), np.array(y_true), np.array(y_pred)

for ep in range(1,EPOCHS+1):
    model.train(); run=0
    for xb,yb,ib in tr_dl:
        xb=xb.to(device); yb=yb.to(device)
        opt.zero_grad(); logits=model(xb)
        loss=crit(logits,yb)
        if USE_KD:
            # pair each window with the teacher's probabilities for that same window
            te_probs=teacher.predict_proba(BFtr[ib.numpy()])
            loss=ALPHA_CE*loss + (1-ALPHA_CE)*kd_loss(logits, te_probs, T=KD_T)
        loss.backward(); opt.step(); run+=loss.item()
    val_loss,y_true,y_pred = eval_model(model)
    acc=(y_true==y_pred).mean()
    print(f"Epoch {ep:02d}  train_loss={run/len(tr_dl):.3f}  val_loss={val_loss:.3f}  val_acc={acc:.3f}")
    if val_loss < best_loss-1e-4: best_loss=val_loss; best_state=model.state_dict(); bad=0
    else:
        bad+=1
        if bad>=PATIENCE: print("[EARLY STOP] no improvement."); break
if best_state is not None: model.load_state_dict(best_state)

Threshold calibration + fusion

def get_probs(m, loader):
    m.eval(); ps=[]; ys=[]
    with torch.no_grad():
        for xb,yb in loader:
            xb=xb.to(device)
            ps.append(torch.softmax(m(xb),dim=-1)[:,1].cpu().numpy())
            ys+=yb.numpy().tolist()
    return np.concatenate(ps), np.array(ys)

p_val,y_val = get_probs(model, te_dl)
base_val   = teacher.predict_proba(BFte)[:,1]
p_val_fused= (1-BETA_FUSE)*p_val + BETA_FUSE*base_val

if THRESH_CAL:
    # pick the threshold maximizing Youden's J = TPR - FPR on the validation set
    fpr,tpr,thr = roc_curve(y_val, p_val_fused); j=tpr-fpr
    best_t = thr[j.argmax()]
else:
    best_t = 0.5

Report

y_pred = (p_val_fused>=best_t).astype(int)
print("\n=== Classification report ===")
print(classification_report(y_val, y_pred, target_names=["class0","class1"], digits=4))
print("Confusion matrix:\n", confusion_matrix(y_val, y_pred))
print(f"[THRESH] best_t={best_t:.3f} (THRESH_CAL={THRESH_CAL})  BETA_FUSE={BETA_FUSE}  MODEL={MODEL_TYPE}  KD={USE_KD}")

(Screenshot of the run's output: https://ithelp.ithome.com.tw/upload/images/20250919/20178436v1mGqWVV3L.png)


Closing thoughts

OpenBCI takes EEG research out of the lab; deep learning lowers the bar for feature engineering. Make the task simple, the data clean, and the pipeline stable first, and only then reach for fancier models: that is how you build a demonstrable, iterable EEG × AI prototype in a short time. Start with eyes open/closed, move toward attention monitoring, imagined input, and emotion-aware interfaces, and brain-computer interaction goes from a cool experiment to a reliable module in your product.

