iT邦幫忙

2025 iThome 鐵人賽

DAY 19
0
Rust

Rust 後端入門系列 第 19

Day 19 Axum 加入快取(Redis)與資料一致性策略

  • 分享至 

  • xImage
  •  

本文的目標是:用 Redis 快取減少對 Postgres 的讀取負載、降低延遲,同時維持合理的一致性保證。

常見快取模式與一致性權衡

  • Cache-aside(應用程式主導)
    • 讀:應用先查快取(Redis),cache miss 再從 DB 讀取並寫入快取。
    • 寫:應用先更新 DB,再刪除/更新快取(通常採「更新 DB -> 失效/更新快取」)。
    • 優點:簡單,靈活;缺點:有短暫不一致性。
  • Read-through / Write-through / Write-behind(由快取代理)
    • 需快取層支援自動向 DB 同步(較複雜,常見於使用像是 apcu/redis modules 或專門中間層)。
  • 強一致性(同步更新)
    • 可在 DB transaction 成功後立即同步更新 cache(在同一程式邏輯中),但在分布式系統仍有 race condition(多實例同時更新)。較嚴格方法需使用分布式鎖或事件序列化。

結論:以 web 應用常見做法,建議採用 cache-aside,再搭配:

  • 在 update/delete 時立即失效(或更新)快取;
  • 加上短 TTL 與負快取,減少 cache stampede;
  • 多實例時以 Redis pub/sub 或 keyspace notifications 通知其他實例失效(或用 centralized invalidation)。

Redis 技術選擇與序列化考量

  • 建議使用 redis crate(async)與 ConnectionManager(長連線、連線重連管理):
    • Cargo.toml :
      redis = { version = "0.24", features = ["tokio-rt-core", "aio"] }
  • 序列化:建議使用 serde_json 把 struct 序列化成 JSON 存到 Redis(簡單、可讀)。也可用 bincode / MessagePack 以節省頻寬,但 JSON 可跨語言容易 debug。
  • 不要把 password_hash 放到快取的公開回傳物件;定義一個 Response DTO(UserResponse)不包含 password_hash,再把該 DTO 存進 Redis。

實作重點與節錄程式碼

下面用節錄程式碼示範如何整合 Redis。

先新增一個 response DTO(src/models.rs ):

#[derive(Serialize, Deserialize, Clone)]
pub struct UserResponse {
    pub id: i64,
    pub username: String,
    pub email: String,
    pub created_at: OffsetDateTime,
    pub updated_at: OffsetDateTime,
}

impl From<User> for UserResponse {
    fn from(u: User) -> Self {
        UserResponse {
            id: u.id,
            username: u.username,
            email: u.email,
            created_at: u.created_at,
            updated_at: u.updated_at,
        }
    }
}

在 Cargo.toml 新增 redis:

  • redis = { version = "0.32", features = ["aio", "tokio-comp"] } (請依實際可用版本調整)

在 main.rs 建立 Redis connection manager 並注入:

mod cache;

use redis::aio::MultiplexedConnection;
use redis::Client as RedisClient;

// 在 main() 中,建立 redis manager
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
let redis_client = RedisClient::open(redis_url.as_str()).expect("invalid redis url");
let redis_conn: MultiplexedConnection = redis_client.get_multiplexed_tokio_connection().await.expect("redis connect fail");
	
// 將 redis_conn.clone() 放入 Extension alongside pool
let app = Router::new()
    /* routes */
    .layer(Extension(pool))
    .layer(Extension(redis_conn));

建立一些快取 helper(可放在 src/cache.rs):

use redis::aio::MultiplexedConnection;
use redis::AsyncCommands;
use serde_json;
use crate::models::UserResponse;
use std::time::Duration;

const USER_CACHE_PREFIX: &str = "user:"; // key = user:{id}
const USER_CACHE_TTL_SECS: usize = 60 * 5; // 5 分鐘 TTL, 可調

pub async fn get_user_cache(
    redis: &mut ConnectionManager,
    id: i64,
) -> redis::RedisResult<Option<UserResponse>> {
    let key = format!("{}{}", USER_CACHE_PREFIX, id);
    let v: Option<String> = redis.get(&key).await?; // get returns Option<String>
    match v {
        Some(s) => {
            let user: UserResponse = serde_json::from_str(&s).map_err(|e| {
                // map serde error into redis::RedisError
                redis::RedisError::from((redis::ErrorKind::TypeError, "serde json parse error"))
            })?;
            Ok(Some(user))
        }
        None => Ok(None),
    }
}

pub async fn set_user_cache(
    redis: &mut ConnectionManager,
    id: i64,
    user: &UserResponse,
) -> redis::RedisResult<()> {
    let key = format!("{}{}", USER_CACHE_PREFIX, id);
    let s = serde_json::to_string(user).map_err(|_| {
        redis::RedisError::from((redis::ErrorKind::TypeError, "serde json serialize error"))
    })?;
    // SETEX: set with TTL
    let _: () = redis.set_ex(key, s, USER_CACHE_TTL_SECS).await?;
    Ok(())
}

