嗨嗨,相信大家在昨天的文章中對 AST 和 logical plan 已經有了基礎的認識,今天就要來看看 DataFusion 是如何處理 SQL 語法的啦~ 讓我們開始吧!
DataFusion 採用三層架構設計,將查詢處理分為使用者介面、查詢規劃和實際執行三個層次,每層都有對應的 Context 負責管理狀態和資源:
SessionContext
: 使用者介面層,提供高層次的 API,如 sql()
, read_csv()
, read_parquet()
等,同一個 SessionContext
下的查詢可以共享相同的配置和資源。
#[derive(Clone)]
pub struct SessionContext {
/// UUID for the session
session_id: String,
/// Session start time
session_start_time: DateTime<Utc>,
/// Shared session state for the session
state: Arc<RwLock<SessionState>>,
}
SessionState
: 查詢計劃層, 包含查詢規劃所需的完整狀態,負責將 SQL 轉換為邏輯計劃和物理計劃,每個查詢都使用獨立的**SessionState
**確保隔離性。
#[derive(Clone)]
pub struct SessionState {
// ...
/// Responsible for optimizing a logical plan
optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
physical_optimizers: PhysicalOptimizer,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Scalar functions that are registered with the context
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Session configuration
config: SessionConfig,
/// Runtime environment
runtime_env: Arc<RuntimeEnv>,
// ...
}
TaskContext
: 是執行層的輕量級上下文,只包含執行執行計畫 (ExecutionPlan) 所需的最小狀態,支援並行執行和記憶體管理。
#[derive(Debug)]
pub struct TaskContext {
/// Session Id
session_id: String,
/// Optional Task Identify
task_id: Option<String>,
/// Session configuration
session_config: SessionConfig,
/// Scalar functions associated with this task context
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate functions registered in the context
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Window functions registered in the context
window_functions: HashMap<String, Arc<WindowUDF>>,
/// Runtime environment associated with this task context
runtime: Arc<RuntimeEnv>,
}
而今日重點— SQL → AST → logical plan 的流程大部分都會集中在 SessionState
實作的各種方法中呦!
DataFusion 將 SQL 轉換為 logical plan 的過程分為三個階段:
首先創建 DFParserBuilder
,並調用一系列的方法初始化 DFParser
實例 (build
),最後透過 DFParser
將 SQL 字串,轉換成 AST (Statement
物件)。
// datafusion/core/src/execution/session_states.rs
let mut statements = DFParserBuilder::new(sql)
.with_dialect(dialect.as_ref())
.with_recursion_limit(recursion_limit)
.build()?
.parse_statements()?;
觀察 build
可以發現目標的 SQL 字串會先經過 Tokenizer
將關鍵字、識別符或運算符等字眼拆分成獨立的 token 放入 vector 並去除不需要的標點符號或空白符等。
// datafusion/sql/src/parser.rs
pub fn build(self) -> Result<DFParser<'a>, DataFusionError> {
let mut tokenizer = Tokenizer::new(self.dialect, self.sql);
// Convert TokenizerError -> ParserError
let tokens = tokenizer
.tokenize_with_location()
.map_err(ParserError::from)?;
Ok(DFParser {
parser: Parser::new(self.dialect)
.with_tokens_with_locations(tokens)
.with_recursion_limit(self.recursion_limit),
options: SqlParserOptions {
recursion_limit: self.recursion_limit,
..Default::default()
},
})
}
隨後 parse_statements()
就會開始將 tokens 轉換成 AST 啦,一開始會用 peek_nth_token
偷看一下第一個 token 是甚麼,如果是關鍵字的話就會根據不同關鍵字有不同的處理方式:
pub fn parse_statement(&mut self) -> Result<Statement, DataFusionError> {
match self.parser.peek_token().token {
Token::Word(w) => {
match w.keyword {
Keyword::CREATE => {
self.parser.next_token(); // CREATE
self.parse_create()
}
Keyword::COPY => {
if let Token::Word(w) = self.parser.peek_nth_token(1).token {
// use native parser for COPY INTO
if w.keyword == Keyword::INTO {
return self.parse_and_handle_statement();
}
}
self.parser.next_token(); // COPY
self.parse_copy()
}
Keyword::EXPLAIN => {
self.parser.next_token(); // EXPLAIN
self.parse_explain()
}
_ => {
// use sqlparser-rs parser
self.parse_and_handle_statement()
}
}
}
_ => {
// use the native parser
self.parse_and_handle_statement()
}
}
}
parse_statement
完成後返回的 statements 便是解析完成的 AST,往外一層會發現我們剛剛觀察的範圍都在 create_logical_plan
內的 sql_to_statement
,接著我們就繼續往下看 statement_to_plan
// datafusion/core/src/execution/session_states.rs
#[cfg(feature = "sql")]
pub async fn create_logical_plan(
&self,
sql: &str,
) -> datafusion_common::Result<LogicalPlan> {
let dialect = self.config.options().sql_parser.dialect.as_str();
let statement = self.sql_to_statement(sql, dialect)?;
let plan = self.statement_to_plan(statement).await?;
Ok(plan)
}
我們可以將 statement_to_plan
拆解成三個步驟:
Step1: 從 AST 中提取所有表格引用,為後續查找做準備**。**
// datafusion/core/src/execution/session_states.rs
let references = self.resolve_table_references(&statement)?;
舉例來說假設查詢如下:
SELECT u.name, o.total
FROM users u
JOIN orders o ON u.id = o.user_id
就會提取出:["users", "orders"]
Step2: 建立 SessionContextProvider
,並逐一解析和檢查表格中 Schema 和表格欄位的定義,此時如果發生表格或欄位不存在就會提早拋出錯誤。
let mut provider = SessionContextProvider {
state: self,
tables: HashMap::with_capacity(references.len()),
};
for reference in references {
let resolved = self.resolve_table_ref(reference);
if let Entry::Vacant(v) = provider.tables.entry(resolved) {
let resolved = v.key();
if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
if let Some(table) = schema.table(&resolved.table).await? {
v.insert(provider_as_source(table));
}
}
}
}
Step3: 基於 SessionContextProvider
創建 SqlToRel
,SqlToRel
是 DataFusion 的語義分析器,可以檢查 SQL 語法或語意是否符合規則,最後就會將 AST 轉換成 logical plan
let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
query.statement_to_plan(statement)
DataFusion 提供了完整的範例程式碼,位於 datafusion-examples/examples/
:
sql_frontend.rs
: 如何將 SQL 轉換為 logical planplan_to_sql.rs
: 如何將 logical plan 反向轉換為 SQLsql_analysis.rs
: 如何分析 SQL 查詢結構大家不妨實際執行這些範例:
git clone https://github.com/apache/datafusion
cd datafusion/datafusion-examples
cargo run --example sql_frontend
今天我們揭開了 DataFusion 查詢執行的神秘面紗,通過閱讀實際原始碼理解了從 SQL 字串到 logical plan 的轉換機制。明天我們將探討 logical plan 優化,我們會看到 DataFusion 的優化器如何將初始計畫改寫為更高效的版本,我們拭目以待!