iT邦幫忙

2025 iThome 鐵人賽

DAY 4
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 4

Day 04: DataFusion 核心工作模式 - 從數據到洞察的完整流程

  • 分享至 

  • xImage
  •  

前言

打給厚,昨天我們成功執行了第一個 CSV 查詢後,不過在實際工作場景下面臨的資料量和資料來源可沒那麼單純,所以今天我們就來建立一個完整且標準化的 DataFusion 工作流程。這個流程將成為我們後續學習各種高級功能的基礎框架,讓我們能夠系統性地處理各種數據分析任務。

DataFusion 的標準工作流程

在實際的數據分析工作中,我們很容易陷入「想到什麼查什麼」的混亂狀態。建立一個標準化的工作流程不僅能提高效率,更能幫助我們避免常見的錯誤和遺漏。

DataFusion 的典型工作模式可以總結為四個步驟:準備環境 → 載入數據 → 執行查詢 → 處理結果。讓我們通過一個實際例子來學習這個流程。

首先一樣先建立一個 rust 專案並設置 Cargo.toml:

cargo new datafusion-demo2
cd datafusion-demo2
mkdir data
[package]
name = "datafusion-demo2"
version = "0.1.0"
edition = "2024"

[dependencies]
datafusion = "49.0.2"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }

假設我們需要分析一家書店的銷售數據,先在 data 目錄中新增 bookstore_sales.csv

order_id,book_title,author,category,price,quantity,order_date
1001,資料結構與演算法,張三,技術書籍,580.00,2,2024-01-15
1002,深度學習入門,李四,技術書籍,720.00,1,2024-01-16
1003,哈利波特,J.K.羅琳,奇幻小說,350.00,3,2024-01-16
1004,Python 程式設計,王五,技術書籍,490.00,1,2024-01-17
1005,三體,劉慈欣,科幻小說,420.00,2,2024-01-18

1. 環境準備 - SessionContext 的彈性配置

SessionContext 是整個 DataFusion 工作流程的指揮中心。在昨天的例子中我們使用了最簡單的 SessionContext::new(),但在實際工作中,我們需要根據數據規模和分析需求來進行適當的配置。

use datafusion::functions_aggregate::expr_fn::{sum, count, avg};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // 基礎配置:適合小到中等數據集的日常分析
    let ctx = SessionContext::new();

    // 檢查基礎配置
    println!("🚀 DataFusion 環境已準備完成");
    println!("   批次大小: {}", ctx.session_config().batch_size());
    println!("   目標分區數: {}", ctx.session_config().target_partitions());

本次的實作中特別列印出了兩個 SessionContext 的配置:

  • 批次大小 (batch_size):
    決定 DataFusion 每次處理的數據批次大小,較大的批次可以有效利用 SIMD 指令,但較消耗記憶體空間
  • 目標分區數 (target_partitions):
    可以讓 DataFusion 將不同分區得工作分散到多個 CPU 核心並行處裡

在不同數據量級下可以調整這些 SessionContext 配置優化查詢效能

2. 數據載入 - 建立分析的基礎

數據載入是分析工作的起點。在昨天的例子中,我們直接使用了 register_csv,但在實際工作中,我們需要更謹慎地處理數據載入過程:

    // 載入書店銷售數據
    println!("📚 載入書店銷售數據...");

    ctx.register_csv(
        "bookstore_sales", // 表名
        "data/bookstore_sales.csv",
        CsvReadOptions::new()
            .has_header(true) // 指定文件包含標題行
            .delimiter(b','), // 指定分隔符
    )
    .await?;

    // 驗證數據載入結果
    let df = ctx.table("bookstore_sales").await?;
    let schema = df.schema();
    println!("✅ 數據載入成功!");
    println!("   包含 {} 個欄位", schema.fields().len());
    for field in schema.fields() {
        println!("   - {}: {:?}", field.name(), field.data_type());
    }

    // 快速查看數據樣本
    println!("\n📋 數據預覽:");
    df.clone().limit(0, Some(3))?.show().await?;

這個載入過程不僅完成了數據的讀取,還自動進行了 Schema 推斷。DataFusion 會分析數據內容來判斷每個欄位的數據類型,這帶來三個層面的好處:

  • 效能:
    正確的數據類型推斷能夠顯著提升查詢執行效率。當一個數值欄位被正確識別為整數類型而不是字串類型時,DataFusion 可以使用更高效的數值比較算法。
  • 記憶體使用效率:
    當數據類型被正確推斷時,DataFusion 可以選擇最適合的記憶體表示方式。
  • 安全性:
    正確的 Schema 推斷能夠在早期階段捕獲數據類型相關的錯誤

