iT邦幫忙

2025 iThome 鐵人賽

DAY 5
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 5

Day 5: 多數據源整合實戰 - CSV、JSON、Parquet 混合分析

  • 分享至 

  • xImage
  •  

前言

打給厚,在前兩天的文章中相信各位已經瞭解 DataFusion 的基礎概念、環境建置和核心工作流程,今天我們要進入更實用的場景:如何在同一個查詢中整合多種不同格式的數據源,讓我們繼續看下去吧!

為什麼需要多格式整合?

現實世界的數據分析場景遠比單一格式複雜。以電商系統為例,訂單數據可能是 CSV 格式,產品目錄則使用 JSON 便於 API 交換,而大量的用戶行為數據則採用高效的 Parquet 格式 (parquet 會在後面的天數詳細介紹)。更進一步,這些數據可能分散在不同的存儲系統中,像是 AWS S3 等等。

當我們需要分析不同來源的數據,傳統的做法是先將所有數據轉換成統一格式,但這既費時又浪費存儲空間。因此,又到了 DataFusion 的工商時間啦 (喂,它能夠直接在不同格式間進行無縫整合查詢,讓我們可以用統一的 SQL 語法處理這種複雜的數據環境,提升了開發效率。

今天我們一樣透過實戰的方式帶大家使用 DataFusion 整合多種數據源。

實戰演練: 電商數據場景

首先一樣先建立一個 rust 專案並設置 Cargo.toml:

cargo new datafusion-demo3
cd datafusion-demo2
mkdir data
[package]
name = "datafusion-multi-source"
version = "0.1.0"
edition = "2024"

[dependencies]
datafusion = "49.0.0"
tokio = { version = "1.0", features = ["rt-multi-thread"] }
serde_json = "1.0"
object_store = "0.8"  # 支援 S3 等雲端存儲
aws-config = "1.0"    # AWS SDK 支援
aws-sdk-s3 = "1.0"    # S3 專用功能

準備不同格式的數據

假設我們經營一個線上商店,需要分析用戶購買行為來制定行銷策略。為此我們需要在 data 中創建三種不同格式的假資料:

  1. 訂單數據 (orders.csv)

    order_id,user_id,product_id,quantity,order_date,total_amount
    1001,101,2001,2,2024-01-15,299.98
    1002,102,2002,1,2024-01-16,149.99
    1003,101,2001,1,2024-01-17,149.99
    1004,103,2003,3,2024-01-18,899.97
    1005,102,2002,2,2024-01-19,299.98
    1006,104,2001,1,2024-01-20,149.99
    1007,105,2003,1,2024-01-21,299.99
    1008,103,2002,4,2024-01-22,599.96
    1009,106,2001,2,2024-01-23,299.98
    1010,104,2003,1,2024-01-24,299.99
    
  2. 產品目錄 (products.json) - NDJSON 格式

    {"product_id": 2001, "name": "無線藍牙耳機", "category": "電子產品", "price": 149.99, "supplier": "TechCorp", "stock": 150}
    {"product_id": 2002, "name": "智能藍牙音箱", "category": "電子產品", "price": 149.99, "supplier": "AudioTech", "stock": 85}
    {"product_id": 2003, "name": "智能運動手錶", "category": "穿戴裝置", "price": 299.99, "supplier": "WearableTech", "stock": 120}
    
  3. 用戶資料 (users.parquet) - 程式生成
    由於 Parquet 是二進位格式,無法直接用文字編輯器創建,因此在後面的實作中會展示如何用程式生成 parquet 資料。

主涵式結構

資料準備好後就可以開始實作啦! 這次的主涵式會調用三個輔助涵式,分別涵蓋了所有的程式邏輯,接下來就依序介紹他們吧!

use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]  
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    
    // 創建 Parquet 格式的用戶資料(因為是二進位格式需要程式生成)
    create_users_parquet().await?;
    
    // 載入所有格式的數據
    load_all_data_sources(&ctx).await?;
    
    // 執行跨格式分析
    run_cross_format_analysis(&ctx).await?;
    
    Ok(())
}

輔助涵式

create_users_parquet()

主要透過 Arrow 在記憶體中創建用戶行為資料,再透過 DataFusion 的 write_parquet 儲存成parquet 檔案

