在昨天的文章中,我們探討了 ExecutionPlan
trait 的核心設計,了解了 schema()
、properties()
和 execute()
等關鍵方法。今天我們要深入探討 execute()
方法背後的 Stream 執行模型。
今天的學習目標是:
SendableRecordBatchStream
的設計與作用RecordBatch
在執行計劃間的流動過程TaskContext
在執行期間提供的支援這些概念是理解 DataFusion 查詢執行引擎的關鍵,也是實現自訂執行算子的基礎。
在 DataFusion 中,查詢執行的結果不是一次性返回所有數據,而是通過一個異步流(Stream)逐批次地產生。這個流的型別就是 SendableRecordBatchStream
:
// datafusion/execution/src/stream.rs
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
這個型別定義看似簡單,但包含了幾個重要的設計考量:
1. RecordBatchStream Trait
RecordBatchStream
是一個擴展了 Rust 標準 Stream
trait 的特化 trait:
/// Trait for types that stream RecordBatch
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
/// Returns the schema of this RecordBatchStream.
fn schema(&self) -> SchemaRef;
}
它的核心特點是:
Stream<Item = Result<RecordBatch>>
:每次從流中取出一個 RecordBatch
,或遇到錯誤schema()
方法:即使流中還沒有數據,也能提前知道數據的結構(Schema)RecordBatch
必須符合同一個 Schema這種設計讓下游算子能夠在開始處理數據前就規劃好記憶體分配和轉換邏輯,而不需要等待第一筆數據到達。
2. Pin 與 Box 的組合
Pin<Box<...>>
的組合提供了:
3. Send Trait Bound
Send
trait bound 表示這個流可以安全地在執行緒間傳遞,這是 DataFusion 實現並行執行的基礎。每個分區(partition)的執行可以在不同的執行緒或異步任務中進行。
在實際實作中,DataFusion 提供了 RecordBatchStreamAdapter
來簡化流的創建:
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
// 創建一個簡單的流
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
]));
let batches = vec![
RecordBatch::try_new(schema.clone(), vec![...])?,
];
// 將 Vec<RecordBatch> 轉換為流
let stream = stream::iter(batches.into_iter().map(Ok));
let adapter = RecordBatchStreamAdapter::new(schema, stream);
let sendable_stream: SendableRecordBatchStream = Box::pin(adapter);
這個 adapter 封裝了實作 RecordBatchStream
trait 所需的樣板代碼,讓實作者只需專注於數據生成邏輯。
歷史背景
Volcano 模型由 Goetz Graefe 在 1990 年的論文 "Volcano - An Extensible and Parallel Query Evaluation System" 中提出,是資料庫查詢執行引擎設計的里程碑。這個模型奠定了現代資料庫系統的基礎架構,包括 PostgreSQL、MySQL、Apache Spark 和 DataFusion 等都受其影響。
核心思想
Volcano 模型將查詢執行計劃組織成一棵算子樹,每個算子都實作統一的介面:
trait Operator {
fn open(); // 初始化算子
fn next() -> Tuple; // 獲取下一個元組
fn close(); // 清理資源
}
關鍵特點:
next()
方法產生數據next()
被調用時才進行計算在查詢執行引擎中,有兩種主要的數據流動範式:
Pull-based(拉取式)- Volcano 模型
┌─────────────────┐
│ Parent Op │ ──┐
│ (consumer) │ │ 1. "我需要數據"
└────────┬────────┘ │ parent.next()
│ │
│ 2. 調用 │
↓ │
┌─────────────────┐ │
│ Child Op │ ──┘ 3. "這是數據"
│ (producer) │ return tuple
└─────────────────┘
特點:父算子主動拉取數據,控制權在消費方。
Push-based(推送式)
┌─────────────────┐
│ Child Op │ ──┐ 1. "這是數據"
│ (producer) │ │ child.produce()
└────────┬────────┘ │
│ │
│ 2. 推送 │
↓ │
┌─────────────────┐ │
│ Parent Op │ ──┘ 3. "已接收"
│ (consumer) │ parent.consume()
└─────────────────┘
特點:子算子主動推送數據,控制權在生產方。
特性 | Pull-based (Volcano) | Push-based |
---|---|---|
控制流 | 從上到下(父節點控制) | 從下到上(子節點控制) |
函數調用 | parent → child.next() |
child → parent.consume() |
背壓處理 | 自然支援(不拉取就不產生) | 需要額外機制(緩衝隊列) |
記憶體使用 | 按需分配,易於控制 | 可能需要大量緩衝區 |
LIMIT 優化 | 容易(停止拉取即可) | 困難(已產生的數據難以停止) |
管道並行 | 較難(受迭代器限制) | 較易(可並行推送) |
快取友好性 | 一般 | 較好(批次處理) |
實作複雜度 | 簡單直觀 | 較複雜(需要狀態管理) |
假設查詢:SELECT name FROM users WHERE age > 18 LIMIT 10;
Pull-based 執行流程:
// pseudocode
impl LimitExec {
fn next() -> Option<Tuple> {
if self.count >= 10 {
return None; // 已達 LIMIT,停止拉取
}
let tuple = self.input.next()?; // 向下拉取
self.count += 1;
Some(tuple)
}
}
// 優勢:一旦達到 LIMIT,立即停止
// FilterExec 和 TableScan 不會被繼續調用
Push-based 執行流程:
// pseudocode
impl TableScan {
fn produce() {
for tuple in self.data {
if !self.parent.consume(tuple) {
break; // 需要檢查父節點是否還需要數據
}
}
}
}
// 劣勢:需要額外的信號機制來停止生產
在 DataFusion 中,Volcano 模型的體現為:
┌─────────────────────────────────┐
│ ProjectionExec │
│ (需要數據時呼叫 poll_next) │
└──────────────┬──────────────────┘
│ poll_next()
↓
┌──────────────────────────────────┐
│ FilterExec │
│ (從輸入拉取,過濾後返回) │
└──────────────┬───────────────────┘
│ poll_next()
↓
┌──────────────────────────────────┐
│ TableScan │
│ (從存儲讀取數據) │
└──────────────────────────────────┘
DataFusion 選擇 Pull-based Volcano 模型基於以下考量:
1. 自然的背壓機制(Backpressure)
這是 Pull-based 最大的優勢。在處理大規模數據時:
情境:快速生產 vs 慢速消費
Pull-based:
TableScan (快) ──X──> Filter (慢) ──X──> Consumer (很慢)
↑ 不被調用 ↑ 調用次數少
不產生數據 只在需要時拉取
Push-based:
TableScan (快) ──→→→→ Buffer ──→ Filter (慢) ──→ Consumer (很慢)
不斷推送 ↑ 記憶體爆炸風險
需要大量緩衝區
實際影響:
2. 延遲計算與短路優化(Lazy Evaluation & Short-circuit)
考慮這些常見查詢:
-- 場景 A: LIMIT 查詢
SELECT * FROM billion_rows_table LIMIT 10;
-- 場景 B: 提前退出
SELECT * FROM users WHERE username = 'admin'; -- 唯一索引
Pull-based 的優勢:
// LimitExec 達到 10 行後
fn poll_next(...) -> Poll<Option<RecordBatch>> {
if self.produced >= self.limit {
return Poll::Ready(None); // 直接結束
}
// 不再調用 input.poll_next()
// TableScan 自動停止讀取
}
Push-based 則需要:
3. 與 Rust 異步生態的完美結合
Rust 的 Stream
trait 天生就是 pull-based:
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
這意味著:
futures
crate 的豐富工具鏈(map
, filter
, try_flatten
等)4. 簡化的錯誤處理
Pull-based 的錯誤傳播非常直觀:
// 子算子發生錯誤
fn poll_next(...) -> Poll<Option<Result<RecordBatch>>> {
match self.read_data() {
Err(e) => return Poll::Ready(Some(Err(e))), // 立即返回錯誤
Ok(data) => // ...
}
}
// 父算子接收錯誤
match ready!(self.input.poll_next_unpin(cx)) {
Some(Err(e)) => return Poll::Ready(Some(Err(e))), // 向上傳播
// ...
}
錯誤會自然地向上傳播,整個查詢會 fail-fast。
Push-based 則需要:
5. 除錯與性能分析更容易
Pull-based 的調用棧清晰可追蹤:
LimitExec::poll_next()
└─> ProjectionExec::poll_next()
└─> FilterExec::poll_next()
└─> TableScan::poll_next()
可以輕鬆使用 Rust 的 profiler(如 cargo flamegraph
)來分析性能瓶頸。
儘管 DataFusion 選擇了 Pull-based,Push-based 在某些場景下仍有優勢:
混合模型
一些現代系統採用混合策略:
讓我們看一個真實的例子,了解 Pull-based 模型如何運作:
// datafusion/physical-plan/src/filter.rs
impl ExecutionPlan for FilterExec {
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// 1. 先執行輸入算子,獲得輸入流
let input_stream = self.input.execute(partition, context)?;
// 2. 創建 FilterExecStream,包裝輸入流
Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: input_stream, // 保存輸入流的引用
baseline_metrics,
projection: self.projection.clone(),
}))
}
}
注意這裡的關鍵點:
execute()
方法本身不是異步的,它只是設置好流的結構並立即返回在 Rust 的異步系統中,Stream
trait 的核心是 poll_next()
方法。每次父算子需要數據時,會呼叫子算子流的 poll_next()
:
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
Poll
是一個列舉,表示異步操作的狀態:
Poll::Ready(Some(batch))
:成功產生一個 RecordBatch
Poll::Ready(None)
:流已結束,沒有更多數據Poll::Pending
:數據尚未就緒,需要等待(例如等待 I/O)讓我們看一個實際的 poll_next
實作,了解數據如何在算子間流動:
// datafusion/physical-plan/src/coalesce_batches.rs
impl Stream for CoalesceBatchesStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
match &self.inner_state {
CoalesceBatchesStreamState::Pull => {
// 1. 向輸入流請求下一個 batch
let input_batch = ready!(self.input.poll_next_unpin(cx));
match input_batch {
Some(Ok(batch)) => {
// 2. 將 batch 加入合併緩衝區
match self.coalescer.push_batch(batch) {
CoalescerState::Continue => {
// 繼續拉取更多 batch
continue;
}
CoalescerState::TargetReached => {
// 緩衝區已滿,準備返回
self.inner_state =
CoalesceBatchesStreamState::ReturnBuffer;
}
CoalescerState::LimitReached => {
self.inner_state =
CoalesceBatchesStreamState::Exhausted;
}
}
}
None => {
// 輸入流已結束
self.inner_state =
CoalesceBatchesStreamState::Exhausted;
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
}
}
CoalesceBatchesStreamState::ReturnBuffer => {
// 3. 返回合併後的 batch
match self.coalescer.finish()? {
Some(batch) => {
self.inner_state = CoalesceBatchesStreamState::Pull;
return Poll::Ready(Some(Ok(batch)));
}
None => {
self.inner_state = CoalesceBatchesStreamState::Pull;
}
}
}
CoalesceBatchesStreamState::Exhausted => {
// 4. 返回最後的緩衝數據
return Poll::Ready(self.coalescer.finish()?.map(Ok));
}
}
}
}
}
這個範例展示了幾個重要的模式:
Pull
/ ReturnBuffer
/ Exhausted
)管理執行流程self.input.poll_next_unpin(cx)
向上游請求數據讓我們追蹤一個查詢中 RecordBatch
的完整流動路徑:
SELECT name FROM users WHERE age > 18 LIMIT 10;
執行計劃可能是:
LimitExec: fetch=10
ProjectionExec: [name]
FilterExec: age > 18
TableScan: users
數據流動過程:
1. 用戶呼叫 stream.next().await
│
↓
2. LimitExec.poll_next()
│ 判斷是否已滿足 LIMIT
↓ 若未滿足,向下游請求
3. ProjectionExec.poll_next()
│ 向下游請求完整的 RecordBatch
↓
4. FilterExec.poll_next()
│ 向下游請求原始數據
↓
5. TableScan.poll_next()
│ 從存儲讀取數據(可能是 Parquet 檔案)
│ 返回 Poll::Ready(Some(batch_raw))
↑
6. FilterExec 接收到 batch_raw
│ 應用過濾條件 age > 18
│ 返回 Poll::Ready(Some(batch_filtered))
↑
7. ProjectionExec 接收到 batch_filtered
│ 只保留 name 列
│ 返回 Poll::Ready(Some(batch_projected))
↑
8. LimitExec 接收到 batch_projected
│ 檢查行數,可能截斷 batch
│ 返回 Poll::Ready(Some(batch_limited))
↑
9. 最終結果返回給用戶
整個過程中,數據是逐批次流動的,不需要將整個結果集載入記憶體。
在 Day 10 的文章中,我們介紹過 DataFusion 的三層架構(SessionContext → SessionState → TaskContext),並了解到 TaskContext
是執行層的輕量級上下文。今天我們要從執行期間的實際應用角度,深入探討 TaskContext
如何支撐查詢的運行。
每次呼叫 execute()
方法時,都會傳入一個 Arc<TaskContext>
,它在執行期間扮演著關鍵的資源管理者角色。讓我們看看它在實際執行中提供哪些具體支援:
1. 記憶體池管理(Memory Pool)
let memory_pool = context.memory_pool();
// 嘗試分配記憶體
memory_pool.try_grow(required_size)?;
記憶體池確保單一查詢不會耗盡系統記憶體,並在必要時觸發 spilling(將數據寫入磁碟)。
2. 提供函數註冊表
當執行計劃中包含 UDF(User-Defined Function)時,可以通過 TaskContext
查找函數實作:
let scalar_func = context.scalar_functions()
.get("my_custom_func")
.ok_or_else(|| DataFusionError::Plan("Function not found".to_string()))?;
3. 訪問運行時環境(RuntimeEnv)
let runtime = context.runtime_env();
// 獲取對象存儲、磁碟管理器等
let object_store = runtime.object_store(...)?;
這使得執行算子能夠訪問外部資源,如讀取 S3 上的 Parquet 檔案。
4. 配置訪問
let batch_size = context.session_config()
.batch_size();
執行算子可以根據配置調整行為,例如決定每個 RecordBatch
的大小。
TaskContext
被包裝在 Arc
中,在整個執行計劃樹中共享:
impl ExecutionPlan for FilterExec {
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>, // 接收共享的 TaskContext
) -> Result<SendableRecordBatchStream> {
// 訪問記憶體池
let memory_pool = context.memory_pool();
// 獲取配置
let batch_size = context.session_config().batch_size();
// 向下傳遞給輸入算子
let input = self.input.execute(partition, Arc::clone(&context))?;
// 創建執行流...
}
}
這種設計確保:
TaskContext
,因為內部的記憶體池等組件已經處理好並發訪問在數據流系統中,背壓(Backpressure)是指當下游處理速度跟不上上游生產速度時,系統需要一種機制來減緩上游的生產速度,防止記憶體溢出。
DataFusion 的 Stream 執行模型天然支援背壓,無需額外的複雜機制:
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// 向上游請求數據
let input_batch = ready!(self.input.poll_next_unpin(cx));
// 如果上游返回 Poll::Pending,
// ready!() 宏會直接返回 Poll::Pending
// 這意味著當前算子也暫停了
// 處理 batch...
}
關鍵機制:
Poll::Pending
,當前算子也暫停poll_next()
,自然實現流量控制假設我們從慢速網路讀取 Parquet 檔案:
Time 0: FilterExec.poll_next()
↓
TableScan.poll_next()
↓ 發起網路請求
返回 Poll::Pending (數據未到達)
↑ 傳播 Pending
FilterExec 也返回 Poll::Pending
Time 1: (數據到達,Waker 喚醒任務)
FilterExec.poll_next() 再次被呼叫
↓
TableScan.poll_next()
↓ 數據已就緒
返回 Poll::Ready(Some(batch))
↑
FilterExec 處理 batch 並返回
這種機制確保:
DataFusion 還實作了 CooperativeStream
來防止單一任務壟斷 CPU:
// datafusion/physical-plan/src/coop.rs
impl<T> Stream for CooperativeStream<T>
where
T: RecordBatchStream + Unpin,
{
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// 檢查是否還有執行預算
if self.budget == 0 {
// 預算耗盡,主動讓出 CPU
cx.waker().wake_by_ref();
self.budget = YIELD_FREQUENCY;
return Poll::Pending;
}
self.budget -= 1;
self.inner.poll_next_unpin(cx)
}
}
這確保即使有大量數據可處理,也會定期讓出 CPU 給其他任務,實現公平調度。
今天我們深入探討了 DataFusion 的 Stream 執行模型:
poll_next()
方法級聯拉取,數據逐批次在算子間傳遞明天我們將深入 Projection 和 Filter 算子,看看它們如何實作表達式求值和數據過濾,以及如何利用向量化執行提升性能。