腦波(EEG)把腦內活動轉成可觀測的電位訊號;OpenBCI 則把這件事變得平價、可駭。當 EEG 遇上深度學習,你不只做學術研究:專注度偵測、輔助輸入(BCI cursor)、放鬆訓練、情緒輔助介面都能快速原型。今天以實作導向梳理一條能落地的流程:資料採集 → 前處理 → 切窗與標註 → 模型訓練 → 即時應用。
訊號特性:EEG 噪聲高、個體差異大;單人(intra-subject)好做、跨人泛化難。
資料品質 > 模型複雜度:電極接觸、抗干擾、正確標註,勝過再多一層卷積。
閉眼時 α 波(8–12 Hz) 在枕葉上升明顯。
需要套件:pip install mne numpy scipy scikit-learn torch pandas
若要讀自己的檔:準備 eeg.csv
(欄位 ch1..chN
+ marker
)。marker=0/1(已標註)或全為 -1(未知,會自動走弱監督)。
import warnings, os
from pathlib import Path
import numpy as np, pandas as pd
from scipy.signal import butter, filtfilt, iirnotch, welch
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_curve
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.exceptions import ConvergenceWarning
import torch, torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
warnings.filterwarnings("ignore", category=ConvergenceWarning)
USE_EEGBCI = True # True=從 EEGBCI 下載;False=直接讀 CSV_PATH
CSV_PATH = Path.home()/ "eeg_demo" / "eeg.csv"
OUT_DIR = Path.home()/ "eeg_demo"; OUT_DIR.mkdir(parents=True, exist_ok=True)
MODEL_TYPE = "speccnn" # "cnn" | "eegnet" | "speccnn"
THRESH_CAL = True # 驗證集自適應閾值
USE_KD = True # 蒸餾(teacher=bandpower LR)
KD_T = 3.0
ALPHA_CE = 0.6 # loss = ALPHA_CE*CE + (1-ALPHA_CE)*KD
BETA_FUSE = 0.8 # 推論融合:P = (1-BETA_FUSE)*model + BETA_FUSE*baseline
FS_TARGET = 250
NOTCH_FREQ = 60 # 50/60 依地區
BANDPASS = (1, 40)
WIN_SEC = 6.0 # 窗長(秒)
STEP_SEC = 2.0 # 步進(秒)
HI_CONF_Q = (0.30, 0.70) # 弱監督分位數
MIN_N_WIN = 60
BATCH_TR = 32
BATCH_TE = 64
EPOCHS = 50
LR = 3e-4
WEIGHT_DECAY= 1e-4
PATIENCE = 7
SEED = 42
np.random.seed(SEED); torch.manual_seed(SEED)
device = torch.device("cuda" if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available()
else "cpu")
def bandpass(x, fs, lo=1, hi=40):
b,a = butter(4, [lo/(fs/2), hi/(fs/2)], btype="band")
return filtfilt(b,a,x,axis=0)
def notch(x, fs, f0=60, Q=30):
b,a = iirnotch(f0, Q, fs)
return filtfilt(b,a,x,axis=0)
def load_eegbci_to_csv(csv_path):
import mne
from mne.datasets import eegbci
from mne.io import read_raw_edf, concatenate_raws
print("[STEP] 下載/讀取 EEGBCI...")
files = eegbci.load_data(1, [3, 7]) # subject=1
raws = [read_raw_edf(f, preload=True, verbose="ERROR") for f in files]
raw = concatenate_raws(raws, verbose="ERROR")
def clean(s): # 統一通道命名
return (s.replace("EEG ","").replace("-REF","").replace("-Ref","")
.replace(".","").replace("Z","z"))
raw.rename_channels(clean)
try: raw.set_montage("standard_1020", match_case=False, on_missing="ignore")
except Exception: pass
raw.filter(BANDPASS[0], BANDPASS[1], fir_design="firwin", verbose="ERROR")
raw.notch_filter(freqs=[NOTCH_FREQ], verbose="ERROR")
raw.resample(FS_TARGET, npad="auto", verbose="ERROR")
FS = int(raw.info["sfreq"]); print("[INFO] Resample to:", FS, "Hz")
want = ["C3","Cz","C4"]
present = [w for w in want if w in raw.ch_names]
if present:
picks = mne.pick_channels(raw.ch_names, include=present, ordered=False)
else:
picks = [i for i,ch in enumerate(raw.ch_names) if ch.upper().startswith("C")]
if len(picks)==0:
picks = mne.pick_types(raw.info, eeg=True, exclude=[], verbose="ERROR")[:8]
X = raw.get_data(picks=picks).T.astype("float32") # [T,C]
T,C = X.shape; MAX_CH=8
if C<MAX_CH: X = np.hstack([X, np.zeros((T,MAX_CH-C),dtype=np.float32)])
elif C>MAX_CH: X = X[:, :MAX_CH]
y = -np.ones((T,), dtype=int) # 未知,待弱監督
df = pd.DataFrame({f"ch{i+1}": X[:,i] for i in range(8)}); df["marker"]=y
csv_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(csv_path, index=False)
print(f"[INFO] 已輸出 {csv_path}: shape={df.shape}")
return FS
def load_csv(csv_path):
df = pd.read_csv(csv_path)
ch_cols = [c for c in df.columns if c.lower().startswith("ch")]
X = df[ch_cols].values.astype(np.float32)
y = df["marker"].values.astype(int)
return X,y
FS = load_eegbci_to_csv(CSV_PATH) if USE_EEGBCI else FS_TARGET
X,y = load_csv(CSV_PATH)
print(f"[INFO] X: {X.shape}, y: {y.shape}, pos_rate={ (y==1).mean():.3f}")
print("[DEBUG] 原始 marker 取值:", np.unique(y))
# 保障性濾波(若用自己的 CSV 也沿用)
X = notch(bandpass(X, FS, *BANDPASS), FS, NOTCH_FREQ)
WIN, STEP = int(WIN_SEC*FS), int(STEP_SEC*FS)
segments, labels = [], []
for s in range(0, len(X)-WIN+1, STEP):
e = s + WIN
seg = X[s:e].T
mwin = y[s:e]
lab = mwin[WIN//2] if ((mwin==0).any() or (mwin==1).any()) else -1
seg = (seg - seg.mean(axis=1, keepdims=True)) / (seg.std(axis=1, keepdims=True)+1e-6)
segments.append(seg.astype("float32")); labels.append(int(lab))
Xw = np.stack(segments)
yw = np.array(labels, dtype=int)
print(f"[INFO] 切窗初始:Xw=({len(Xw)}, C, {Xw.shape[-1]}), 未知標籤數={(yw==-1).sum()}")
def mu_beta_ratio(seg, fs, mu=(8,12), beta=(16,28)):
f, Pxx = welch(seg, fs=fs, nperseg=min(512, seg.shape[-1]))
def bp(lo,hi):
m=(f>=lo)&(f<hi)
return np.log(Pxx[:,m].mean(axis=1)+1e-10).mean()
return bp(*mu) - bp(*beta)
unk_idx = np.where(yw==-1)[0]
if len(unk_idx)>0:
scores = np.array([mu_beta_ratio(Xw[i], FS) for i in unk_idx])
lo,hi = np.quantile(scores, HI_CONF_Q[0]), np.quantile(scores, HI_CONF_Q[1])
sel_hi = unk_idx[scores>=hi]; sel_lo = unk_idx[scores<=lo]
yw_new = yw.copy(); yw_new[sel_hi]=1; yw_new[sel_lo]=0
keep = np.r_[sel_hi, sel_lo]
Xw, yw = Xw[keep], yw_new[keep]
print(f"[INFO] 切窗後:Xw={Xw.shape}, yw={yw.shape}, 分佈={np.bincount(yw)}")
if len(Xw) < MIN_N_WIN:
raise RuntimeError("高置信窗太少:調整 HI_CONF_Q、拉長 WIN、或收集更多資料。")
def bandpower_feat(seg, fs, bands=[(4,8),(8,12),(12,16),(16,20),(20,28)]):
C,T = seg.shape
f,P = welch(seg, fs=fs, nperseg=min(512,T), axis=1) # [C,F]
out=[]
for lo,hi in bands:
m=(f>=lo)&(f<hi)
bp=np.log(P[:,m].mean(axis=1)+1e-10)
out.append(bp)
M=np.stack(out,axis=1) # [C,F]
M=(M-M.mean(axis=1,keepdims=True))/(M.std(axis=1,keepdims=True)+1e-6)
return M.reshape(-1).astype("float32")
BF = np.stack([bandpower_feat(x, FS) for x in Xw])
skf=StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
cv=[]
for tr,te in skf.split(BF,yw):
clf=Pipeline([("sc",StandardScaler()),
("lr",LogisticRegression(max_iter=1000,class_weight="balanced"))])
clf.fit(BF[tr], yw[tr]); cv.append(clf.score(BF[te], yw[te]))
print(f"[BASELINE] Bandpower(LogReg) CV acc={np.mean(cv):.3f} (>=0.60 代表訊號可用)")
Xtr, Xte, ytr, yte, BFtr, BFte = train_test_split(Xw, yw, BF, test_size=0.25,
random_state=SEED, stratify=yw)
teacher = Pipeline([("sc",StandardScaler()),
("lr",LogisticRegression(max_iter=1000,class_weight="balanced"))]).fit(BFtr,ytr)
class EEGCNN(nn.Module):
def __init__(self, n_ch, n_cls=2):
super().__init__()
self.net = nn.Sequential(
nn.Conv1d(n_ch,32,7,padding=3), nn.BatchNorm1d(32), nn.ReLU(), nn.MaxPool1d(2),
nn.Conv1d(32,64,7,padding=3), nn.BatchNorm1d(64), nn.ReLU(), nn.MaxPool1d(2),
nn.Conv1d(64,128,7,padding=3), nn.BatchNorm1d(128), nn.ReLU(),
nn.AdaptiveAvgPool1d(1),
); self.fc = nn.Linear(128,n_cls)
def forward(self,x): return self.fc(self.net(x).squeeze(-1))
class EEGNet(nn.Module):
def __init__(self, n_ch, T, n_cls=2, F1=8, D=2, dropout=0.5):
super().__init__(); F2=F1*D
self.first = nn.Sequential(
nn.Conv2d(1,F1,(1,64),padding=(0,32),bias=False), nn.BatchNorm2d(F1),
nn.Conv2d(F1,F1*D,(n_ch,1),groups=F1,bias=False), nn.BatchNorm2d(F1*D),
nn.ELU(), nn.AvgPool2d((1,4)), nn.Dropout(dropout),
)
self.sep = nn.Sequential(
nn.Conv2d(F1*D,F1*D,(1,16),padding=(0,8),groups=F1*D,bias=False),
nn.Conv2d(F1*D,F2,(1,1),bias=False), nn.BatchNorm2d(F2),
nn.ELU(), nn.AvgPool2d((1,8)), nn.Dropout(dropout),
)
with torch.no_grad():
flat = self.sep(self.first(torch.zeros(1,1,n_ch,T))).numel()
self.fc = nn.Linear(flat, n_cls)
def forward(self,x):
x=x.unsqueeze(1); x=self.first(x); x=self.sep(x)
return self.fc(torch.flatten(x,1))
class SpecCNN(nn.Module):
def __init__(self, C, F, n_cls=2):
super().__init__()
self.net = nn.Sequential(
nn.Conv2d(1,16,(3,3),padding=1), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d((2,2)),
nn.Conv2d(16,32,(3,3),padding=1), nn.BatchNorm2d(32), nn.ReLU(),
nn.AdaptiveAvgPool2d((1,1)),
); self.fc = nn.Linear(32,n_cls)
def forward(self,x): return self.fc(self.net(x).squeeze(-1).squeeze(-1))
def bandpower_map_batch(Xbatch, fs, bands=[(4,8),(8,12),(12,16),(16,20),(20,28)]):
out=[]
for seg in Xbatch:
f,P = welch(seg, fs=fs, nperseg=min(512, seg.shape[-1]), axis=1)
feats=[]
for lo,hi in bands:
m=(f>=lo)&(f<hi); bp=np.log(P[:,m].mean(axis=1)+1e-10); feats.append(bp)
M=np.stack(feats,axis=1); M=(M-M.mean(axis=1,keepdims=True))/(M.std(axis=1,keepdims=True)+1e-6)
out.append(M.astype("float32"))
return np.stack(out) # [N,C,F]
# DataLoaders
if MODEL_TYPE=="speccnn":
BFimg_tr = bandpower_map_batch(Xtr, FS); BFimg_te = bandpower_map_batch(Xte, FS)
tr_ds = TensorDataset(torch.tensor(BFimg_tr).unsqueeze(1), torch.tensor(ytr))
te_ds = TensorDataset(torch.tensor(BFimg_te).unsqueeze(1), torch.tensor(yte))
else:
tr_ds = TensorDataset(torch.tensor(Xtr), torch.tensor(ytr))
te_ds = TensorDataset(torch.tensor(Xte), torch.tensor(yte))
tr_dl = DataLoader(tr_ds, batch_size=BATCH_TR, shuffle=True)
te_dl = DataLoader(te_ds, batch_size=BATCH_TE)
if MODEL_TYPE=="cnn":
model = EEGCNN(n_ch=Xw.shape[1]).to(device)
elif MODEL_TYPE=="eegnet":
model = EEGNet(n_ch=Xw.shape[1], T=Xw.shape[2]).to(device)
elif MODEL_TYPE=="speccnn":
C,F = BFimg_tr.shape[1], BFimg_tr.shape[2]; model = SpecCNN(C,F).to(device)
else:
raise ValueError("MODEL_TYPE 只能是 cnn/eegnet/speccnn")
opt = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
crit = nn.CrossEntropyLoss()
def kd_loss(student_logits, teacher_probs, T=3.0):
S = torch.log_softmax(student_logits/T, dim=-1)
Tt= torch.tensor(teacher_probs, dtype=torch.float32, device=student_logits.device)
return nn.KLDivLoss(reduction="batchmean")(S, Tt)*(T*T)
best_loss=float("inf"); best_state=None; bad=0
def eval_model(m):
m.eval(); tot=0; y_true=[]; y_pred=[]
with torch.no_grad():
for xb,yb in te_dl:
xb=xb.to(device); yb=yb.to(device)
out=m(xb); tot += crit(out,yb).item()
y_true += yb.cpu().tolist(); y_pred += out.argmax(1).cpu().tolist()
return tot/len(te_dl), np.array(y_true), np.array(y_pred)
for ep in range(1,EPOCHS+1):
model.train(); run=0
for xb,yb in tr_dl:
xb=xb.to(device); yb=yb.to(device)
opt.zero_grad(); logits=model(xb)
loss=crit(logits,yb)
if USE_KD:
idx=np.random.choice(len(BFtr), size=len(yb), replace=True)
te_probs=teacher.predict_proba(BFtr[idx])
loss=ALPHA_CE*loss + (1-ALPHA_CE)*kd_loss(logits, te_probs, T=KD_T)
loss.backward(); opt.step(); run+=loss.item()
val_loss,y_true,y_pred = eval_model(model)
acc=(y_true==y_pred).mean()
print(f"Epoch {ep:02d} train_loss={run/len(tr_dl):.3f} val_loss={val_loss:.3f} val_acc={acc:.3f}")
if val_loss < best_loss-1e-4: best_loss=val_loss; best_state=model.state_dict(); bad=0
else:
bad+=1
if bad>=PATIENCE: print("[EARLY STOP] no improvement."); break
if best_state is not None: model.load_state_dict(best_state)
def get_probs(m, loader):
m.eval(); ps=[]; ys=[]
with torch.no_grad():
for xb,yb in loader:
xb=xb.to(device)
ps.append(torch.softmax(m(xb),dim=-1)[:,1].cpu().numpy())
ys+=yb.numpy().tolist()
return np.concatenate(ps), np.array(ys)
p_val,y_val = get_probs(model, te_dl)
base_val = teacher.predict_proba(BFte)[:,1]
p_val_fused= (1-BETA_FUSE)*p_val + BETA_FUSE*base_val
if THRESH_CAL:
fpr,tpr,thr = roc_curve(y_val, p_val_fused); j=tpr-fpr
best_t = thr[j.argmax()]
else:
best_t = 0.5
from sklearn.metrics import classification_report as CR, confusion_matrix
y_pred = (p_val_fused>=best_t).astype(int)
print("\n=== Classification report ===")
print(CR(y_val, y_pred, target_names=["class0","class1"], digits=4))
print("Confusion matrix:\n", confusion_matrix(y_val, y_pred))
print(f"[THRESH] best_t={best_t:.3f} (THRESH_CAL={THRESH_CAL}) BETA_FUSE={BETA_FUSE} MODEL={MODEL_TYPE} KD={USE_KD}")
OpenBCI 讓腦波研究走出實驗室;深度學習讓特徵工程門檻降低。先把任務變簡單、資料變乾淨、流程變穩,再談更炫的模型,你就能在短時間做出可演示、可迭代的 EEG × AI 原型。從「睜眼/閉眼」起步,走向專注監測、想像輸入、情緒輔助介面,腦—機互動會從酷炫實驗,變成你產品裡可靠的一顆模組。