打給厚,昨天我們成功執行了第一個 CSV 查詢後,不過在實際工作場景下面臨的資料量和資料來源可沒那麼單純,所以今天我們就來建立一個完整且標準化的 DataFusion 工作流程。這個流程將成為我們後續學習各種高級功能的基礎框架,讓我們能夠系統性地處理各種數據分析任務。
在實際的數據分析工作中,我們很容易陷入「想到什麼查什麼」的混亂狀態。建立一個標準化的工作流程不僅能提高效率,更能幫助我們避免常見的錯誤和遺漏。
DataFusion 的典型工作模式可以總結為四個步驟:準備環境 → 載入數據 → 執行查詢 → 處理結果。讓我們通過一個實際例子來學習這個流程。
首先一樣先建立一個 rust 專案並設置 Cargo.toml
:
cargo new datafusion-demo2
cd datafusion-demo2
mkdir data
[package]
name = "datafusion-demo2"
version = "0.1.0"
edition = "2024"
[dependencies]
datafusion = "49.0.2"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
假設我們需要分析一家書店的銷售數據,先在 data
目錄中新增 bookstore_sales.csv
:
order_id,book_title,author,category,price,quantity,order_date
1001,資料結構與演算法,張三,技術書籍,580.00,2,2024-01-15
1002,深度學習入門,李四,技術書籍,720.00,1,2024-01-16
1003,哈利波特,J.K.羅琳,奇幻小說,350.00,3,2024-01-16
1004,Python 程式設計,王五,技術書籍,490.00,1,2024-01-17
1005,三體,劉慈欣,科幻小說,420.00,2,2024-01-18
SessionContext 是整個 DataFusion 工作流程的指揮中心。在昨天的例子中我們使用了最簡單的 SessionContext::new()
,但在實際工作中,我們需要根據數據規模和分析需求來進行適當的配置。
use datafusion::functions_aggregate::expr_fn::{sum, count, avg};
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 基礎配置:適合小到中等數據集的日常分析
let ctx = SessionContext::new();
// 檢查基礎配置
println!("🚀 DataFusion 環境已準備完成");
println!(" 批次大小: {}", ctx.session_config().batch_size());
println!(" 目標分區數: {}", ctx.session_config().target_partitions());
本次的實作中特別列印出了兩個 SessionContext 的配置:
在不同數據量級下可以調整這些 SessionContext 配置優化查詢效能
數據載入是分析工作的起點。在昨天的例子中,我們直接使用了 register_csv
,但在實際工作中,我們需要更謹慎地處理數據載入過程:
// 載入書店銷售數據
println!("📚 載入書店銷售數據...");
ctx.register_csv(
"bookstore_sales", // 表名
"data/bookstore_sales.csv",
CsvReadOptions::new()
.has_header(true) // 指定文件包含標題行
.delimiter(b','), // 指定分隔符
)
.await?;
// 驗證數據載入結果
let df = ctx.table("bookstore_sales").await?;
let schema = df.schema();
println!("✅ 數據載入成功!");
println!(" 包含 {} 個欄位", schema.fields().len());
for field in schema.fields() {
println!(" - {}: {:?}", field.name(), field.data_type());
}
// 快速查看數據樣本
println!("\n📋 數據預覽:");
df.clone().limit(0, Some(3))?.show().await?;
這個載入過程不僅完成了數據的讀取,還自動進行了 Schema 推斷。DataFusion 會分析數據內容來判斷每個欄位的數據類型,這帶來三個層面的好處:
DataFusion 為我們提供了兩種查詢方式:SQL API 和 DataFrame API。在實際工作中,我們會根據具體場景選擇合適的API。讓我們通過實際例子來體驗這兩種方式的特點。
昨天的例子中我們就是使用這種 API 來查詢資料,這邊就不多做介紹了。今天的例子是計算訂單的筆數和時間範圍。
println!("\n🔍 階段一:基礎數據探索 (使用 SQL API)");
// SQL API
let total_orders = ctx.sql("SELECT COUNT(*) as total_orders FROM bookstore_sales").await?;
println!("訂單總數:");
total_orders.show().await?;
let date_range = ctx.sql("
SELECT
MIN(order_date) as first_order,
MAX(order_date) as last_order
FROM bookstore_sales
").await?;
println!("數據時間範圍:");
date_range.show().await?;
當我們需要動態構建查詢,或者希望有更好的型別安全性時,DataFrame API 就顯現出它的優勢了。與傳統資料庫的 ORM 類似,它將複雜的查詢邏輯包裝成鏈式的方法調用,讓我們可以動態地組合查詢條件。
讓我們用 DataFrame API 來分析不同小說類別銷售的狀況:
println!("\n📊 階段二:業務指標分析 (使用 DataFrame API)");
// 取得數據表的引用
let sales_df = ctx.table("bookstore_sales").await?;
// DataFrame API 的優勢:型別安全、可組合、易於測試
let category_analysis = sales_df.clone()
.aggregate(
vec![col("category")], // 按類別分組
vec![
count(lit(1)).alias("order_count"),
sum(col("quantity")).alias("books_sold"),
(sum(col("price") * col("quantity"))).alias("revenue"),
avg(col("price")).alias("avg_book_price")
]
)?
.sort(vec![col("revenue").sort(false, true)])? // 按營收降序排列
;
println!("各類別銷售表現 (DataFrame API):");
category_analysis.show().await?;
分析的價值在於應用。無論使用哪種 API,DataFusion 都提供了一致的結果處理方式:
println!("\n💡 階段四:結果處理與報告生成");
// 生成最終的管理報告(使用SQL的簡潔性)
let management_report = ctx.sql("
SELECT
'書店銷售報告' as report_title,
COUNT(*) as total_orders,
SUM(price * quantity) as total_revenue,
AVG(price * quantity) as avg_order_value,
SUM(quantity) as total_books_sold
FROM bookstore_sales
").await?;
// 結果處理:收集到記憶體進行後續使用
let result_batches = management_report.collect().await?;
println!("📋 管理報告已生成:");
for batch in &result_batches {
println!(" 包含 {} 行結果", batch.num_rows());
}
// 顯示最終報告
management_report.show().await?;
// 展示DataFrame API結果的不同處理方式
let category_summary = sales_df.clone()
.aggregate(
vec![col("category")],
vec![count(lit(1)).alias("orders")]
)?;
// 結果處理方式一:直接顯示
println!("\n📊 類別統計 (直接顯示):");
category_summary.clone().show().await?;
// 結果處理方式二:收集後處理
let category_batches = category_summary.collect().await?;
println!("\n📈 類別統計 (收集後處理):");
println!(" 共有 {} 個類別", category_batches[0].num_rows());
println!("\n✨ 分析完成!工作流程總結:");
println!(" 1. ✅ 環境準備:SessionContext 配置完成");
println!(" 2. ✅ 數據載入:CSV 數據成功解析");
println!(" 3. ✅ 查詢執行:SQL 和 DataFrame API 協同工作");
println!(" 4. ✅ 結果處理:多種輸出方式靈活運用");
Ok(())
}
歐耶~ 恭喜各位完成了今天的實作練習 (我也是XD)! 通過這個完整的工作流程,我們不僅學會了標準化的分析過程,更重要的是理解如何運用 DataFusion 的兩種 API。明天我們將學習如何處理多種數據格式,更靈活的運用 Datafusion!