iT邦幫忙

2025 iThome 鐵人賽

DAY 10
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 10

Day 10: 查詢執行生命週期 Part 2 - 從 SQL 到 logical plan

  • 分享至 

  • xImage
  •  

前言

嗨嗨,相信大家在昨天的文章中對 AST 和 logical plan 已經有了基礎的認識,今天就要來看看 DataFusion 是如何處理 SQL 語法的啦~ 讓我們開始吧!

查詢處理的分層架構

DataFusion 採用三層架構設計,將查詢處理分為使用者介面、查詢規劃和實際執行三個層次,每層都有對應的 Context 負責管理狀態和資源:

  1. SessionContext : 使用者介面層,提供高層次的 API,如 sql(), read_csv(), read_parquet() 等,同一個 SessionContext 下的查詢可以共享相同的配置和資源。

    #[derive(Clone)]
    pub struct SessionContext {
        /// UUID for the session
        session_id: String,
        /// Session start time
        session_start_time: DateTime<Utc>,
        /// Shared session state for the session
        state: Arc<RwLock<SessionState>>,
    }
    
  2. SessionState : 查詢計劃層, 包含查詢規劃所需的完整狀態,負責將 SQL 轉換為邏輯計劃和物理計劃,每個查詢都使用獨立的**SessionState**確保隔離性。

    #[derive(Clone)]
    pub struct SessionState {
    		// ...
        /// Responsible for optimizing a logical plan
        optimizer: Optimizer,
        /// Responsible for optimizing a physical execution plan
        physical_optimizers: PhysicalOptimizer,
        /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
        query_planner: Arc<dyn QueryPlanner + Send + Sync>,
        /// Scalar functions that are registered with the context
        scalar_functions: HashMap<String, Arc<ScalarUDF>>,
        /// Session configuration
        config: SessionConfig,
        /// Runtime environment
        runtime_env: Arc<RuntimeEnv>,
        // ... 
    }
    
  3. TaskContext: 是執行層的輕量級上下文,只包含執行執行計畫 (ExecutionPlan) 所需的最小狀態,支援並行執行和記憶體管理。

    #[derive(Debug)]
    pub struct TaskContext {
        /// Session Id
        session_id: String,
        /// Optional Task Identify
        task_id: Option<String>,
        /// Session configuration
        session_config: SessionConfig,
        /// Scalar functions associated with this task context
        scalar_functions: HashMap<String, Arc<ScalarUDF>>,
        /// Aggregate functions registered in the context
        aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
        /// Window functions registered in the context
        window_functions: HashMap<String, Arc<WindowUDF>>,
        /// Runtime environment associated with this task context
        runtime: Arc<RuntimeEnv>,
    }
    

而今日重點— SQL → AST → logical plan 的流程大部分都會集中在 SessionState 實作的各種方法中呦!

SQL 解析流程

DataFusion 將 SQL 轉換為 logical plan 的過程分為三個階段:

1. 將 SQL 字串拆分成 tokens

首先創建 DFParserBuilder ,並調用一系列的方法初始化 DFParser 實例 (build),最後透過 DFParser 將 SQL 字串,轉換成 AST (Statement 物件)。

// datafusion/core/src/execution/session_states.rs
let mut statements = DFParserBuilder::new(sql)
        .with_dialect(dialect.as_ref())
        .with_recursion_limit(recursion_limit)
        .build()?
        .parse_statements()?;

觀察 build可以發現目標的 SQL 字串會先經過 Tokenizer 將關鍵字、識別符或運算符等字眼拆分成獨立的 token 放入 vector 並去除不需要的標點符號或空白符等。

// datafusion/sql/src/parser.rs

pub fn build(self) -> Result<DFParser<'a>, DataFusionError> {
    let mut tokenizer = Tokenizer::new(self.dialect, self.sql);
    // Convert TokenizerError -> ParserError
    let tokens = tokenizer
        .tokenize_with_location()
        .map_err(ParserError::from)?;

    Ok(DFParser {
        parser: Parser::new(self.dialect)
            .with_tokens_with_locations(tokens)
            .with_recursion_limit(self.recursion_limit),
        options: SqlParserOptions {
            recursion_limit: self.recursion_limit,
            ..Default::default()
        },
    })
}

2. Tokens 到 AST

