iT邦幫忙

2025 iThome 鐵人賽

DAY 6
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 6

Day 6: SQL 查詢進階技巧 - 解決實際分析問題

  • 分享至 

  • xImage
  •  

前言

打給厚,今天我們將深入學習如何運用 DataFusion 支援的進階 SQL 語法查詢資料,包括窗口函數、CTE(Common Table Expression)、子查詢等,以及如何在 SQL 和 DataFrame API 之間做出最佳選擇。是不遲疑,讓我們開始今日份的學習吧!

窗口函數(Window Functions)

窗口函數是實現複雜查詢的強力工具之一,和傳統聚合相比窗口函數能在原始資料行的後面添加計算結果而不影響表格結構。函數結構如下:

function([expr])
  OVER(
    [PARTITION BY expr[, …]]
    [ORDER BY expr [ ASC | DESC ][, …]]
    [ frame_clause ]
)

其中:

  • function([expr]): 窗口函數本身
  • OVER: 為必寫語句,表示前面的函數會作用在 OVER 內定義的分區,大致可以由三個部分組成:
    • PARTITION BY:定義「窗口邊界」- 在哪些行的範圍內計算
    • ORDER BY:定義「計算順序」- 按什麼順序處理行
    • Frame :定義「計算範圍」- 當前行前後多少行參與計算

DataFusion 內建窗口函數分類

DataFusion 支援的窗口函數非常多樣,這邊舉幾個常見的函數

聚合函數

基本上傳統聚合函數都可以轉換成窗口函數的形式計算

SUM()       -- 累計總和或移動總和
AVG()       -- 移動平均
COUNT()     -- 累計計數

排名函數(Ranking Functions)

ROW_NUMBER()  -- 連續編號:1,2,3,4...
RANK()        -- 跳躍排名:1,2,2,4...(相同值同排名,下一個跳過)
DENSE_RANK()  -- 密集排名:1,2,2,3...(相同值同排名,下一個連續)

偏移函數(Offset Functions)

LAG(column, n)    -- 取前n行的值
LEAD(column, n)   -- 取後n行的值
FIRST_VALUE()     -- 窗口內第一個值
LAST_VALUE()      -- 窗口內最後一個值

CTE(Common Table Expression)

有時候我們需要從多個表當中提取數據時常常會使用子查詢,但為了簡化查詢語法並達成遞迴查詢,CTE 的寫法就派上用場了。CTE 的本質是命名的暫時結果集,它將複雜查詢分解成多個邏輯步驟,大幅提升可讀性和維護性。

CTE 的三大優勢

  1. 可讀性:將複雜邏輯分解成清晰的步驟
  2. 重用性:同一個 CTE 可以在主查詢中多次引用
  3. 除錯性:每個 CTE 都可以單獨測試和驗證

實作範例

環境準備

照慣例先建立我們的測試專案:

cargo new datafusion-demo4
cd datafusion-demo4
mkdir data
# Cargo.toml
[dependencies]
datafusion = "49.0.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }

數據準備與主程式結構

這次我們使用學生成績作為實作的資料,不過為了方便我們直接使用 DataFusion 創建一張學生成績表。主程式的結構和昨天的範例一樣,將這次要學習的內容拆分成多個輔助函數由主程式調用,後面則依序實做這些輔助函數。

use datafusion::prelude::*;
use datafusion::error::Result;

現在我們需要在 main 函數中調用所有新增的範例函數:

