打給厚,在前兩天的文章中相信各位已經瞭解 DataFusion 的基礎概念、環境建置和核心工作流程,今天我們要進入更實用的場景:如何在同一個查詢中整合多種不同格式的數據源,讓我們繼續看下去吧!
現實世界的數據分析場景遠比單一格式複雜。以電商系統為例,訂單數據可能是 CSV 格式,產品目錄則使用 JSON 便於 API 交換,而大量的用戶行為數據則採用高效的 Parquet 格式 (parquet 會在後面的天數詳細介紹)。更進一步,這些數據可能分散在不同的存儲系統中,像是 AWS S3 等等。
當我們需要分析不同來源的數據,傳統的做法是先將所有數據轉換成統一格式,但這既費時又浪費存儲空間。因此,又到了 DataFusion 的工商時間啦 (喂,它能夠直接在不同格式間進行無縫整合查詢,讓我們可以用統一的 SQL 語法處理這種複雜的數據環境,提升了開發效率。
今天我們一樣透過實戰的方式帶大家使用 DataFusion 整合多種數據源。
首先一樣先建立一個 rust 專案並設置 Cargo.toml
:
cargo new datafusion-demo3
cd datafusion-demo2
mkdir data
[package]
name = "datafusion-multi-source"
version = "0.1.0"
edition = "2024"
[dependencies]
datafusion = "49.0.0"
tokio = { version = "1.0", features = ["rt-multi-thread"] }
serde_json = "1.0"
object_store = "0.8" # 支援 S3 等雲端存儲
aws-config = "1.0" # AWS SDK 支援
aws-sdk-s3 = "1.0" # S3 專用功能
假設我們經營一個線上商店,需要分析用戶購買行為來制定行銷策略。為此我們需要在 data
中創建三種不同格式的假資料:
訂單數據 (orders.csv
)
order_id,user_id,product_id,quantity,order_date,total_amount
1001,101,2001,2,2024-01-15,299.98
1002,102,2002,1,2024-01-16,149.99
1003,101,2001,1,2024-01-17,149.99
1004,103,2003,3,2024-01-18,899.97
1005,102,2002,2,2024-01-19,299.98
1006,104,2001,1,2024-01-20,149.99
1007,105,2003,1,2024-01-21,299.99
1008,103,2002,4,2024-01-22,599.96
1009,106,2001,2,2024-01-23,299.98
1010,104,2003,1,2024-01-24,299.99
產品目錄 (products.json
) - NDJSON 格式
{"product_id": 2001, "name": "無線藍牙耳機", "category": "電子產品", "price": 149.99, "supplier": "TechCorp", "stock": 150}
{"product_id": 2002, "name": "智能藍牙音箱", "category": "電子產品", "price": 149.99, "supplier": "AudioTech", "stock": 85}
{"product_id": 2003, "name": "智能運動手錶", "category": "穿戴裝置", "price": 299.99, "supplier": "WearableTech", "stock": 120}
用戶資料 (users.parquet
) - 程式生成
由於 Parquet 是二進位格式,無法直接用文字編輯器創建,因此在後面的實作中會展示如何用程式生成 parquet 資料。
資料準備好後就可以開始實作啦! 這次的主涵式會調用三個輔助涵式,分別涵蓋了所有的程式邏輯,接下來就依序介紹他們吧!
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// 創建 Parquet 格式的用戶資料(因為是二進位格式需要程式生成)
create_users_parquet().await?;
// 載入所有格式的數據
load_all_data_sources(&ctx).await?;
// 執行跨格式分析
run_cross_format_analysis(&ctx).await?;
Ok(())
}
create_users_parquet()
主要透過 Arrow 在記憶體中創建用戶行為資料,再透過 DataFusion 的 write_parquet
儲存成parquet 檔案
async fn create_users_parquet() -> Result<()> {
use datafusion::arrow::array::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;
let schema = Arc::new(Schema::new(vec![
Field::new("user_id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int32, false),
Field::new("age_group", DataType::Utf8, false),
Field::new("city", DataType::Utf8, false),
Field::new("registration_date", DataType::Utf8, false),
]));
let user_ids = Arc::new(Int64Array::from(vec![101, 102, 103, 104, 105, 106]));
let names = Arc::new(StringArray::from(vec![
"張小明", "李美麗", "王大華", "陳小芳", "林志強", "黃淑雯"
]));
let ages = Arc::new(Int32Array::from(vec![25, 32, 28, 35, 22, 29]));
let age_groups = Arc::new(StringArray::from(vec![
"25-30", "30-35", "25-30", "30-35", "20-25", "25-30"
]));
let cities = Arc::new(StringArray::from(vec![
"台北", "高雄", "台中", "台南", "桃園", "新竹"
]));
let reg_dates = Arc::new(StringArray::from(vec![
"2023-01-15", "2023-02-20", "2023-03-10", "2023-04-05", "2023-05-12", "2023-06-18"
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![user_ids as ArrayRef, names as ArrayRef, ages as ArrayRef,
age_groups as ArrayRef, cities as ArrayRef, reg_dates as ArrayRef],
)?;
let ctx = SessionContext::new();
let df = ctx.read_batch(batch)?;
df.write_parquet("data/users.parquet", DataFrameWriteOptions::new(), None).await?;
println!("✓ users.parquet 創建完成");
Ok(())
}
load_all_data_sources
整合 csv, JSON 以及 parquet 數據源的註冊,其中在載入 csv 時特別設置了 schema_infer_max_records(1000)
。這是一個重要的性能優化技巧。對於大型 CSV 文件,完整的 schema 推斷可能會很耗時,限制推斷記錄數可以顯著提升載入速度。
async fn load_all_data_sources(ctx: &SessionContext) -> Result<()> {
// 1. 載入 CSV 格式的訂單數據
println!("載入訂單數據 (CSV 格式)...");
ctx.register_csv(
"orders",
"data/orders.csv",
CsvReadOptions::new()
.has_header(true)
.delimiter(b',')
.schema_infer_max_records(1000)
).await?;
// 2. 載入 NDJSON 格式的產品目錄
println!("載入產品目錄 (NDJSON 格式)...");
ctx.register_json(
"products",
"data/products.json",
NdJsonReadOptions::default()
).await?;
// 3. 載入 Parquet 格式的用戶資料
println!("載入用戶資料 (Parquet 格式)...");
ctx.register_parquet(
"users",
"data/users.parquet",
ParquetReadOptions::default()
).await?;
println!("✓ 所有數據源載入完成\n");
Ok(())
}
cross_format_analysis
現在來實際進行查詢吧! 從範例中 SQL 語法可知,即使數據源不同,我們還是可以透過 JOIN
將資料結合進行行統一的查詢。 在底層,DataFusion 會將所有數據轉換為 Arrow 格式進行處理,確保了高效的記憶體使用和計算性能。
async fn cross_format_analysis(ctx: &SessionContext) -> Result<()> {
// 跨格式 JOIN 查詢:整合 CSV、JSON、Parquet 三種格式
let result = ctx.sql("
SELECT
u.name as customer_name,
p.name as product_name,
p.category,
o.quantity,
o.total_amount,
u.city
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2024-01-15'
ORDER BY o.total_amount DESC
LIMIT 10
").await?;
println!("=== 跨格式聯合查詢結果 ===");
result.show().await?;
Ok(())
}
耶! 大功告成~ 實際執行程式後我們會看到:
✓ users.parquet 創建完成
載入訂單數據 (CSV 格式)...
載入產品目錄 (NDJSON 格式)...
載入用戶資料 (Parquet 格式)...
✓ 所有數據源載入完成
=== 跨格式聯合查詢結果 ===
+---------------+--------------+----------+----------+--------------+------+
| customer_name | product_name | category | quantity | total_amount | city |
+---------------+--------------+----------+----------+--------------+------+
| 王大華 | 智能運動手錶 | 穿戴裝置 | 3 | 899.97 | 台中 |
| 王大華 | 智能藍牙音箱 | 電子產品 | 4 | 599.96 | 台中 |
| 陳小芳 | 智能運動手錶 | 穿戴裝置 | 1 | 299.99 | 台南 |
| 林志強 | 智能運動手錶 | 穿戴裝置 | 1 | 299.99 | 桃園 |
| 張小明 | 無線藍牙耳機 | 電子產品 | 2 | 299.98 | 台北 |
| 黃淑雯 | 無線藍牙耳機 | 電子產品 | 2 | 299.98 | 新竹 |
| 李美麗 | 智能藍牙音箱 | 電子產品 | 2 | 299.98 | 高雄 |
| 李美麗 | 智能藍牙音箱 | 電子產品 | 1 | 149.99 | 高雄 |
| 陳小芳 | 無線藍牙耳機 | 電子產品 | 1 | 149.99 | 台南 |
| 張小明 | 無線藍牙耳機 | 電子產品 | 1 | 149.99 | 台北 |
+---------------+--------------+----------+----------+--------------+------+
恭喜大家完成今天的演練,也掌握了 DataFusion 多數據源整合的核心能力和實戰技巧,明天我們將探討 SQL 查詢進階技巧並接觸查詢執行計劃的解讀,明天見啦~