2025 iThome Ironman Contest

30-Day Python Project Workshop: Environment, Structure, Testing, and Deployment, End to End (series, part 19)

Day 19 - Engineering the Data Layer: SQLAlchemy 2.x and the Repository Pattern

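Over the previous 18 days we set up the environment, project structure, types and data contracts, the test blueprint, structured logging, error handling, and caching. Today we fill in the data layer: SQLAlchemy 2.x's modern API combined with the Repository Pattern, so that data access has the following properties: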

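  • Decoupled from the framework: upper layers (services, API) depend only on interfaces, not on ORM details
  • Testable: unit tests can use an in-memory DB, and integration tests switch to a real database
  • Observable: query events and timings are emitted as structured logs
  • Recoverable: transient errors can be retried, so callers see stable behavior
  • Extensible: hot read paths are wrapped with a cache without intruding on core logic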

Today's goals

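  1. Write SQLAlchemy models and queries in 2.0 style (Mapped[T], select(), Session)
  2. Define a Repository interface and a concrete implementation that isolates ORM details
  3. Set up the Session and a Unit of Work (UoW) to manage transactions in one place
  4. Wire in logging, retries, caching, and settings
  5. Deliver a runnable set of integration tests and a CI workflow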

Project structure (continuing the layering from Day 4)

my_project/
├─ pyproject.toml
├─ src/my_project/
│  ├─ domain/
│  │  ├─ entities.py          # pure domain entities (dataclass or Pydantic model)
│  │  └─ repositories.py      # Repository interface (Port)
│  ├─ infrastructure/
│  │  └─ db/
│  │     ├─ models.py         # SQLAlchemy ORM (Adapter)
│  │     ├─ session.py        # engine / session / UoW
│  │     ├─ repositories.py   # Repository implementation (Adapter)
│  │     └─ repositories_cached.py  # cache decorator (optional)
│  ├─ services/
│  │  └─ user_service.py      # application service layer, depends only on the Repository interface
│  ├─ logging_config.py
│  └─ settings.py
└─ tests/
   ├─ unit/
   └─ integration/


SQLAlchemy 2.x: modeling and querying in 2.0 style

Models (Mapped[T], mapped_column())

# src/my_project/infrastructure/db/models.py
from typing import Annotated
from datetime import datetime
from sqlalchemy import String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

IdStr = Annotated[str, mapped_column(String(36), primary_key=True)]
Ts = Annotated[datetime, mapped_column(server_default=func.now())]

class Base(DeclarativeBase): ...

class User(Base):
    __tablename__ = "users"
    id: Mapped[IdStr]
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    created_at: Mapped[Ts]
    orders: Mapped[list["Order"]] = relationship(back_populates="user", cascade="all, delete-orphan")

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[IdStr]
    user_id: Mapped[str] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"))
    amount: Mapped[int]
    created_at: Mapped[Ts]
    user: Mapped[User] = relationship(back_populates="orders")

Key point: type annotations make the models friendlier to IDEs and type checkers, and stay compatible with the typing strategy from Day 10.

Centralized Session and UoW management

# src/my_project/infrastructure/db/session.py
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from .models import Base

def create_sync_engine(url: str):
    # Add pool_size, pool_recycle, and similar parameters as needed
    return create_engine(url, pool_pre_ping=True)

def make_session_factory(url: str):
    engine = create_sync_engine(url)
    Base.metadata.create_all(engine)  # demo only; use Alembic migrations in production
    return sessionmaker(bind=engine, expire_on_commit=False, class_=Session)

@contextmanager
def unit_of_work(SessionFactory):
    session: Session = SessionFactory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

In production, manage schema evolution with migrations (for example Alembic) to meet the reproducibility requirement from Day 7.


Repository Pattern: from the language of SQL/ORM to the language of the business

Interface (Port)

# src/my_project/domain/repositories.py
from typing import Protocol, Sequence, Optional
from .entities import UserEntity

class UserRepository(Protocol):
    def get(self, user_id: str) -> Optional[UserEntity]: ...
    def get_by_email(self, email: str) -> Optional[UserEntity]: ...
    def list(self, limit: int = 50, offset: int = 0) -> Sequence[UserEntity]: ...
    def add(self, user: UserEntity) -> None: ...
    def remove(self, user_id: str) -> None: ...

Implementation (Adapter)

# src/my_project/infrastructure/db/repositories.py
from typing import Optional, Sequence
from sqlalchemy import select, delete
from sqlalchemy.orm import Session
from .models import User
from ...domain.entities import UserEntity

def to_entity(m: User) -> UserEntity:
    return UserEntity(id=m.id, email=m.email, name=m.name, created_at=m.created_at)

class SqlAlchemyUserRepository:
    def __init__(self, session: Session) -> None:
        self.session = session

    def get(self, user_id: str) -> Optional[UserEntity]:
        m = self.session.get(User, user_id)
        return to_entity(m) if m else None

    def get_by_email(self, email: str) -> Optional[UserEntity]:
        m = self.session.scalar(select(User).where(User.email == email))
        return to_entity(m) if m else None

    def list(self, limit: int = 50, offset: int = 0) -> Sequence[UserEntity]:
        # Order by primary key so limit/offset pagination is deterministic
        stmt = select(User).order_by(User.id).limit(limit).offset(offset)
        rows = self.session.scalars(stmt).all()
        return [to_entity(r) for r in rows]

    def add(self, user: UserEntity) -> None:
        self.session.add(User(id=user.id, email=user.email, name=user.name))

    def remove(self, user_id: str) -> None:
        self.session.execute(delete(User).where(User.id == user_id))

The domain sees only the UserRepository interface and is fully decoupled from the ORM and the table schema.


Application service layer: focused use cases, dependency injection

# src/my_project/services/user_service.py
from ..domain.repositories import UserRepository
from ..domain.entities import UserEntity
import structlog

log = structlog.get_logger()

class UserService:
    def __init__(self, users: UserRepository) -> None:
        self.users = users

    def register(self, user: UserEntity) -> None:
        if self.users.get_by_email(user.email):
            log.info("user_exists", email=user.email)
            return
        self.users.add(user)
        log.info("user_registered", user_id=user.id, email=user.email)
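
Because UserService depends only on the UserRepository protocol, it can be exercised without any database. The sketch below uses only the standard library. The UserEntity dataclass is an assumed shape (entities.py is not shown in this article), InMemoryUserRepository is a hypothetical test double, and the service is a copy of the one above with logging left out for brevity.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Protocol


@dataclass(frozen=True)
class UserEntity:
    # Assumed shape; the project's real entities.py may differ.
    id: str
    email: str
    name: str
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class UserRepository(Protocol):
    # Only the two methods the use case below needs.
    def get_by_email(self, email: str) -> Optional[UserEntity]: ...
    def add(self, user: UserEntity) -> None: ...


class InMemoryUserRepository:
    """Hypothetical test double: a dict instead of a database."""

    def __init__(self) -> None:
        self.users: dict[str, UserEntity] = {}

    def get_by_email(self, email: str) -> Optional[UserEntity]:
        return next((u for u in self.users.values() if u.email == email), None)

    def add(self, user: UserEntity) -> None:
        self.users[user.id] = user


class UserService:
    # Same registration rule as above, with logging omitted.
    def __init__(self, users: UserRepository) -> None:
        self.users = users

    def register(self, user: UserEntity) -> None:
        if self.users.get_by_email(user.email):
            return  # email already taken: do nothing
        self.users.add(user)


repo = InMemoryUserRepository()
service = UserService(repo)
service.register(UserEntity(id="u1", email="a@x.com", name="Alice"))
service.register(UserEntity(id="u2", email="a@x.com", name="Duplicate"))
registered_ids = sorted(repo.users)
```

The second registration is ignored because the email already exists. The fake satisfies the protocol structurally, so no inheritance from UserRepository is needed.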


Reliability: retry recoverable errors

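# Example: a conservative retry policy applied to a query
from sqlalchemy.exc import OperationalError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from my_project.infrastructure.db.repositories import SqlAlchemyUserRepository

class ResilientUserRepository(SqlAlchemyUserRepository):
    # Retry only OperationalError (for example a dropped connection or
    # connection-pool churn); any other exception propagates immediately.
    @retry(
        reraise=True,
        retry=retry_if_exception_type(OperationalError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=0.2, min=0.2, max=2),
    )
    def get_by_email(self, email: str):
        return super().get_by_email(email)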


Performance: wrapping the Repository with Cache-Aside (optional)

# src/my_project/infrastructure/db/repositories_cached.py
import redis, orjson
from typing import Optional
from ...domain.entities import UserEntity
from .repositories import SqlAlchemyUserRepository

def dumps(x): return orjson.dumps(x)
def loads(b): return UserEntity(**orjson.loads(b))

class CachedUserRepository:
    def __init__(self, inner: SqlAlchemyUserRepository, r: redis.Redis, ttl_sec: int = 300):
        self.inner = inner
        self.r = r
        self.ttl = ttl_sec
        self.prefix = "v1:user:"  # versioned prefix: bump it to invalidate entries in bulk

    def get(self, user_id: str) -> Optional[UserEntity]:
        key = f"{self.prefix}{user_id}"
        if (b := self.r.get(key)) is not None:
            data = orjson.loads(b)
            if data.get("__nil"):  # negative cache entry: the user is known not to exist
                return None
            return loads(b)
        u = self.inner.get(user_id)
        if u:
            self.r.setex(key, self.ttl, dumps(u.__dict__))
        else:
            self.r.setex(key, 60, orjson.dumps({"__nil": True}))
        return u
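
The wrapper above only covers reads. With cache-aside, writes also need to drop the affected key, otherwise a stale value (or a stale negative entry) is served until the TTL expires. The sketch below shows one way to do that, under loud assumptions: FakeRedis is a hypothetical in-memory stand-in exposing only get, setex, and delete (and ignoring TTL), DictUserStore stands in for the real repository, users are plain dicts, and the standard-library json module replaces orjson so the example has no dependencies.

```python
import json
from typing import Optional


class FakeRedis:
    """Hypothetical stand-in for a Redis client; TTL is accepted but ignored."""

    def __init__(self) -> None:
        self.store: dict[str, bytes] = {}

    def get(self, key: str) -> Optional[bytes]:
        return self.store.get(key)

    def setex(self, key: str, ttl: int, value: bytes) -> None:
        self.store[key] = value

    def delete(self, *keys: str) -> None:
        for key in keys:
            self.store.pop(key, None)


class DictUserStore:
    """Hypothetical inner repository that counts how often it is read."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}
        self.reads = 0

    def get(self, user_id: str) -> Optional[dict]:
        self.reads += 1
        return self.rows.get(user_id)

    def add(self, user: dict) -> None:
        self.rows[user["id"]] = user

    def remove(self, user_id: str) -> None:
        self.rows.pop(user_id, None)


class InvalidatingCachedRepository:
    def __init__(self, inner, r, ttl_sec: int = 300) -> None:
        self.inner, self.r, self.ttl = inner, r, ttl_sec
        self.prefix = "v1:user:"

    def _key(self, user_id: str) -> str:
        return f"{self.prefix}{user_id}"

    def get(self, user_id: str) -> Optional[dict]:
        key = self._key(user_id)
        if (raw := self.r.get(key)) is not None:
            data = json.loads(raw)
            return None if data.get("__nil") else data
        user = self.inner.get(user_id)
        if user is not None:
            self.r.setex(key, self.ttl, json.dumps(user).encode())
        else:
            self.r.setex(key, 60, json.dumps({"__nil": True}).encode())
        return user

    def add(self, user: dict) -> None:
        self.inner.add(user)
        self.r.delete(self._key(user["id"]))  # also clears a stale negative entry

    def remove(self, user_id: str) -> None:
        self.inner.remove(user_id)
        self.r.delete(self._key(user_id))


inner = DictUserStore()
repo = InvalidatingCachedRepository(inner, FakeRedis())
miss = repo.get("u1")                        # not found, negative entry cached
repo.add({"id": "u1", "email": "a@x.com"})   # write invalidates the key
hit = repo.get("u1")                         # read from the store, then cached
again = repo.get("u1")                       # served from the cache
repo.remove("u1")
gone = repo.get("u1")                        # key was dropped, store is read again
```

When writes go through a Unit of Work, it is probably safer to invalidate after the transaction commits, so that a rolled-back write does not evict a still-valid entry.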


Settings and secrets: the connection string comes from the settings layer

# src/my_project/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class AppSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env", env_file_encoding="utf-8")
    db_url: str
    redis_url: str = "redis://localhost:6379/0"


Testing strategy: in-memory DB, fixtures, and coverage

# tests/integration/test_user_repo.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from my_project.infrastructure.db.models import Base
from my_project.infrastructure.db.repositories import SqlAlchemyUserRepository
from my_project.domain.entities import UserEntity

@pytest.fixture
def repo():
    engine = create_engine("sqlite+pysqlite:///:memory:")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine, expire_on_commit=False)
    with Session() as s:
        yield SqlAlchemyUserRepository(s)

def test_add_and_get(repo):
    u = UserEntity(id="u1", email="a@x.com", name="Alice")
    repo.add(u)
    got = repo.get("u1")
    assert got and got.email == "a@x.com"


Async version (optional reading)

# Async sketch: use it when high concurrency is required
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
Session = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

# The Repository implementation then becomes async def and uses await session.get(...) / session.execute(...)

Observability: log an event and its duration for every data-layer call

# Shared decorator
import functools
import time, structlog
log = structlog.get_logger()

def timed(fn):
    @functools.wraps(fn)  # keep the wrapped function's name and docstring
    def wrap(*a, **kw):
        t0 = time.perf_counter()
        try:
            return fn(*a, **kw)
        finally:
            log.info("repo_call", fn=fn.__name__, ms=round((time.perf_counter()-t0)*1000, 2))
    return wrap

Event names and fields are fixed (fn, ms, and so on), which makes aggregation queries easy in a log platform or tracing system.
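
The decorator above times repository methods. To time individual SQL statements as well, SQLAlchemy's engine events can be used. The sketch below is one possible approach, not necessarily what this project does: the helper name instrument, the db_query event name, and the timings list are assumptions, and standard-library logging is used in place of structlog to keep the example dependency-light.

```python
import logging
import time

from sqlalchemy import create_engine, event, text

log = logging.getLogger("db.query")
timings: list[dict] = []  # kept only so the sketch can be inspected


def instrument(engine) -> None:
    """Hypothetical helper: attach per-statement timing listeners to one engine."""

    @event.listens_for(engine, "before_cursor_execute")
    def _start(conn, cursor, statement, parameters, context, executemany):
        # A stack on conn.info keeps the start time tied to this connection.
        conn.info.setdefault("query_start", []).append(time.perf_counter())

    @event.listens_for(engine, "after_cursor_execute")
    def _stop(conn, cursor, statement, parameters, context, executemany):
        started = conn.info["query_start"].pop()
        ms = round((time.perf_counter() - started) * 1000, 2)
        timings.append({"event": "db_query", "statement": statement, "ms": ms})
        log.info("db_query ms=%s statement=%s", ms, statement)


engine = create_engine("sqlite+pysqlite:///:memory:")
instrument(engine)

with engine.connect() as conn:
    value = conn.execute(text("SELECT 1")).scalar_one()
```

Each executed statement then produces one record with a fixed event name and an ms field, matching the naming convention used for repo_call.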


CI and a one-command workflow

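  • Hatch: add scripts to pyproject.toml, for example hatch run test and hatch run lint
  • Nox: provide tests and lint sessions so that local runs and CI stay consistent
  • CI gates: a minimum coverage threshold, type checking must pass, and lint must report no errors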


Summary

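Today we lifted the data layer from "SQL scattered everywhere" to a layer that is testable, observable, and replaceable: upper-layer use cases face only the Repository interface, ORM details are encapsulated, and reliability and performance needs are handled at the edges through retries and caching. That gives the service and API layers a solid foundation to connect to. The next step is to plug the Repository into the web layer (for example FastAPI) as an Adapter, completing the path from inbound request to data access.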