3. 查詢執行 - 兩種 API 的靈活運用

DataFusion 為我們提供了兩種查詢方式:SQL APIDataFrame API。在實際工作中,我們會根據具體場景選擇合適的API。讓我們通過實際例子來體驗這兩種方式的特點。

SQL API:直觀的聲明式查詢

昨天的例子中我們就是使用這種 API 來查詢資料,這邊就不多做介紹了。今天的例子是計算訂單的筆數和時間範圍。

    println!("\n🔍 階段一:基礎數據探索 (使用 SQL API)");

    // SQL API
    let total_orders = ctx.sql("SELECT COUNT(*) as total_orders FROM bookstore_sales").await?;
    println!("訂單總數:");
    total_orders.show().await?;

    let date_range = ctx.sql("
        SELECT
            MIN(order_date) as first_order,
            MAX(order_date) as last_order
        FROM bookstore_sales
    ").await?;
    println!("數據時間範圍:");
    date_range.show().await?;

DataFrame API:程式化的靈活構建

當我們需要動態構建查詢,或者希望有更好的型別安全性時,DataFrame API 就顯現出它的優勢了。與傳統資料庫的 ORM 類似,它將複雜的查詢邏輯包裝成鏈式的方法調用,讓我們可以動態地組合查詢條件。

讓我們用 DataFrame API 來分析不同小說類別銷售的狀況:

    println!("\n📊 階段二:業務指標分析 (使用 DataFrame API)");
		
		// 取得數據表的引用
    let sales_df = ctx.table("bookstore_sales").await?;

    // DataFrame API 的優勢:型別安全、可組合、易於測試
    let category_analysis = sales_df.clone()
        .aggregate(
            vec![col("category")], // 按類別分組
            vec![
                count(lit(1)).alias("order_count"),
                sum(col("quantity")).alias("books_sold"),
                (sum(col("price") * col("quantity"))).alias("revenue"),
                avg(col("price")).alias("avg_book_price")
            ]
        )?
        .sort(vec![col("revenue").sort(false, true)])? // 按營收降序排列
        ;

    println!("各類別銷售表現 (DataFrame API):");
    category_analysis.show().await?;

4. 結果處理 - 將洞察轉化為行動

分析的價值在於應用。無論使用哪種 API,DataFusion 都提供了一致的結果處理方式:


    println!("\n💡 階段四:結果處理與報告生成");

    // 生成最終的管理報告(使用SQL的簡潔性)
    let management_report = ctx.sql("
        SELECT
            '書店銷售報告' as report_title,
            COUNT(*) as total_orders,
            SUM(price * quantity) as total_revenue,
            AVG(price * quantity) as avg_order_value,
            SUM(quantity) as total_books_sold
        FROM bookstore_sales
    ").await?;

    // 結果處理:收集到記憶體進行後續使用
    let result_batches = management_report.collect().await?;

    println!("📋 管理報告已生成:");
    for batch in &result_batches {
        println!("   包含 {} 行結果", batch.num_rows());
    }

    // 顯示最終報告
    management_report.show().await?;

    // 展示DataFrame API結果的不同處理方式
    let category_summary = sales_df.clone()
        .aggregate(
            vec![col("category")],
            vec![count(lit(1)).alias("orders")]
        )?;

    // 結果處理方式一:直接顯示
    println!("\n📊 類別統計 (直接顯示):");
    category_summary.clone().show().await?;

    // 結果處理方式二:收集後處理
    let category_batches = category_summary.collect().await?;
    println!("\n📈 類別統計 (收集後處理):");
    println!("   共有 {} 個類別", category_batches[0].num_rows());

    println!("\n✨ 分析完成!工作流程總結:");
    println!("   1. ✅ 環境準備:SessionContext 配置完成");
    println!("   2. ✅ 數據載入:CSV 數據成功解析");
    println!("   3. ✅ 查詢執行:SQL 和 DataFrame API 協同工作");
    println!("   4. ✅ 結果處理:多種輸出方式靈活運用");

    Ok(())
}

歐耶~ 恭喜各位完成了今天的實作練習 (我也是XD)! 通過這個完整的工作流程,我們不僅學會了標準化的分析過程,更重要的是理解如何運用 DataFusion 的兩種 API。明天我們將學習如何處理多種數據格式,更靈活的運用 Datafusion!

參考資料


上一篇
Day 03: DataFusion 初體驗 - 建立第一個查詢
下一篇
Day 5: 多數據源整合實戰 - CSV、JSON、Parquet 混合分析
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅9
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言