隨後 parse_statements() 就會開始將 tokens 轉換成 AST 啦,一開始會用 peek_nth_token 偷看一下第一個 token 是甚麼,如果是關鍵字的話就會根據不同關鍵字有不同的處理方式:

  1. 標準 ANSI SQL(SELECT, INSERT, UPDATE 等) : 這類的關鍵字會交給第三方的 sqlparser-rs Crate處理
  2. DataFusion 專屬擴展語法(CREATE EXTERNAL TABLE, COPY TO 等): 顧名思義就是只有 DataFusion 才有的語法,只能自己實作解決囉。
pub fn parse_statement(&mut self) -> Result<Statement, DataFusionError> {
    match self.parser.peek_token().token {
        Token::Word(w) => {
            match w.keyword {
                Keyword::CREATE => {
                    self.parser.next_token(); // CREATE
                    self.parse_create()
                }
                Keyword::COPY => {
                    if let Token::Word(w) = self.parser.peek_nth_token(1).token {
                        // use native parser for COPY INTO
                        if w.keyword == Keyword::INTO {
                            return self.parse_and_handle_statement();
                        }
                    }
                    self.parser.next_token(); // COPY
                    self.parse_copy()
                }
                Keyword::EXPLAIN => {
                    self.parser.next_token(); // EXPLAIN
                    self.parse_explain()
                }
                _ => {
                    // use sqlparser-rs parser
                    self.parse_and_handle_statement()
                }
            }
        }
        _ => {
            // use the native parser
            self.parse_and_handle_statement()
        }
    }
}

3. AST 到 logical plan

parse_statement 完成後返回的 statements 便是解析完成的 AST,往外一層會發現我們剛剛觀察的範圍都在 create_logical_plan 內的 sql_to_statement,接著我們就繼續往下看 statement_to_plan

// datafusion/core/src/execution/session_states.rs

#[cfg(feature = "sql")]
    pub async fn create_logical_plan(
        &self,
        sql: &str,
    ) -> datafusion_common::Result<LogicalPlan> {
        let dialect = self.config.options().sql_parser.dialect.as_str();
        let statement = self.sql_to_statement(sql, dialect)?;
        let plan = self.statement_to_plan(statement).await?;
        Ok(plan)
    }

我們可以將 statement_to_plan 拆解成三個步驟:

Step1: 從 AST 中提取所有表格引用,為後續查找做準備**。**

// datafusion/core/src/execution/session_states.rs 

let references = self.resolve_table_references(&statement)?;

舉例來說假設查詢如下:

SELECT u.name, o.total 
FROM users u 
JOIN orders o ON u.id = o.user_id

就會提取出:["users", "orders"]

Step2: 建立 SessionContextProvider,並逐一解析和檢查表格中 Schema 和表格欄位的定義,此時如果發生表格或欄位不存在就會提早拋出錯誤。

 let mut provider = SessionContextProvider {
            state: self,
            tables: HashMap::with_capacity(references.len()),
        };

for reference in references {
    let resolved = self.resolve_table_ref(reference);
    if let Entry::Vacant(v) = provider.tables.entry(resolved) {
        let resolved = v.key();
        if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
            if let Some(table) = schema.table(&resolved.table).await? {
                v.insert(provider_as_source(table));
            }
        }
    }
}

Step3: 基於 SessionContextProvider 創建 SqlToRelSqlToRel 是 DataFusion 的語義分析器,可以檢查 SQL 語法或語意是否符合規則,最後就會將 AST 轉換成 logical plan

let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
        query.statement_to_plan(statement)

範例

DataFusion 提供了完整的範例程式碼,位於 datafusion-examples/examples/ :

  • sql_frontend.rs: 如何將 SQL 轉換為 logical plan
  • plan_to_sql.rs: 如何將 logical plan 反向轉換為 SQL
  • sql_analysis.rs: 如何分析 SQL 查詢結構

大家不妨實際執行這些範例:

git clone https://github.com/apache/datafusion
cd datafusion/datafusion-examples
cargo run --example sql_frontend

小結

今天我們揭開了 DataFusion 查詢執行的神秘面紗,通過閱讀實際原始碼理解了從 SQL 字串到 logical plan 的轉換機制。明天我們將探討 logical plan 優化,我們會看到 DataFusion 的優化器如何將初始計畫改寫為更高效的版本,我們拭目以待!

參考資料


上一篇
Day 9: 查詢執行生命週期 Part 1 - 從 SQL 到 LogicalPlan
下一篇
Day 11: 查詢執行生命週期 Part 3 - 優化與執行
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅14
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言