在前 29 天的系列文章中,我們已經從使用者角度理解了 ClickHouse 的表引擎設計:
今天作為系列收尾篇,我們要從 開發者角度 帶大家走進 ClickHouse GitHub 原始碼,探索 MergeTree 的內部結構。
我從茫茫 code 海中挑出了 6 個最重要的模組與函式(我愛 GPT 🤚😭🤚),對應 MergeTree 的「一生」:從建立 → 插入 → 合併 → 查詢 → 搬移。
本篇建議各位在電腦上看,開兩個視窗分別對應,函數都會特別提醒位置。
MergeTreeData::initializeDirectoriesAndFormatVersion(...)
該函式在 src/Storages/MergeTree/MergeTreeData.cpp
format_version.txt
(寫入至第一個 non-readonly 磁碟中)以下是寫入 format_version.txt
的部分程式碼,有解釋到 write once disk 幾乎是等於 read-only(例如 S3 object storage),我們不能寫入在裡面,因為不支援移動或刪除,避免後續 DROP 留下垃圾,造成空間浪費。
/// When data path or file not exists, ignore the format_version check
if (!attach || !read_format_version)
{
format_version = min_format_version;
/// Try to write to first non-readonly disk
for (const auto & disk : getStoragePolicy()->getDisks())
{
if (disk->isBroken())
continue;
/// Write once disk is almost the same as read-only for MergeTree,
/// since it does not support move, that is required for any
/// operation over MergeTree, so avoid writing format_version.txt
/// into it as well, to avoid leaving it after DROP.
if (!disk->isReadOnly() && !disk->isWriteOnce())
{
auto buf = disk->writeFile(format_version_path, 16, WriteMode::Rewrite, getContext()->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
buf->finalize();
if (getContext()->getSettingsRef()[Setting::fsync_metadata])
buf->sync();
}
break;
}
}
else
{
format_version = *read_format_version;
}
沒有這一步,後面所有 parts 都無法正確讀寫。
代表函式:
MergeTreeData::loadDataParts(...)
MergeTreeData::getDataParts(...)
MergeTreeData::renameTempPartAndReplace(...)
在 MergeTree 中,資料不是一張完整的大表,而是一個個 Immutable parts。
loadDataParts
掃描磁碟,把所有 parts 載入記憶體索引結構。renameTempPartAndReplace
,正式掛到 parts 目錄。getDataParts
取得一致性的 parts 視圖(支援 snapshot 概念)。這就是為什麼 MergeTree 能同時支援 高效讀取 與 高併發寫入。
該函式在 src/Storages/MergeTree/MergeTreeData.cpp
loadDataParts
負責:
DataPart
物件,並加入 data_parts_indexes
。ProfileEvents
記錄載入耗時、載入數量。if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks ...)
{
// 確保所有 parts 都落在定義的 storage policy 的磁碟上
// 如果發現 part 在未知磁碟,直接丟 Exception
}
防止「資料實體還在,但 metadata 已經指不到」,確保資料一致性。
runner([&expected_parts, &unexpected_disk_parts, &disk_parts, this, disk_ptr]()
{
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
{
if (expected_parts && !expected_parts->contains(it->name()))
unexpected_disk_parts.emplace_back(...);
else
disk_parts.emplace_back(...);
}
}
});
tmp*
、format_version.txt
、detached/
這些特殊目錄。MergeTreePartInfo
(不符合規則就當垃圾忽略)。auto loading_tree = PartLoadingTree::build(std::move(parts_to_load));
auto loaded_parts = loadDataPartsFromDisk(active_parts);
對於每個 active part:
這裡是把磁碟上的目錄轉換為
DataPart
物件的關鍵。
detached/broken-on-start/
目錄if (have_non_adaptive_parts && have_adaptive_parts && !enable_mixed_granularity_parts)
throw Exception(...);
如果表裡同時有 舊的非 adaptive granularity parts 和 新的 adaptive parts,預設是不允許混用的(除非開
enable_mixed_granularity_parts
設定)。
LOG_DEBUG(log, "Loaded data parts ({} items) took {} seconds", data_parts_indexes.size(), watch.elapsedSeconds());
ProfileEvents::increment(ProfileEvents::LoadedDataParts, data_parts_indexes.size());
該函式在 src/Storages/MergeTree/MergeTreeData.cpp
這個函式很短,它主要是查詢要讀哪些 parts,就是從這裡開始決定的。
MergeTreeData::DataParts MergeTreeData::getDataParts(
const DataPartStates & affordable_states,
const DataPartsKinds & affordable_kinds) const
affordable_states
:允許的 part 狀態(active、outdated、temporary…)affordable_kinds
:允許的 part 類型(wide、compact、in-memory…)DataParts
容器,包含目前符合條件的 parts。auto lock = lockParts();
data_parts_indexes
不會在迭代 (iteration) 時被修改。for (auto state : affordable_states)
{
for (auto kind : affordable_kinds)
{
auto range = getDataPartsStateRange(state, kind);
res.insert(range.begin(), range.end());
}
}
(state, kind)
配對,從內部索引拿出符合條件的 parts range。getDataPartsStateRange(state, kind)
→ 回傳一個 iterator 範圍,代表目前符合條件的 parts。res
。該函式在 src/Storages/MergeTree/MergeTreeData.cpp
這個函式主要是 INSERT 進 MergeTree流程的收尾
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction,
bool rename_in_transaction)
part
:剛寫好的 臨時 part (tmp_xxx)
out_transaction
:事務物件,用來確保替換 parts 的操作原子性rename_in_transaction
:是否在事務中完成 rename(可選擇延遲到 commit)covered_parts
:被新 part 覆蓋、取代的舊 parts(例如 overlapped 的小 part)。auto part_lock = lockParts();
parts
容器加鎖,確保替換過程 thread-safe。renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts, rename_in_transaction);
renameTempPartAndReplaceImpl
:
tmp_xxx
的 part 實際 rename 成正式名稱(例如 all_1_1_0
)。data_parts_indexes
,把這個新 part 插入。rename_in_transaction
,則這些更新會在 transaction commit 時一次性生效。return covered_parts;
把被覆蓋的舊 parts 傳回給呼叫端(方便後續處理,例如 detach 或刪除)。
INSERT INTO table VALUES (...)
tmp_
開頭),確保寫入中途崩潰不會污染active parts。renameTempPartAndReplace(...)
:
tmp_xxx
rename 成正式 part 名稱可以達到以下的功能
tmp_
part,只有最後 rename 成功才「生效」,確保 Atomicity。rename_in_transaction
,就能確保多個操作同時 commit 或 rollback ,支持更複雜的 ALTER/REPLACE 操作。代表函式:
MergeTreeDataMergerMutator::mergePartsToTemporaryPart(...)
MergeTreeBackgroundExecutor<Queue>::trySchedule(...)
MergeTask::execute(...)
MergeTree 的核心是「不做更新,只新增 part,後台再合併」。
tryScheduleMerge
) 會挑選多個小 part,呼叫 mergePartsToTemporaryPart
合併成一個大 part。這就是 ClickHouse 可以維持高寫入速度的祕密:寫入快 → 在背景自行慢慢整理。
該函式在 src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp
這個函數是 MergeTree 合併/後台任務排程的入口
template <class Queue>
bool MergeTreeBackgroundExecutor<Queue>::trySchedule(ExecutableTaskPtr task)
true
→ 成功排程false
→ 被拒絕(可能是系統 shutdown 或 queue 已滿)std::lock_guard lock(mutex);
保護 pending
任務隊列,避免多執行緒同時修改。
auto & value = CurrentMetrics::values[metric];
if (value.load() >= static_cast<int64_t>(max_tasks_count))
return false;
CurrentMetrics
監控目前這個 executor 的 task 數量。max_tasks_count
(例如限制同時執行的合併數量),就拒絕排程。pending.push(std::make_shared<TaskRuntimeData>(std::move(task), metric));
task
包裝成 TaskRuntimeData
放入 pending
queue。TaskRuntimeData
附帶 metric,用來追蹤這個 task 的執行狀態。has_tasks.notify_one();
這個函式做了非阻塞調度、資源控制、監控 等作用,避免同時合併過多 parts,導致磁碟 I/O 打爆、所有任務數量都掛在 CurrentMetrics
上,方便監控系統看到目前 Merge/Mutation 壓力
該函式在 src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
這函數是 MergeTree 合併的入口。我們來一步步拆解:
MergeTask
,它代表一次「合併(merge)或變更(mutation)」的任務。src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
MergeTaskPtr
(指向新建的 MergeTask
)注意:這裡只是「構造 MergeTask」,真正的合併邏輯會在 MergeTask::execute()
裡跑。
FutureMergedMutatedPartPtr future_part,
StorageMetadataPtr metadata_snapshot,
MergeList::Entry * merge_entry,
std::unique_ptr<MergeListElement> projection_merge_list_element,
TableLockHolder & holder,
time_t time_of_merge,
ContextPtr context,
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
MergeTreeData::MergingParams merging_params,
MergeTreeTransactionPtr txn,
bool need_prefix,
IMergeTreeDataPart * parent_part,
const String & suffix
future_part
metadata_snapshot
merge_entry / projection_merge_list_element
system.merges
(或 system.part_log
)的紀錄,方便監控。holder
space_reservation
deduplicate / deduplicate_by_columns
ReplacingMergeTree
、OPTIMIZE TABLE ... DEDUPLICATE
)。cleanup
merging_params
txn
need_prefix / parent_part / suffix
if (future_part->isResultPatch())
{
merging_params = MergeTreeData::getMergingParamsForPatchParts();
metadata_snapshot = future_part->parts.front()->getMetadataSnapshot();
}
return std::make_shared<MergeTask>(...);
MergeTask
,包含所有合併需要的上下文:
future_part
)merging_params
、deduplicate
、cleanup
)space_reservation
、holder
)merge_entry
、projection_merge_list_element
)之後 background thread pool 會呼叫
MergeTask::execute()
,真正讀取來源 parts → 做合併 → 輸出新 part。
所以 mergePartsToTemporaryPart(...)
的角色:
renameTempPartAndReplace(...)
變成正式 part。該函式在 src/Storages/MergeTree/MergeTask.cpp
這個函數就是 MergeTree 真正執行合併的核心函數
true
→ 還有後續 Stage 未完成(task 還需要繼續跑)false
→ 所有 Stage 都完成,合併結束chassert(stages_iterator != stages.end());
const auto & current_stage = *stages_iterator;
MergeTask
持有一個 stages
向量(pipeline 概念)。stages_iterator
指向當前要執行的 Stage。if (current_stage->execute())
return true;
true
→ 表示下次還要繼續跑這個 Stage。UInt64 current_elapsed_ms = global_ctx->merge_list_element_ptr->watch.elapsedMilliseconds();
UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapsed_ms;
global_ctx->prev_elapsed_ms = current_elapsed_ms;
global_ctx
是 merge 任務的全域上下文,裡面有 ProfileEvents 追蹤器。if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms);
}
MergeReadBlocks
、MergeWriteBlocks
。++stages_iterator;
if (stages_iterator == stages.end())
return false;
stages.end()
→ 合併全部完成,回傳 false
。(*stages_iterator)->setRuntimeContext(std::move(next_stage_context), global_ctx);
return true;
true
→ 表示還有 Stage 要跑。catch (...)
{
merge_failures.withLabels({String(ErrorCodes::getName(getCurrentExceptionCode()))}).increment();
throw;
}
merge_failures
是一個 metrics counter,用來統計失敗的類型。代表函式:
當使用者執行 SELECT
,查詢流程會進入 MergeTreeDataSelectExecutor::read
:
readFromParts
讀取真正需要的資料塊const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
StorageSnapshot
,裡面包含表當前可見的 parts 與 mutation snapshot。snapshot_data.parts
→ 這次查詢要讀的 parts(已經裁剪過的 active parts)snapshot_data.mutations_snapshot
→ 確保查詢在一個一致性版本上執行。auto step = readFromParts(
snapshot_data.parts,
snapshot_data.mutations_snapshot,
column_names_to_return,
storage_snapshot,
query_info,
context,
max_block_size,
num_streams,
max_block_numbers_to_read,
/*merge_tree_select_result_ptr=*/ nullptr,
enable_parallel_reading);
輸入參數:
parts
→ 要讀的資料分片mutations_snapshot
→ 保證一致性column_names_to_return
→ 只讀需要的欄位query_info
→ 包含 WHERE、ORDER BY、LIMIT 等查詢條件max_block_size
→ 每個 Block 的最大行數(控制批次大小)num_streams
→ 幾個執行緒並行讀enable_parallel_reading
→ 是否允許副本平行讀回傳值:
step
→ 一個 QueryPlanStep (ReadFromMergeTree
),代表「如何從 MergeTree parts 讀資料」。auto plan = std::make_unique<QueryPlan>();
if (step)
plan->addStep(std::move(step));
return plan;
QueryPlan
。代表函式:
MergeTreeWhereOptimizer::optimize(...)
查詢過程中,ClickHouse 不會「全表掃描」,而是依靠索引:
MergeTreeWhereOptimizer::optimize
負責重寫 WHERE 條件,盡量利用索引過濾掉無效 granules,讓查詢只掃必要的資料。
該函式在 src/Storages/MergeTree/MergeTreeWhereOptimizer.cpp
這個函式是在分析 SELECT ... WHERE ...
條件,決定哪些可以下推到 PREWHERE,然後改寫 AST,把部分條件移到 PREWHERE
PREWHERE 的特點:
if (!select.where() || select.prewhere())
return;
auto block_with_constants = KeyCondition::getBlockWithConstants(...);
WhereOptimizerContext where_optimizer_context;
where_optimizer_context.context = context;
where_optimizer_context.array_joined_names = determineArrayJoinedNames(select);
where_optimizer_context.move_all_conditions_to_prewhere = context->getSettingsRef()[Setting::move_all_conditions_to_prewhere];
...
準備優化器需要的上下文,包括:
FINAL
(可能會影響可下推性)RPNBuilderTreeContext tree_context(context, std::move(block_with_constants), {});
RPNBuilderTreeNode node(select.where().get(), tree_context);
auto optimize_result = optimizeImpl(node, where_optimizer_context);
把 WHERE 條件解析成 布林運算樹 (RPN – Reverse Polish Notation)。
呼叫 optimizeImpl
,嘗試把合適的條件下推。
回傳結果包含:
where_conditions
(留下的 WHERE 條件)prewhere_conditions
(被搬移的條件)auto where_filter_ast = reconstructAST(optimize_result->where_conditions);
auto prewhere_filter_ast = reconstructAST(optimize_result->prewhere_conditions);
select.setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_filter_ast));
select.setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_filter_ast));
把結果重新組裝回 AST(抽象語法樹)。
SELECT 查詢此時變成:
SELECT ...
PREWHERE <部分條件>
WHERE <剩餘條件>
LOG_DEBUG(
log,
"MergeTreeWhereOptimizer: condition \"{}\" moved to PREWHERE",
select.prewhere()->formatForLogging(...));
這是從「數十億行」中秒級查詢的關鍵。
代表函式:
MergeTreeData::moveParts(...)
MergeTreeData::removeOutdatedPartsAndDirs(...)
MergeTree 支援 TTL(Time-to-Live),讓資料自動過期或搬移:
removeOutdatedPartsAndDirs
清理過期 partsmoveParts
把舊資料搬到慢速磁碟(HDD、S3),新資料留在 SSD這讓 MergeTree 不只是 OLAP 表,還能做到資料生命週期管理 (Data Lifecycle Management)。
MergeTreeData::moveParts(...)
該函式在 src/Storages/MergeTree/MergeTreeWhereOptimizer.cpp
會根據 StoragePolicy(例如多磁碟策略:SSD → HDD,或節省空間)將 parts 實際移動到不同磁碟。
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
MovePartsOutcome result{MovePartsOutcome::PartsMoved};
記錄這次要搬多少個 parts,初始假設結果是成功移動。
for (const auto & moving_part : moving_tagger->parts_to_move)
每個 part 都要嘗試 clone(複製)並替換。
auto moves_list_entry = getContext()->getMovesList().insert(...);
更新系統表 system.moves
,讓使用者可觀察目前有哪些 parts 正在被移動。
if (supportsReplication() && disk->supportZeroCopyReplication() && (*settings)[MergeTreeSetting::allow_remote_fs_zero_copy_replication])
如果表是 ReplicatedMergeTree 且啟用零拷貝複製:
auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk);
if (!lock)
{
result = MovePartsOutcome::MoveWasPostponedBecauseOfZeroCopy;
break;
}
如果沒拿到 lock,就 postpon,避免競爭。
cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
if (lock->isLocked())
parts_mover.swapClonedPart(cloned_part);
cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
parts_mover.swapClonedPart(cloned_part);
直接 clone → swap,完成 part 移動。
write_part_log({});
寫入 system.part_log
,紀錄這次 MOVE_PART 操作,包含耗時、來源/目的磁碟等資訊。
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException("", true));
throw;
}
如果移動過程失敗,紀錄錯誤並重新拋出例外。
終於這系列迎來了一個結束,跟我的暑期實習一起畢業了 XD(沒啦,開學後還會繼續做),希望各位喜歡這系列文章。
之前有人問我:「Vic,你為什麼想開始鐵人賽」。當初覺得透過鐵人賽壓力讓自己快速深入一個服務,而且查閱了市面上很少人做 ClickHouse 相關的文章,就算有也只是講應用居多,我可以說是第一個開始提及底層原理和架構,可能客群都是資料科學家,很少我這種後端工程師會專注於這個)?
ClickHouse 是個很讓人著迷的 TB~PB 級別資料處理神器,前提是具備良好的基礎知識,才能在建表的時候考慮到所有情況。透過正確的根據業務邏輯、資料型別採取對應建表策略。
像是我在這次實習當中,替公司從雲上搬遷了資料,原本在 PostgreSQL 上單個表有約 400GB,但是使用 ClickHouse 搭配正確的配置策略,我可以將壓縮比達到 5x~86x 左右(依照欄位資料型別而定),替公司單一表格省下了約 360GB 的儲存成本,並且提高查詢效率,有助於公司內部資料分析、自動化效率。
這一系列文章也側面驗證了:為什麼 ClickHouse 能同時承受高寫入與大規模查詢,並在業界成為主流 OLAP 資料庫服務。