pub async fn invalidate_user_cache(
    redis: &mut ConnectionManager,
    id: i64,
) -> redis::RedisResult<()> {
    let key = format!("{}{}", USER_CACHE_PREFIX, id);
    let _: () = redis.del(key).await?;
    Ok(())
}

注意:這裡 get_user_cache / set_user_cache 都接收 &mut ConnectionManager。當你在 handler 中 extract Extension(redis_conn) 時,取得的是 clone 後的 ConnectionManager(可直接傳 mutable binding)。

修改 handler.rs(get_user)以使用快取:

use redis::aio::MultiplexedConnection;
use crate::cache::{get_user_cache, set_user_cache, invalidate_user_cache};
use crate::models::UserResponse;

pub async fn get_user(
    Extension(pool): Extension<PgPool>,
    Extension(mut redis): Extension<ConnectionManager>, // 注意:在 same layer 時要指定兩個 Extension 的順序,axum 會匹配
    Path(id): Path<i64>,
) -> Result<impl IntoResponse, AppError> {
    // 1) 嘗試從 Redis 取
    if let Ok(Some(user_res)) = get_user_cache(&mut redis, id).await {
        // 快取命中 — 直接回傳 (200)
        return Ok((StatusCode::OK, Json(user_res)));
    }

    // 2) Cache miss -> 從 DB 讀
    let user = sqlx::query_as::<_, User>(
        r#"
        SELECT id, username, email, password_hash, created_at, updated_at
        FROM users
        WHERE id = $1
        "#,
    )
    .bind(id)
    .fetch_optional(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    match user {
        Some(u) => {
            let resp: UserResponse = u.into();
            // 3) 將結果寫進 Redis(忽略寫入錯誤,避免阻塞請求)
            let mut redis_for_set = redis.clone();
            let resp_clone = resp.clone();
            // 非阻塞地嘗試 set cache:可 spawn 背景任務
            tokio::spawn(async move {
                let _ = set_user_cache(&mut redis_for_set, id, &resp_clone).await;
            });
            Ok((StatusCode::OK, Json(resp)))
        }
        None => {
            Err((StatusCode::NOT_FOUND, format!("user {} not found", id)))
        }
    }
}

修改 create_user/update_user/delete_user 完成快取一致性:

  • create_user: 在 DB insert 成功後,直接 set cache:
// 轉成外部回傳用型別
    let resp: UserResponse = rec.clone().into();

    // 背景寫入快取(不阻塞回應)
    // 注意:我們把 redis.clone() 給背景任務
    let mut redis_for_set = redis.clone();
    let user_resp = resp.clone();
    tokio::spawn(async move {
        let _ = set_user_cache(&mut redis_for_set, user_resp.id, &user_resp).await;
    });

    Ok((StatusCode::CREATED, Json(resp)))
  • update_user: 在 DB 更新成功後,最好更新快取(set new value)或至少 invalidate(del)。建議在 transaction 成功後 set cache。
let resp: UserResponse = updated.into();
	let mut redis_for_set = redis.clone();
	let resp_clone = resp.clone();
	tokio::spawn(async move {
		let _ = set_user_cache(&mut redis_for_set, resp.id, &resp_clone).await;
	});
	Ok((StatusCode::OK, Json(resp)))
  • delete_user : 刪除 DB 成功後,刪除快取(invalidate):
let mut redis_for_del = redis.clone();
	tokio::spawn(async move {
		let _ = invalidate_user_cache(&mut redis_for_del, id).await;
	});

上面用 tokio::spawn 非同步背景任務做 cache set/del,避免因 Redis 寫入錯誤或延遲而阻塞主要請求;但這樣會造成「DB 已改但 cache 更新失敗」的短暫不一致。若你需要更強一致性,可以在主程式中同步等待 set_user_cache 完成(但會增加延遲與失敗暴露面)。

最後補充幾點設計判斷:

  • 如果資料是高度頻繁讀、很少寫(例如熱門公開頁面),快取收益最大:可用較長 TTL 與 background refresh。
  • 如果資料頻繁寫且需強一致性(例如銀行餘額),避免用快取或只在非關鍵路徑使用快取,並以 transaction 與鎖來確保一致性。
  • 快取不是替代一致性保證的手段;設計策略時先把一致性需求列出(可接受 eventual consistency 嗎?是否需要線性/強一致性?),再選用相應快取模式。

上一篇
Day18 Axum 資料庫 PgPool 與連線管理
下一篇
Day 20 Axum與密碼安全
系列文
Rust 後端入門25
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言