在大規模資料場景下,企業越來越需要能夠 實時處理與分析 Data Streaming 的技術架構。
而 ClickHouse 天生與 Kafka 的整合,提供了一條高效能的 Event Streaming → Real-Time Analytics Data Pipeline,讓資料從產生到分析僅需「秒級延遲」。
今天我們會透過專案帶各位了解 Kafka + ClickHouse 的即時 Data Streaming Pipeline,各位可以先將 Repository Clone 下來,接下來我們都會用檔案內的內容來教學。
流程階段 | 說明 |
---|---|
Producers | 產生 Data Streaming 的上游系統(如 Web 行為、IoT、APM 等)。 |
Kafka Topics | 接收 Data Streaming 的 Message Queue,具備高吞吐、可重播特性。 |
Kafka Connect / ClickHouse Kafka Engine | 負責將 Kafka Topic 資料持續寫入 ClickHouse。 |
ClickHouse Tables | 儲存與實時分析,可搭配 Materialized View、Partitioning 設計加速查詢。 |
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_security_protocol = '',]
[kafka_sasl_mechanism = '',]
[kafka_sasl_username = '',]
[kafka_sasl_password = '',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
參數名 | 說明 |
---|---|
kafka_broker_list | Kafka Broker 位址與 Port,支援多節點(逗號分隔)。 |
kafka_topic_list | 監聽的 Kafka Topic 名稱,支援多個 Topic(逗號分隔)。 |
kafka_group_name | Consumer Group 名稱,ClickHouse 會以這個 Group 協調消費者進度。 |
kafka_format | 資料格式,如 JSONEachRow、CSV、Avro、Protobuf 等。 |
參數名 | 說明 |
---|---|
kafka_security_protocol | 設定連線安全協定 (如 PLAINTEXT, SSL, SASL_SSL 等)。 |
kafka_sasl_mechanism | SASL 認證方式 (如 PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)。 |
kafka_sasl_username | SASL 認證用戶名稱。 |
kafka_sasl_password | SASL 認證密碼。 |
kafka_schema | 若使用 Avro/Protobuf/Cap’n Proto 格式時,需指定 Schema 檔案路徑。 |
參數名 | 說明 |
---|---|
kafka_num_consumers | 這張表會啟用的 Consumer 數量。建議設定不超過 Topic Partition 數量與伺服器 CPU 核心數。 |
kafka_max_block_size | 每次 Poll Kafka 拉取的最大訊息數量 (預設為 ClickHouse max_insert_block_size)。 |
kafka_poll_max_batch_size | 每次 Kafka poll 最大訊息批次量。與 kafka_max_block_size 類似,雙方限制取較小者。 |
kafka_poll_timeout_ms | Kafka poll 的超時時間。預設為 stream_poll_timeout_ms。 |
kafka_flush_interval_ms | Flush 資料到 ClickHouse 的間隔時間 (毫秒)。預設為 stream_flush_interval_ms。 |
kafka_thread_per_consumer | 設為 1 時,會為每個 Consumer 啟動獨立執行緒,並行寫入,適合多 Partition 場景。 |
參數名 | 說明 |
---|---|
kafka_skip_broken_messages | 允許每個批次 (Block) 最多跳過 N 筆無法解析的錯誤訊息 (格式錯誤等)。預設為 0。 |
kafka_handle_error_mode | 錯誤處理模式:default (直接拋錯)、stream (錯誤記錄在虛擬欄位 _error)、dead_letter_queue (寫入 system.dead_letter_queue)。 |
kafka_commit_every_batch | 每處理完一個 Batch 就 Commit Offset,預設為 0 (整個 Block 完成才 Commit)。 |
kafka_commit_on_select | 是否在 SELECT 查詢時 Commit Offset,預設為 false。 |
參數名 | 說明 |
---|---|
kafka_max_rows_per_message | 針對 row-based 格式,每個 Kafka 訊息最大可包含幾筆資料。預設為 1。 |
場景 | 推薦設定 |
---|---|
高吞吐量 (大量 Partition, 高頻率) | kafka_num_consumers = Partition 數,kafka_thread_per_consumer = 1 |
敏感錯誤容忍 (格式錯誤需記錄) | kafka_handle_error_mode = 'stream' |
大批次穩定寫入 | kafka_max_block_size 與 kafka_poll_max_batch_size 設為 10萬或更多 |
跨 DC 資料同步 | 適當延長 kafka_poll_timeout_ms 與 kafka_flush_interval_ms,確保網路環境適應性 |
該專案已經使用 Docker Compose 將所有服務和設定都處理好了,各位可以簡單使用。
首先我們看到 create_tables.sql
,這是我們建立 Kafka + ClickHouse 的關鍵。
create_tables.sql
// create_tables.sql
-- ClickHouse Tables Setup
DROP TABLE IF EXISTS default.user_events;
DROP TABLE IF EXISTS default.kafka_user_events;
DROP TABLE IF EXISTS default.kafka_to_events_mv;
user_events
// create_tables.sql
-- Main Events Table
CREATE TABLE IF NOT EXISTS default.user_events
(
UserID UInt64,
Action String,
EventDate DateTime,
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (UserID, EventDate);
kafka_user_events
// create_tables.sql
-- Kafka Engine Table
CREATE TABLE IF NOT EXISTS default.kafka_user_events
(
UserID UInt64,
Action String,
EventDate DateTime,
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:29092',
kafka_topic_list = 'user_events_topic',
kafka_group_name = 'clickhouse_consumer_v3',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1,
kafka_thread_per_consumer = 1;
這是一張 Kafka Engine Table,本身不會儲存資料,而是作為 ClickHouse 消費 Kafka 訊息的入口
Kafka 表本身無法直接查詢。你必須透過 Materialized View 將資料寫入實體表才能存取。
這是整個 Streaming Pipeline 的關鍵橋樑。Materialized View 會監聽 kafka_user_events,並將其每筆資料自動寫入目標表 user_events。
TO default.user_events
表示這是一個「推送型」 MV。// create_tables.sql
-- Materialized View to stream data from Kafka to main table
CREATE MATERIALIZED VIEW IF NOT EXISTS default.kafka_to_events_mv TO default.user_events AS
SELECT
UserID,
Action,
EventDate,
FROM default.kafka_user_events;
kafka_producer.py
import json
import time
from kafka import KafkaProducer
from datetime import datetime
import random
# Kafka Config
KAFKA_BROKER = 'localhost:9092'
TOPIC = 'user_events_topic'
# Initialize Kafka Producer
producer = KafkaProducer(
bootstrap_servers=[KAFKA_BROKER],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Generate Random Events
def generate_event():
return {
"EventDate": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"UserID": random.randint(0, 10),
"Action": random.choice(["click", "view", "purchase"]),
"Version": 1
}
# Produce Events Continuously
def produce():
print("Starting Kafka Producer...")
try:
while True:
event = generate_event()
producer.send(TOPIC, value=event)
producer.flush() # NOTEICED: it'll be cache default, so we flush it
print(f"Produced: {event}")
time.sleep(1) # Send 1 message per second (adjust as needed)
except KeyboardInterrupt:
print("Stopped Producer.")
finally:
producer.close()
if __name__ == "__main__":
produce()
producer.flush()
確保訊息立即送出(預設會暫存)。time.sleep(1)
控制發送頻率,可改成更快或更慢。import clickhouse_connect
# ClickHouse Config
CLICKHOUSE_HOST = 'localhost'
CLICKHOUSE_PORT = 8123
def connect_clickhouse():
"""connect to ClickHouse"""
try:
client = clickhouse_connect.get_client(
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_PORT,
username='default',
password='default'
)
print("connected to ClickHouse!")
return client
except Exception as e:
print(f"connect to ClickHouse failed: {e}")
return None
def check_tables(client):
"""check tables"""
try:
result = client.query("SHOW TABLES")
print("existing tables:")
for row in result.result_rows:
print(f" - {row[0]}")
return True
except Exception as e:
print(f"check tables failed: {e}")
return False
def query_data(client):
"""query data"""
try:
# query total records
count_result = client.query("SELECT COUNT(*) FROM default.user_events")
total_count = count_result.result_rows[0][0]
print(f"\ntotal records: {total_count}")
if total_count > 0:
# query recent 10 records
recent_result = client.query("""
SELECT EventDate, UserID, Action
FROM default.user_events
ORDER BY EventDate DESC
LIMIT 10
""")
print("\nrecent 10 records:")
print("-" * 60)
for row in recent_result.result_rows:
print(f" {row[0]} | UserID: {row[1]} | Action: {row[2]}")
# group by Action
action_stats = client.query("""
SELECT Action, COUNT(*) as count
FROM default.user_events
GROUP BY Action
ORDER BY count DESC
""")
print("\ngroup by Action:")
print("-" * 30)
for row in action_stats.result_rows:
print(f" {row[0]}: {row[1]} records")
except Exception as e:
print(f"query data failed: {e}")
def main():
print("ClickHouse data query tool")
print("=" * 40)
client = connect_clickhouse()
if client:
check_tables(client)
query_data(client)
client.close()
if __name__ == "__main__":
main()
我們會在後面的文章中講到 RBAC
接著我們使用 SQL Query 取得被 MV 轉發的資料。
-- 查詢最近 10 筆
SELECT EventDate, UserID, Action
FROM default.user_events
ORDER BY EventDate DESC
LIMIT 10
-- Group by 統計
SELECT Action, COUNT(*)
FROM default.user_events
GROUP BY Action
ORDER BY count DESC
ClickHouse 與 Kafka 的整合,使我們能夠在毫秒級時間內,將龐大的事件資料流進行儲存、轉換與查詢分析。
透過 Materialized View 與 Kafka Engine,從資料進來到 BI 報表呈現,整個過程都能保持高效能且可擴展的設計。