在前 18 天,我們把環境、專案結構、型別與資料契約、測試藍圖、結構化日誌、錯誤處理與快取都建好了。今天把資料層補齊:用 SQLAlchemy 2.x 的現代 API,配合 Repository Pattern,讓資料存取具備以下特性:
Mapped[T]
、select()
、Session
)Repository
介面與具體實作,隔離 ORM 細節my_project/
├─ pyproject.toml
├─ src/my_project/
│ ├─ domain/
│ │ ├─ entities.py # 純領域實體(dataclass 或 Pydantic model)
│ │ └─ repositories.py # Repository 介面(Port)
│ ├─ infrastructure/
│ │ └─ db/
│ │ ├─ models.py # SQLAlchemy ORM(Adapter)
│ │ ├─ session.py # engine / session / UoW
│ │ ├─ repositories.py # Repository 實作(Adapter)
│ │ └─ repositories_cached.py # 快取裝飾(選用)
│ ├─ services/
│ │ └─ user_service.py # 應用服務層,僅依賴 Repository 介面
│ ├─ logging_config.py
│ └─ settings.py
└─ tests/
├─ unit/
└─ integration/
Mapped[T]
與 mapped_column()
)# src/my_project/infrastructure/db/models.py
from typing import Annotated
from datetime import datetime
from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
IdStr = Annotated[str, mapped_column(String(36), primary_key=True)]
Ts = Annotated[datetime, mapped_column(server_default=func.now())]
class Base(DeclarativeBase): ...
class User(Base):
__tablename__ = "users"
id: Mapped[IdStr]
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
name: Mapped[str] = mapped_column(String(100))
created_at: Mapped[Ts]
orders: Mapped[list["Order"]] = relationship(back_populates="user", cascade="all, delete-orphan")
class Order(Base):
__tablename__ = "orders"
id: Mapped[IdStr]
user_id: Mapped[str] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"))
amount: Mapped[int]
created_at: Mapped[Ts]
user: Mapped[User] = relationship(back_populates="orders")
重點:使用型別標註讓 IDE 與型別檢查器更友善,與 Day 10 的型別策略相容。
# src/my_project/infrastructure/db/session.py
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from .models import Base
def create_sync_engine(url: str):
# 可依需要加入 pool_size、pool_recycle 等參數
return create_engine(url, pool_pre_ping=True)
def make_session_factory(url: str):
engine = create_sync_engine(url)
Base.metadata.create_all(engine) # demo 用;正式請改 Alembic migration
return sessionmaker(bind=engine, expire_on_commit=False, class_=Session)
@contextmanager
def unit_of_work(SessionFactory):
session: Session = SessionFactory()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
正式環境請用 migration(例如 Alembic)管理 schema 演進,以符合 Day 7 的可重現要求。
# src/my_project/domain/repositories.py
from typing import Protocol, Sequence, Optional
from .entities import UserEntity
class UserRepository(Protocol):
def get(self, user_id: str) -> Optional[UserEntity]: ...
def get_by_email(self, email: str) -> Optional[UserEntity]: ...
def list(self, limit: int = 50, offset: int = 0) -> Sequence[UserEntity]: ...
def add(self, user: UserEntity) -> None: ...
def remove(self, user_id: str) -> None: ...
# src/my_project/infrastructure/db/repositories.py
from typing import Optional, Sequence
from sqlalchemy import select, delete
from sqlalchemy.orm import Session
from .models import User
from ...domain.entities import UserEntity
def to_entity(m: User) -> UserEntity:
return UserEntity(id=m.id, email=m.email, name=m.name, created_at=m.created_at)
class SqlAlchemyUserRepository:
def __init__(self, session: Session) -> None:
self.session = session
def get(self, user_id: str) -> Optional[UserEntity]:
m = self.session.get(User, user_id)
return to_entity(m) if m else None
def get_by_email(self, email: str) -> Optional[UserEntity]:
m = self.session.scalar(select(User).where(User.email == email))
return to_entity(m) if m else None
def list(self, limit: int = 50, offset: int = 0) -> Sequence[UserEntity]:
rows = self.session.scalars(select(User).limit(limit).offset(offset)).all()
return [to_entity(r) for r in rows]
def add(self, user: UserEntity) -> None:
self.session.add(User(id=user.id, email=user.email, name=user.name))
def remove(self, user_id: str) -> None:
self.session.execute(delete(User).where(User.id == user_id))
domain 只看見 UserRepository 介面,與 ORM、資料表結構完全解耦。
# src/my_project/services/user_service.py
from ..domain.repositories import UserRepository
from ..domain.entities import UserEntity
import structlog
log = structlog.get_logger()
class UserService:
def __init__(self, users: UserRepository) -> None:
self.users = users
def register(self, user: UserEntity) -> None:
if self.users.get_by_email(user.email):
log.info("user_exists", email=user.email)
return
self.users.add(user)
log.info("user_registered", user_id=user.id, email=user.email)
# 例:在查詢上套用保守重試策略
from tenacity import retry, stop_after_attempt, wait_exponential
import sqlalchemy
class ResilientUserRepository(SqlAlchemyUserRepository):
@retry(reraise=True, stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.2, min=0.2, max=2))
def get_by_email(self, email: str):
try:
return super().get_by_email(email)
except sqlalchemy.exc.OperationalError:
# 例如連線瞬斷、連線池抖動
raise
# src/my_project/infrastructure/db/repositories_cached.py
import redis, orjson
from typing import Optional
from ...domain.entities import UserEntity
from .repositories import SqlAlchemyUserRepository
def dumps(x): return orjson.dumps(x)
def loads(b): return UserEntity(**orjson.loads(b))
class CachedUserRepository:
def __init__(self, inner: SqlAlchemyUserRepository, r: redis.Redis, ttl_sec: int = 300):
self.inner = inner
self.r = r
self.ttl = ttl_sec
self.prefix = "v1:user:" # 以版本前綴管理大規模失效
def get(self, user_id: str) -> Optional[UserEntity]:
key = f"{self.prefix}{user_id}"
if (b := self.r.get(key)) is not None:
data = orjson.loads(b)
if data.get("__nil"): # 負向快取
return None
return loads(b)
u = self.inner.get(user_id)
if u:
self.r.setex(key, self.ttl, dumps(u.__dict__))
else:
self.r.setex(key, 60, orjson.dumps({"__nil": True}))
return u
# src/my_project/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class AppSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env", env_file_encoding="utf-8")
db_url: str
redis_url: str = "redis://localhost:6379/0"
# tests/integration/test_user_repo.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from my_project.infrastructure.db.models import Base
from my_project.infrastructure.db.repositories import SqlAlchemyUserRepository
from my_project.domain.entities import UserEntity
@pytest.fixture
def repo():
engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine, expire_on_commit=False)
with Session() as s:
yield SqlAlchemyUserRepository(s)
def test_add_and_get(repo):
u = UserEntity(id="u1", email="a@x.com", name="Alice")
repo.add(u)
got = repo.get("u1")
assert got and got.email == "a@x.com"
# Async 版本示意:需要高併發時採用
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
Session = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
# 之後 Repository 實作改為 async def 並使用 await session.get(...) / session.execute(...)
# 共用裝飾器
import time, structlog
log = structlog.get_logger()
def timed(fn):
def wrap(*a, **kw):
t0 = time.perf_counter()
try:
return fn(*a, **kw)
finally:
log.info("repo_call", fn=fn.__name__, ms=round((time.perf_counter()-t0)*1000, 2))
return wrap
事件名稱、欄位固定化(fn, ms 等),方便在日誌平台/追蹤系統聚合查詢。
pyproject.toml
增加 scripts,例如 hatch run test
、hatch run lint
tests
, lint
session,維持本機與 CI 一致今天把資料層從「到處寫 SQL」提升為「可測試、可觀測、可替換」的一層:上層用例只面對 Repository 介面,ORM 細節被封裝;可靠性與效能需求透過重試與快取在外圍實作。到這裡,服務層與 API 層的對接就有了穩固地基。下一步,把 Repository 接進 Web 層(例如 FastAPI)做 Adapter,完成從入站到資料存取的整條脈絡。