iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0
Rust

Rust 後端入門系列 第 17

Day 17 Axum CRUD(PostgreSQL版本)

  • 分享至 

  • xImage
  •  

重點設計概念

  • PgPool 在程式啟動時建立並透過 axum::Extension 注入給 handler 使用。
  • 每個 handler 使用 parameterized SQL(避免 SQL injection)。
  • 用 serde for JSON (Deserialize request, Serialize response)。
  • 使用 sqlx 的 runtime query API(sqlx::query / sqlx::query_as)以避免編譯時需連 DB 的限制。
  • 明確回傳 HTTP 狀態碼(201 Created、200 OK、404 Not Found、204 No Content、400 Bad Request、500 Internal Server Error)。
  • 密碼儲存:範例程式用 password_hash 欄位,但實務上請用 bcrypt/argon2 做雜湊(下方會說明)。

必要的依賴

將 Cargo.toml 修改如下:

[package]
name = "sqlx_connect_demo"
version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "macros", "migrate", "time"] }
dotenvy = "0.15"
axum = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
anyhow = "1.0"
time = { version = "0.3", features = ["serde"] }

程式檔案設計

  • src/main.rs:啟動,建立 pool、執行 migrations、啟動 axum server
  • src/models.rs:request/response 的 struct,及 FromRow impl
  • src/handlers.rs:各 CRUD handler

下面給完整範例程式碼(可直接放到專案中):

src/models.rs

use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use time::OffsetDateTime;

#[derive(Serialize, FromRow)]
pub struct User {
    pub id: i64,
    pub username: String,
    pub email: String,
    pub password_hash: String,
    pub created_at: OffsetDateTime,
    pub updated_at: OffsetDateTime,
}

// 用於建立(request body)
#[derive(Deserialize)]
pub struct CreateUser {
    pub username: String,
    pub email: String,
    pub password: String, // 請在 handler 中 hash,範例中會直接示意
}

// 用於更新(部分 update 使用 PUT ,替換全部內容可改為 PATCH)
#[derive(Deserialize)]
pub struct UpdateUser {
    pub username: Option<String>,
    pub email: Option<String>,
    pub password: Option<String>,
}

src/handlers.rs

use axum::{
    extract::{Extension, Json, Path, Query},
    http::StatusCode,
    response::IntoResponse,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use serde_json::json;
use crate::models::{User, CreateUser, UpdateUser};
use time::OffsetDateTime;
use anyhow::Context;

type AppError = (StatusCode, String);

// Helper: map anyhow::Error -> HTTP 500
fn internal_err(e: impl std::fmt::Display) -> AppError {
    (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e))
}