```rust
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // 創建簡單的學生成績數據
    create_student_data(&ctx).await?;

    // 1. 窗口函數基礎應用
    basic_window_functions(&ctx).await?;

    // 2. CTE 基礎應用
    basic_cte_example(&ctx).await?;

    // 5. 三種方法的實戰對比
    comparison_example(&ctx).await?;

    // 6. SQL vs DataFrame API 比較
    sql_vs_dataframe_comparison(&ctx).await?;

    // 7. 常見陷阱與最佳實務
    common_pitfalls_and_best_practices(&ctx).await?;

    Ok(())
}

async fn create_student_data(ctx: &SessionContext) -> Result<()> {
    // 創建學生成績表 - 數據簡單易理解
    let student_sql = r#"
        CREATE MEMORY TABLE students AS VALUES
        ('張三', '數學', 85, '高一'),
        ('李四', '數學', 92, '高一'),
        ('王五', '數學', 78, '高一'),
        ('張三', '英語', 88, '高一'),
        ('李四', '英語', 96, '高一'),
        ('王五', '英語', 82, '高一'),
        ('張三', '物理', 90, '高一'),
        ('李四', '物理', 89, '高一'),
        ('王五', '物理', 76, '高一'),
        ('陳六', '數學', 94, '高二'),
        ('趙七', '數學', 87, '高二'),
        ('陳六', '英語', 91, '高二'),
        ('趙七', '英語', 85, '高二')
        AS students(name, subject, score, grade)
    "#;

    ctx.sql(student_sql).await?.collect().await?;
    println!("✓ 學生成績數據已創建");

    Ok(())
}

1. 窗口函數基礎應用

讓我們從最簡單的範例開始理解窗口函數:

async fn basic_window_functions(ctx: &SessionContext) -> Result<()> {
    println!("\n=== 窗口函數基礎應用 ===");

    // 範例1:為每個學生的成績排名
    println!("\n1. 學生成績排名:");
    let ranking_query = r#"
        SELECT
            name,
            subject,
            score,
            -- 全校排名(所有科目一起排)
            ROW_NUMBER() OVER (ORDER BY score DESC) as overall_rank,
            -- 科目內排名
            ROW_NUMBER() OVER (PARTITION BY subject ORDER BY score DESC) as subject_rank
        FROM students
        ORDER BY subject, score DESC
    "#;

    let results = ctx.sql(ranking_query).await?.collect().await?;
    datafusion::arrow::util::pretty::print_batches(&results)?;

    println!("\n💡 講解:");
    println!("- ROW_NUMBER() 為每一行分配唯一編號");
    println!("- PARTITION BY subject:在每個科目內部重新開始編號");
    println!("- ORDER BY score DESC:按分數降序排列");

    Ok(())
}

3. CTE 基礎應用

async fn basic_cte_example(ctx: &SessionContext) -> Result<()> {
    println!("\n=== CTE 基礎應用 ===");

    // 範例1:使用 CTE 分步計算學生總分和平均分
    println!("\n1. 學生總分和平均分計算:");
    let basic_cte = r#"
        WITH student_totals AS (
            -- 第一步:計算每個學生的總分和科目數
            SELECT
                name,
                COUNT(*) as subject_count,
                SUM(score) as total_score,
                ROUND(AVG(score), 1) as avg_score
            FROM students
            GROUP BY name
        )
        -- 第二步:基於第一步結果進行進一步分析
        SELECT
            name,
            subject_count,
            total_score,
            avg_score,
            CASE
                WHEN avg_score >= 90 THEN '優秀'
                WHEN avg_score >= 80 THEN '良好'
                WHEN avg_score >= 70 THEN '中等'
                ELSE '需要努力'
            END as performance_level
        FROM student_totals
        ORDER BY avg_score DESC
    "#;

    let results = ctx.sql(basic_cte).await?.collect().await?;
    datafusion::arrow::util::pretty::print_batches(&results)?;

    println!("\n💡 講解:");
    println!("- WITH student_totals AS (...): 定義一個名為 student_totals 的 CTE");
    println!("- CTE 內部完成了複雜的聚合計算");
    println!("- 主查詢基於 CTE 結果進行進一步的邏輯處理");
    println!("- 這樣的分步處理比一個複雜的嵌套查詢更易讀");

    Ok(())
}

進階技巧與最佳實務

DataFusion 窗口函數的底層優化機制

DataFusion 使用經過實戰檢驗的基於排序的窗口函數實現,這個方法也被 PostgreSQL 和 Vertica 等成熟數據庫採用:

  1. 預排序階段:首先按 PARTITION BY 和 ORDER BY 表達式對輸入數據進行排序
  2. 分區識別:WindowAggExec 算子高效識別分區邊界
  3. 分區計算:為每個分區創建對應的 PartitionEvaluator 實例
  4. 增量計算:利用排序特性進行增量式窗口計算

性能優化策略:

  • 重用排序:如果數據已經按所需順序排序,DataFusion 會智能重用現有排序
  • 並行分區:不同分區可以並行處理,提升多核利用率
  • 記憶體管理:使用流式處理,避免大數據集的記憶體溢出

CTE 在 DataFusion 中的優化處理

DataFusion 的查詢優化器對 CTE 有特殊的處理策略:

1. 內聯決策

// 簡單的 CTE 會被內聯
WITH simple_filter AS (SELECT * FROM table WHERE condition)
SELECT * FROM simple_filter;

// 等價於:
SELECT * FROM table WHERE condition;

2. 物化決策

// 複雜的 CTE 可能被物化
WITH complex_stats AS (
    SELECT region, COUNT(*), AVG(amount), STDDEV(amount)
    FROM large_table
    GROUP BY region
)
SELECT * FROM complex_stats WHERE avg > 1000;

3. 下推優化
DataFusion 會嘗試將篩選條件下推到 CTE 內部,減少不必要的計算。

查詢計劃分析與優化

async fn explain_query_plans(ctx: &SessionContext) -> Result<()> {
    println!("\n=== 查詢計劃分析 ===");

    // 分析窗口函數的執行計劃
    let explain_window = r#"
        EXPLAIN SELECT
            name,
            score,
            ROW_NUMBER() OVER (PARTITION BY subject ORDER BY score DESC) as rank
        FROM students
    "#;

    let results = ctx.sql(explain_window).await?.collect().await?;
    println!("窗口函數執行計劃:");
    datafusion::arrow::util::pretty::print_batches(&results)?;

    // 分析 CTE 的執行計劃
    let explain_cte = r#"
        EXPLAIN WITH ranked AS (
            SELECT name, score,
                   ROW_NUMBER() OVER (ORDER BY score DESC) as rank
            FROM students
        )
        SELECT * FROM ranked WHERE rank <= 3
    "#;

    let results = ctx.sql(explain_cte).await?.collect().await?;
    println!("\nCTE 執行計劃:");
    datafusion::arrow::util::pretty::print_batches(&results)?;

    Ok(())
}

動態過濾器與現代優化技術

DataFusion 49.0 引入的動態過濾器技術特別適用於以下場景:

1. TOP-K 查詢優化

-- 這種查詢可以利用動態過濾器大幅提升性能
SELECT * FROM large_table
ORDER BY timestamp DESC
LIMIT 10;

2. 時間範圍查詢
當查詢涉及時間排序的 TOP-K 時,動態過濾器可以在掃描過程中逐步收緊過濾條件,跳過不必要的數據頁。

3. 大表 JOIN 優化
動態過濾器可以在 JOIN 過程中傳遞資訊,提前過濾不符合條件的數據。

SQL vs DataFrame API 的深度比較

特性 SQL DataFrame API
學習曲線 熟悉 SQL 即可上手 需要學習 DataFusion 特定 API
表達能力 聲明式,直觀表達業務邏輯 程式化,適合複雜控制流程
型別安全 運行時檢查 編譯時檢查
動態構建 字符串拼接,易出錯 函數式構建,型別安全
除錯能力 EXPLAIN 分析 逐步構建,易於除錯
性能 優化器充分優化 同樣受益於優化器
整合性 與 BI 工具無縫整合 與 Rust 生態深度整合

實際選擇建議:

  • 數據探索和分析:優先使用 SQL
  • 生產環境的數據流水線:考慮 DataFrame API
  • 混合使用:複雜邏輯用 DataFrame API 構建,簡單查詢用 SQL

今天我們深入探討了 DataFusion 中 SQL 查詢的進階技巧,通過簡潔易懂的學生成績範例全面理解了窗口函數和 CTE 的強大功能。

參考資料

  1. User defined Window Functions in DataFusion
  2. Optimizing SQL (and DataFrames) in DataFusion, Part 1
  3. Optimizing SQL (and DataFrames) in DataFusion, Part 2
  4. Dynamic Filters: Passing Information Between Operators
  5. Apache DataFusion 49.0.0 Released
  6. Window Functions Documentation

上一篇
Day 5: 多數據源整合實戰 - CSV、JSON、Parquet 混合分析
下一篇
Day 7: 除錯與性能優化入門 - 讓查詢跑得又快又穩
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅9
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言