async fn create_users_parquet() -> Result<()> {
    use datafusion::arrow::array::*;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    let schema = Arc::new(Schema::new(vec![
        Field::new("user_id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int32, false),
        Field::new("age_group", DataType::Utf8, false),
        Field::new("city", DataType::Utf8, false),
        Field::new("registration_date", DataType::Utf8, false),
    ]));

    let user_ids = Arc::new(Int64Array::from(vec![101, 102, 103, 104, 105, 106]));
    let names = Arc::new(StringArray::from(vec![
        "張小明", "李美麗", "王大華", "陳小芳", "林志強", "黃淑雯"
    ]));
    let ages = Arc::new(Int32Array::from(vec![25, 32, 28, 35, 22, 29]));
    let age_groups = Arc::new(StringArray::from(vec![
        "25-30", "30-35", "25-30", "30-35", "20-25", "25-30"
    ]));
    let cities = Arc::new(StringArray::from(vec![
        "台北", "高雄", "台中", "台南", "桃園", "新竹"
    ]));
    let reg_dates = Arc::new(StringArray::from(vec![
        "2023-01-15", "2023-02-20", "2023-03-10", "2023-04-05", "2023-05-12", "2023-06-18"
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![user_ids as ArrayRef, names as ArrayRef, ages as ArrayRef,
             age_groups as ArrayRef, cities as ArrayRef, reg_dates as ArrayRef],
    )?;

    let ctx = SessionContext::new();
    let df = ctx.read_batch(batch)?;
    df.write_parquet("data/users.parquet", DataFrameWriteOptions::new(), None).await?;
    
    println!("✓ users.parquet 創建完成");
    Ok(())
}

load_all_data_sources

整合 csv, JSON 以及 parquet 數據源的註冊,其中在載入 csv 時特別設置了 schema_infer_max_records(1000)。這是一個重要的性能優化技巧。對於大型 CSV 文件,完整的 schema 推斷可能會很耗時,限制推斷記錄數可以顯著提升載入速度。

async fn load_all_data_sources(ctx: &SessionContext) -> Result<()> {
	// 1. 載入 CSV 格式的訂單數據
	println!("載入訂單數據 (CSV 格式)...");
	ctx.register_csv(
	    "orders", 
	    "data/orders.csv", 
	    CsvReadOptions::new()
	        .has_header(true)
	        .delimiter(b',')
	        .schema_infer_max_records(1000)
	).await?;
	
	// 2. 載入 NDJSON 格式的產品目錄
	println!("載入產品目錄 (NDJSON 格式)...");
	ctx.register_json(
	    "products",
	    "data/products.json",
	    NdJsonReadOptions::default()
	).await?;
	
	// 3. 載入 Parquet 格式的用戶資料
	println!("載入用戶資料 (Parquet 格式)...");
	ctx.register_parquet(
	    "users",
	    "data/users.parquet", 
	    ParquetReadOptions::default()
	).await?;
	
	println!("✓ 所有數據源載入完成\n");
	Ok(())
}	

cross_format_analysis

現在來實際進行查詢吧! 從範例中 SQL 語法可知,即使數據源不同,我們還是可以透過 JOIN 將資料結合進行行統一的查詢。 在底層,DataFusion 會將所有數據轉換為 Arrow 格式進行處理,確保了高效的記憶體使用和計算性能。

async fn cross_format_analysis(ctx: &SessionContext) -> Result<()> {
    // 跨格式 JOIN 查詢:整合 CSV、JSON、Parquet 三種格式
    let result = ctx.sql("
        SELECT 
            u.name as customer_name,
            p.name as product_name,
            p.category,
            o.quantity,
            o.total_amount,
            u.city
        FROM orders o
        JOIN users u ON o.user_id = u.user_id
        JOIN products p ON o.product_id = p.product_id
        WHERE o.order_date >= '2024-01-15'
        ORDER BY o.total_amount DESC
        LIMIT 10
    ").await?;
    
    println!("=== 跨格式聯合查詢結果 ===");
    result.show().await?;
    Ok(())
}

耶! 大功告成~ 實際執行程式後我們會看到:

✓ users.parquet 創建完成
載入訂單數據 (CSV 格式)...
載入產品目錄 (NDJSON 格式)...
載入用戶資料 (Parquet 格式)...
✓ 所有數據源載入完成

=== 跨格式聯合查詢結果 ===
+---------------+--------------+----------+----------+--------------+------+
| customer_name | product_name | category | quantity | total_amount | city |
+---------------+--------------+----------+----------+--------------+------+
| 王大華        | 智能運動手錶   | 穿戴裝置 | 3        | 899.97       | 台中  |
| 王大華        | 智能藍牙音箱   | 電子產品 | 4        | 599.96       | 台中  |
| 陳小芳        | 智能運動手錶   | 穿戴裝置 | 1        | 299.99       | 台南  |
| 林志強        | 智能運動手錶   | 穿戴裝置 | 1        | 299.99       | 桃園  |
| 張小明        | 無線藍牙耳機   | 電子產品 | 2        | 299.98       | 台北  |
| 黃淑雯        | 無線藍牙耳機   | 電子產品 | 2        | 299.98       | 新竹  |
| 李美麗        | 智能藍牙音箱   | 電子產品 | 2        | 299.98       | 高雄  |
| 李美麗        | 智能藍牙音箱   | 電子產品 | 1        | 149.99       | 高雄  |
| 陳小芳        | 無線藍牙耳機   | 電子產品 | 1        | 149.99       | 台南  |
| 張小明        | 無線藍牙耳機   | 電子產品 | 1        | 149.99       | 台北  |
+---------------+--------------+----------+----------+--------------+------+

恭喜大家完成今天的演練,也掌握了 DataFusion 多數據源整合的核心能力和實戰技巧,明天我們將探討 SQL 查詢進階技巧並接觸查詢執行計劃的解讀,明天見啦~

參考資料

  1. DataFusion 官方文檔 - 多格式支援介紹
  2. Apache DataFusion GitHub
  3. Delta Lake 與 DataFusion 整合最佳實務

上一篇
Day 04: DataFusion 核心工作模式 - 從數據到洞察的完整流程
下一篇
Day 6: SQL 查詢進階技巧 - 解決實際分析問題
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅9
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言