// POST /users -> INSERT
pub async fn create_user(
    Extension(pool): Extension<PgPool>,
    Json(payload): Json<CreateUser>,
) -> Result<impl IntoResponse, AppError> {
    // 實務上:在此對 payload 做驗證(username/email 格式、密碼強度等)
    // 密碼雜湊:示範使用 bcrypt/argon2 更好。這裡用簡單示意 (不要在真實系統中儲存明文)
    // 範例中我們用 bcrypt(如果想啟用,請在 Cargo.toml 加 bcrypt)
    let password_hash = {
        // 若未加入 bcrypt crate,可直接用 payload.password.clone()(但不建議)
        // 下面假設你安裝了 argon2 或 bcrypt。此處示意為 plaintext fallback:
        payload.password.clone()
    };

    let rec = sqlx::query_as::<_, User>(
        r#"
        INSERT INTO users (username, email, password_hash, created_at, updated_at)
        VALUES ($1, $2, $3, now(), now())
        RETURNING id, username, email, password_hash, created_at, updated_at
        "#,
    )
    .bind(&payload.username)
    .bind(&payload.email)
    .bind(&password_hash)
    .fetch_one(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    // 回傳 201 Created 與新建立的資源
    Ok((StatusCode::CREATED, Json(rec)))
}

// GET /users/{id} -> SELECT single
pub async fn get_user(
    Extension(pool): Extension<PgPool>,
    Path(id): Path<i64>,
) -> Result<impl IntoResponse, AppError> {
    let user = sqlx::query_as::<_, User>(
        r#"
        SELECT id, username, email, password_hash, created_at, updated_at
        FROM users
        WHERE id = $1
        "#,
    )
    .bind(id)
    .fetch_optional(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    match user {
        Some(u) => Ok((StatusCode::OK, Json(u))),
        None => Err((StatusCode::NOT_FOUND, format!("user {} not found", id))),
    }
}

// GET /users -> SELECT list (簡單 limit, offset)
#[derive(Deserialize)]
pub struct ListParams {
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}

pub async fn list_users(
    Extension(pool): Extension<PgPool>,
    Query(params): Query<ListParams>,
) -> Result<impl IntoResponse, AppError> {
    let limit = params.limit.unwrap_or(50) as i64;
    let offset = params.offset.unwrap_or(0) as i64;

    let users = sqlx::query_as::<_, User>(
        r#"
        SELECT id, username, email, password_hash, created_at, updated_at
        FROM users
        ORDER BY id
        LIMIT $1 OFFSET $2
        "#,
    )
    .bind(limit)
    .bind(offset)
    .fetch_all(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    Ok((StatusCode::OK, Json(users)))
}

// PUT /users/{id} -> UPDATE
pub async fn update_user(
    Extension(pool): Extension<PgPool>,
    Path(id): Path<i64>,
    Json(payload): Json<UpdateUser>,
) -> Result<impl IntoResponse, AppError> {
    // 簡單示範:先取得現有資料,再更新指定欄位
    let existing = sqlx::query_as::<_, User>(
        "SELECT id, username, email, password_hash, created_at, updated_at FROM users WHERE id = $1",
    )
    .bind(id)
    .fetch_optional(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    let existing = match existing {
        Some(e) => e,
        None => return Err((StatusCode::NOT_FOUND, format!("user {} not found", id))),
    };

    let new_username = payload.username.unwrap_or(existing.username);
    let new_email = payload.email.unwrap_or(existing.email);
    let new_password_hash = match payload.password {
        Some(p) => p, // 實務上 hash 密碼
        None => existing.password_hash,
    };

    let updated = sqlx::query_as::<_, User>(
        r#"
        UPDATE users
        SET username = $1, email = $2, password_hash = $3, updated_at = now()
        WHERE id = $4
        RETURNING id, username, email, password_hash, created_at, updated_at
        "#,
    )
    .bind(&new_username)
    .bind(&new_email)
    .bind(&new_password_hash)
    .bind(id)
    .fetch_one(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    Ok((StatusCode::OK, Json(updated)))
}

// DELETE /users/{id} -> DELETE
pub async fn delete_user(
    Extension(pool): Extension<PgPool>,
    Path(id): Path<i64>,
) -> Result<impl IntoResponse, AppError> {
    let res = sqlx::query!(
        r#"
        DELETE FROM users
        WHERE id = $1
        "#,
        id
    )
    .execute(&pool)
    .await
    .map_err(|e| internal_err(e))?;

    if res.rows_affected() == 0 {
        return Err((StatusCode::NOT_FOUND, format!("user {} not found", id)));
    }

    // 回傳 204 No Content
    Ok(StatusCode::NO_CONTENT)
}

src/main.rs(擴充後,部分程式碼來自於前兩天)

mod handlers;
mod models;

use axum::{
    extract::{Path, Extension},
    Router, routing::{get, post, put, delete},
};
use sqlx::postgres::PgPoolOptions;
use sqlx::migrate::MigrateDatabase;
use std::time::Duration;
use dotenvy::dotenv;
use std::env;
use tokio::time;

#[tokio::main]
async fn main() {
    dotenv().ok();

    let database_url = match env::var("DATABASE_URL") {
        Ok(v) => v,
        Err(_) => {
            eprintln!("錯誤:找不到 DATABASE_URL");
            std::process::exit(1);
        }
    };

    // 例如 5 秒超時
    let connect_future = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url);

	match sqlx::postgres::Postgres::database_exists(&database_url).await {
        Ok(exists) => {
            if !exists {
				println!("資料庫不存在,嘗試建立...");
                if let Err(e) = sqlx::postgres::Postgres::create_database(&database_url).await {
                    eprintln!("建立失敗: {}", e);
                }
            }
        }
        Err(e) => eprintln!("檢查資料庫是否存在失敗: {}", e),
    }

    let pool = match time::timeout(Duration::from_secs(5), connect_future).await {
        Ok(Ok(p)) => {
            println!("成功建立 PgPool");
            p
        }
        Ok(Err(e)) => {
            eprintln!("建立 PgPool 失敗: {}", e);
            std::process::exit(1);
        }
        Err(_) => {
            eprintln!("建立 PgPool 超時");
            std::process::exit(1);
        }
    };
	
	if let Err(e) = sqlx::migrate!("./migrations").run(&pool).await {
        eprintln!("migrations失敗: {}", e);
        std::process::exit(1);
    }
	println!("成功完成 migrations");
	
	let app = Router::new()
    .route("/users", post(handlers::create_user).get(handlers::list_users))
    .route(
        "/users/{id}",
        get(handlers::get_user)
            .put(handlers::update_user)
            .delete(handlers::delete_user),
    )
    .layer(Extension(pool));
	
	let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
	        .await
	        .unwrap();
	
	axum::serve(listener, app)
	    .await
	    .unwrap();
}

細節與說明

  • INSERT(create_user)
    • SQL:
      INSERT INTO users (username, email, password_hash, created_at, updated_at) VALUES (1,2,$3, now(), now()) RETURNING ...

      1,1,

    • 使用 RETURNING 可以直接取得剛插入的整筆資料(包含 id、timestamp)。

    • 錯誤處理:若 username/email UNIQUE 衝突,Postgres 會回錯誤,sqlx 的錯誤會反映出 constraint violation,應回 409 Conflict 或 400 取決於需求。範例直接回 500。

  • SELECT(get_user, list_users)
    • get_user 用 WHERE id = $1,fetch_optional 以回傳 Option,找不到回 404。
    • list_users 提供簡單 limit/offset,避免一次拉太多記錄。
  • UPDATE(update_user)
    • 範例採先 fetch existing,再依 payload 欄位更新。也可直接在 SQL 用 COALESCE 或 CASE 來做 partial update,但先 fetch 再 update 更直觀並能避免無意義覆寫。
    • RETURNING 可以取得更新後的記錄。
  • DELETE(delete_user)
    • 使用 DELETE ... WHERE id = $1,檢查 rows_affected 判斷是否有刪除紀錄,若沒有回 404。成功回 204 No Content。

輸入驗證與密碼處理(實務注意)

  • 密碼:請務必在建立/更新時用安全雜湊演算法(argon2、bcrypt、scrypt)處理,永遠不要儲存明文。建議使用 argon2(argon2rs 或者 argon2 crate):
    • 產生 hash:argon2::hash_encoded(...)
    • 比對:argon2::verify_encoded(...)
    • 若要整合,create_user handler 需在 bind 到 SQL 之前把 payload.password hash 起來再存入 password_hash 欄位。
  • 欄位驗證:username 長度、email 格式、避免漢字或特殊字元(視需求),可用 validator crate 或自行驗證。
  • 回傳資料時避免把 password_hash 回傳給 client(在 models 中可以建立 response DTO 不包含 password_hash)。上面的範例為簡潔示範,如果要安全,應定義一個 UserResponse 並只回傳非敏感欄位。

測試

  • 建立 user:

    POST http://127.0.0.1:3000/users
    
    {
    
    "username": "user1",
    "email": "user1@a.com",
    "password": "secret"
    
    }
    
  • 取得 user(id 假設為 1):
    curl http://localhost:3000/users/1

  • 列表:
    curl "http://localhost:3000/users?limit=10&offset=0"

  • 更新 user(部分欄位):

    PUT http://localhost:3000/users/1 
    
    {
    
    "email": "user1_update@a.com"
    
    }
    
  • 刪除 user:
    curl -X DELETE http://localhost:3000/users/1


上一篇
Day 16 Axum 撰寫 migration
下一篇
Day18 Axum 資料庫 PgPool 與連線管理
系列文
Rust 後端入門24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言