打給厚,今天我們將深入學習如何運用 DataFusion 支援的進階 SQL 語法查詢資料,包括窗口函數、CTE(Common Table Expression)、子查詢等,以及如何在 SQL 和 DataFrame API 之間做出最佳選擇。是不遲疑,讓我們開始今日份的學習吧!
窗口函數是實現複雜查詢的強力工具之一,和傳統聚合相比窗口函數能在原始資料行的後面添加計算結果而不影響表格結構。函數結構如下:
function([expr])
OVER(
[PARTITION BY expr[, …]]
[ORDER BY expr [ ASC | DESC ][, …]]
[ frame_clause ]
)
其中:
DataFusion 支援的窗口函數非常多樣,這邊舉幾個常見的函數
聚合函數
基本上傳統聚合函數都可以轉換成窗口函數的形式計算
SUM() -- 累計總和或移動總和
AVG() -- 移動平均
COUNT() -- 累計計數
排名函數(Ranking Functions)
ROW_NUMBER() -- 連續編號:1,2,3,4...
RANK() -- 跳躍排名:1,2,2,4...(相同值同排名,下一個跳過)
DENSE_RANK() -- 密集排名:1,2,2,3...(相同值同排名,下一個連續)
偏移函數(Offset Functions)
LAG(column, n) -- 取前n行的值
LEAD(column, n) -- 取後n行的值
FIRST_VALUE() -- 窗口內第一個值
LAST_VALUE() -- 窗口內最後一個值
有時候我們需要從多個表當中提取數據時常常會使用子查詢,但為了簡化查詢語法並達成遞迴查詢,CTE 的寫法就派上用場了。CTE 的本質是命名的暫時結果集,它將複雜查詢分解成多個邏輯步驟,大幅提升可讀性和維護性。
CTE 的三大優勢
照慣例先建立我們的測試專案:
cargo new datafusion-demo4
cd datafusion-demo4
mkdir data
# Cargo.toml
[dependencies]
datafusion = "49.0.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
這次我們使用學生成績作為實作的資料,不過為了方便我們直接使用 DataFusion 創建一張學生成績表。主程式的結構和昨天的範例一樣,將這次要學習的內容拆分成多個輔助函數由主程式調用,後面則依序實做這些輔助函數。
use datafusion::prelude::*;
use datafusion::error::Result;
現在我們需要在 main 函數中調用所有新增的範例函數:
```rust
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// 創建簡單的學生成績數據
create_student_data(&ctx).await?;
// 1. 窗口函數基礎應用
basic_window_functions(&ctx).await?;
// 2. CTE 基礎應用
basic_cte_example(&ctx).await?;
// 5. 三種方法的實戰對比
comparison_example(&ctx).await?;
// 6. SQL vs DataFrame API 比較
sql_vs_dataframe_comparison(&ctx).await?;
// 7. 常見陷阱與最佳實務
common_pitfalls_and_best_practices(&ctx).await?;
Ok(())
}
async fn create_student_data(ctx: &SessionContext) -> Result<()> {
// 創建學生成績表 - 數據簡單易理解
let student_sql = r#"
CREATE MEMORY TABLE students AS VALUES
('張三', '數學', 85, '高一'),
('李四', '數學', 92, '高一'),
('王五', '數學', 78, '高一'),
('張三', '英語', 88, '高一'),
('李四', '英語', 96, '高一'),
('王五', '英語', 82, '高一'),
('張三', '物理', 90, '高一'),
('李四', '物理', 89, '高一'),
('王五', '物理', 76, '高一'),
('陳六', '數學', 94, '高二'),
('趙七', '數學', 87, '高二'),
('陳六', '英語', 91, '高二'),
('趙七', '英語', 85, '高二')
AS students(name, subject, score, grade)
"#;
ctx.sql(student_sql).await?.collect().await?;
println!("✓ 學生成績數據已創建");
Ok(())
}
讓我們從最簡單的範例開始理解窗口函數:
async fn basic_window_functions(ctx: &SessionContext) -> Result<()> {
println!("\n=== 窗口函數基礎應用 ===");
// 範例1:為每個學生的成績排名
println!("\n1. 學生成績排名:");
let ranking_query = r#"
SELECT
name,
subject,
score,
-- 全校排名(所有科目一起排)
ROW_NUMBER() OVER (ORDER BY score DESC) as overall_rank,
-- 科目內排名
ROW_NUMBER() OVER (PARTITION BY subject ORDER BY score DESC) as subject_rank
FROM students
ORDER BY subject, score DESC
"#;
let results = ctx.sql(ranking_query).await?.collect().await?;
datafusion::arrow::util::pretty::print_batches(&results)?;
println!("\n💡 講解:");
println!("- ROW_NUMBER() 為每一行分配唯一編號");
println!("- PARTITION BY subject:在每個科目內部重新開始編號");
println!("- ORDER BY score DESC:按分數降序排列");
Ok(())
}
async fn basic_cte_example(ctx: &SessionContext) -> Result<()> {
println!("\n=== CTE 基礎應用 ===");
// 範例1:使用 CTE 分步計算學生總分和平均分
println!("\n1. 學生總分和平均分計算:");
let basic_cte = r#"
WITH student_totals AS (
-- 第一步:計算每個學生的總分和科目數
SELECT
name,
COUNT(*) as subject_count,
SUM(score) as total_score,
ROUND(AVG(score), 1) as avg_score
FROM students
GROUP BY name
)
-- 第二步:基於第一步結果進行進一步分析
SELECT
name,
subject_count,
total_score,
avg_score,
CASE
WHEN avg_score >= 90 THEN '優秀'
WHEN avg_score >= 80 THEN '良好'
WHEN avg_score >= 70 THEN '中等'
ELSE '需要努力'
END as performance_level
FROM student_totals
ORDER BY avg_score DESC
"#;
let results = ctx.sql(basic_cte).await?.collect().await?;
datafusion::arrow::util::pretty::print_batches(&results)?;
println!("\n💡 講解:");
println!("- WITH student_totals AS (...): 定義一個名為 student_totals 的 CTE");
println!("- CTE 內部完成了複雜的聚合計算");
println!("- 主查詢基於 CTE 結果進行進一步的邏輯處理");
println!("- 這樣的分步處理比一個複雜的嵌套查詢更易讀");
Ok(())
}
DataFusion 使用經過實戰檢驗的基於排序的窗口函數實現,這個方法也被 PostgreSQL 和 Vertica 等成熟數據庫採用:
性能優化策略:
DataFusion 的查詢優化器對 CTE 有特殊的處理策略:
1. 內聯決策
// 簡單的 CTE 會被內聯
WITH simple_filter AS (SELECT * FROM table WHERE condition)
SELECT * FROM simple_filter;
// 等價於:
SELECT * FROM table WHERE condition;
2. 物化決策
// 複雜的 CTE 可能被物化
WITH complex_stats AS (
SELECT region, COUNT(*), AVG(amount), STDDEV(amount)
FROM large_table
GROUP BY region
)
SELECT * FROM complex_stats WHERE avg > 1000;
3. 下推優化
DataFusion 會嘗試將篩選條件下推到 CTE 內部,減少不必要的計算。
async fn explain_query_plans(ctx: &SessionContext) -> Result<()> {
println!("\n=== 查詢計劃分析 ===");
// 分析窗口函數的執行計劃
let explain_window = r#"
EXPLAIN SELECT
name,
score,
ROW_NUMBER() OVER (PARTITION BY subject ORDER BY score DESC) as rank
FROM students
"#;
let results = ctx.sql(explain_window).await?.collect().await?;
println!("窗口函數執行計劃:");
datafusion::arrow::util::pretty::print_batches(&results)?;
// 分析 CTE 的執行計劃
let explain_cte = r#"
EXPLAIN WITH ranked AS (
SELECT name, score,
ROW_NUMBER() OVER (ORDER BY score DESC) as rank
FROM students
)
SELECT * FROM ranked WHERE rank <= 3
"#;
let results = ctx.sql(explain_cte).await?.collect().await?;
println!("\nCTE 執行計劃:");
datafusion::arrow::util::pretty::print_batches(&results)?;
Ok(())
}
DataFusion 49.0 引入的動態過濾器技術特別適用於以下場景:
1. TOP-K 查詢優化
-- 這種查詢可以利用動態過濾器大幅提升性能
SELECT * FROM large_table
ORDER BY timestamp DESC
LIMIT 10;
2. 時間範圍查詢
當查詢涉及時間排序的 TOP-K 時,動態過濾器可以在掃描過程中逐步收緊過濾條件,跳過不必要的數據頁。
3. 大表 JOIN 優化
動態過濾器可以在 JOIN 過程中傳遞資訊,提前過濾不符合條件的數據。
特性 | SQL | DataFrame API |
---|---|---|
學習曲線 | 熟悉 SQL 即可上手 | 需要學習 DataFusion 特定 API |
表達能力 | 聲明式,直觀表達業務邏輯 | 程式化,適合複雜控制流程 |
型別安全 | 運行時檢查 | 編譯時檢查 |
動態構建 | 字符串拼接,易出錯 | 函數式構建,型別安全 |
除錯能力 | EXPLAIN 分析 | 逐步構建,易於除錯 |
性能 | 優化器充分優化 | 同樣受益於優化器 |
整合性 | 與 BI 工具無縫整合 | 與 Rust 生態深度整合 |
實際選擇建議:
今天我們深入探討了 DataFusion 中 SQL 查詢的進階技巧,通過簡潔易懂的學生成績範例全面理解了窗口函數和 CTE 的強大功能。