完整內容請至Blog 剖析 OTel Collector Delta To Cumulative Processor
這篇筆記主要記錄我在研究 OpenTelemetry Collector Contrib 中 deltatocumulative Processor 的心得。除了基本的配置,我們直接從 Source Code 層級來看看它是怎麼運作的,特別是它在狀態管理上的設計,以及我們在生產環境踩過的那些「坑」。
簡單來說,deltatocumulativeprocessor 的工作就是把 Delta (增量) 指標轉成 Cumulative (累積) 指標。
聽起來很簡單?但這是一個 Stateful (有狀態) 的操作。這意味著 Processor 必須在記憶體裡「記住」所有 Time Series 當前的數值。一旦流量大起來,這裡就是記憶體洩漏或是數據遺失的高風險區。
為了不讓這個 Processor 成為效能瓶頸,此 Processor 的設計核心在於「高效的狀態管理」與「嚴格的時間序驗證」。為了在高併發下維持準確性,它採用了細粒度的鎖定策略與強型別的狀態存儲。
核心邏輯位於 processor/deltatocumulativeprocessor,主要由以下結構支撐:
identity.StreamProcessor 怎麼知道哪些數據屬於同一個 Time Series?它依賴 identity.Stream。這不光是看 Metric Name,它會把 Name、Unit、Type 甚至所有的 Label (Attribute Hash) 組合起來當作唯一的 Key。所以,只要 Label 變了,對它來說就是一個全新的 Stream。
Metric Signature: Name, Unit, Type, Monotonicity, Temporality.
Attributes Hash: 所有 DataPoint 屬性 (Labels) 的雜湊值。 這確保了即使是同一個 Metric Name,不同的 Label 組合也會被視為獨立的 Stream。
state在儲存方面,它用了 maps.Parallel (底層是 xsync.MapOf)。為了避開 Golang interface 轉換的開銷並確保型別安全,它很「搞剛」地把 Number (Sum/Gauge)、Histogram 和 ExponentialHistogram 拆成三個獨立的 Map 來存。:
nums: 存儲 NumberDataPoint (Sum/Gauge)
hist: 存儲 HistogramDataPoint
expo: 存儲 ExponentialHistogramDataPoint
mutex[T]這是效能的關鍵點。Processor 沒有使用全域鎖 (Global Lock)。 如果每次處理數據都要鎖住整個 Map,那吞吐量肯定上不去。它為每一個獨立的 Stream 分配了一個專屬的 mutex。這意味著,除非多個請求同時更新「同一個 Metric 的同一個 Label 組合」,否則大家的更新操作是完全平行、互不卡頓的。
ConsumeMetrics)當一筆 Metrics 進入 Processor 時,數據流經以下嚴格步驟:
過濾 (Filter):
AggregationTemporality。只有 Delta 類型的指標會被處理;Cumulative 指標直接透傳 (Pass-through)。識別與查找 (Identify & Lookup):
計算 DataPoint 的 identity.Stream。
嘗試從 state Map 中撈取現有累積值。
注意:如果是新 Stream 且總數超過 max_streams,它會直接標記 error="limit" 然後丟棄。這在除錯時很容易被忽略。
聚合運算 (delta.Aggregate):
在 Stream 級別的鎖保護下執行。
邏輯: New_Cumulative = Old_Cumulative + New_Delta。
時間序驗證 (Validation): 在聚合前,必須通過兩項關鍵檢查 (位於 internal/delta/delta.go):
亂序檢測 (ErrOutOfOrder):
條件: New.Time <= Stored.Time
結果: 丟棄數據。這通常發生在發送端重試或網路亂序時。
重啟檢測 (ErrOlderStart):
條件: New.Start < Stored.Start
結果: 丟棄數據。這代表來源進程可能已重啟,發送了屬於「上一代」的數據,或是時間戳生成有誤。
寫回與標記:
將計算出的 Cumulative 值寫回原始 DataPoint。
將 Temporality 修改為 Cumulative。
更新 stale map 中的最後活躍時間 (Last Seen)。
為了避免記憶體爆炸(例如 Pod 重啟頻繁導致 Stream 無限增長),這裡有一個背景 Goroutine,每分鐘會巡一次。只要發現某個 Stream 超過 max_stale 沒更新,就會把它從記憶體中清掉。
stale Map。如果 now - last_seen > max_stale,則從記憶體中刪除該 Stream 的所有狀態。只有兩個參數,但都很致命:
processors:
deltatocumulative:
# Stream 多久沒動靜就清掉?預設 5m。
# 坑點:設太短會導致狀態頻繁重置(數據斷層);設太長記憶體會爆
max_stale: 5m
# 允許追蹤的最大 Stream 數量 (預設為 Max Int)
# 影響: 這是保護 Collector 不被 High Cardinality 數據撐爆的最後防線。
# 這是防線。一旦爆了,超出的數據會被「無情丟棄」。
max_streams: 9223372036854775807
Processor 透過 internal/telemetry 暴露了自我監控指標 (Self-monitoring Metrics),這是排查數據丟失問題的首要依據。當你懷疑數據掉了,請先看這些自我監控指標 (internal/telemetry):
deltatocumulative_datapoints:這是最重要的 Counter。
error="limit":是不是 max_streams 設太小了?error="delta.ErrOutOfOrder":發送端是不是時間戳亂跳?deltatocumulative_streams_tracked:目前記憶體裡到底存了多少 Stream。| 指標名稱 (Metric Name) | 類型 | 說明 | 關鍵標籤 (Labels) |
|---|---|---|---|
deltatocumulative_datapoints |
Counter | 處理的數據點總數。請密切關注 error 標籤。 |
error: |
- (missing): 處理成功 |
|||
- limit: 觸發 max_streams 上限而丟棄 |
|||
- delta.ErrOutOfOrder: 因時間戳亂序而丟棄 |
|||
- delta.ErrOlderStart: 因起始時間異常而丟棄 |
|||
deltatocumulative_streams_tracked |
Gauge | 當前記憶體中活躍追蹤的 Stream 總數。 | 無 |
deltatocumulative_streams_limit |
Gauge | 配置的 max_streams 值。 |
無 |
deltatocumulative_streams_max_stale |
Gauge | 配置的 max_stale 值 (秒)。 |
無 |
streams_tracked 的鋸齒狀波動現象: 監控圖表顯示 streams_tracked 呈現週期性的鋸齒狀下跌,或者劇烈震盪。同時下游看到的數值可能突然歸零或重置。
原因: 這通常與 GC 機制 (stale check) 有關。
間歇性流量: 如果某個指標每 6 分鐘才送一次,而 max_stale 設為 5 分鐘。Processor 會在第 5 分鐘刪除狀態。第 6 分鐘數據進來時,被視為全新的 Stream,累積值從 0 (或當前 Delta) 開始計算,導致狀態丟失。
GC 運作: 背景 Goroutine 每分鐘一次的清理動作,會導致 streams_tracked 出現階梯式下降。
解法:
max_stale 顯著大於 metrics 的 scrape interval 或 push interval (建議至少 2-3 倍)。現象:Batch Processor 報告發送了 2.6k 點,但 exporter 報告只發送了 2.0k 點。中間的 0.6k 憑空消失,且沒有任何 Error Log。
源碼級原因: 這是 deltatocumulative 的 max_streams 限制與 Pipeline 順序共同作用的結果。
// processor.go 片段
if maps.Exceeded(last, loaded) {
attrs.Set(telemetry.Error("limit"))
return drop // 靜默丟棄,只會標記 error label
}
如果 Pipeline 配置為 [batch, deltatocumulative]:
Batch: 收到數據,計數器 batch_send_size +2.6k。
DeltaToCumulative: 發現 Stream 總數超標,靜默丟棄 0.6k 數據點 (僅增加 deltatocumulative_datapoints{error="limit"})。
Exporter: 收到剩餘的 2.0k,計數器 sent_metric_points +2.0k。
解法:
調整順序: 改為 [deltatocumulative, batch]。讓過濾發生在打包之前。
監控 Drop: 設置告警監控 sum(rate(otelcol_deltatocumulative_datapoints{error="limit"}[2m])) > 0。
現象: 數據偶爾丟失,deltatocumulative_datapoints 出現 error="out_of_order"。
原因 (internal/delta/delta.go):
case dp.Timestamp() <= state.Timestamp():
return ErrOutOfOrder{...}
Processor 為了保證累積值的單調遞增,對時間戳要求非常嚴格 (New.Time > Stored.Time)。如果發送端 (如 Prometheus Remote Write 或某些 SDK) 因重試邏輯發送了重複或舊的時間戳,Processor 會為了保護累積值的單調性而拒絕該數據。
解法: 檢查發送端的 Retry 策略或時鐘同步狀態。
為了驗證那個「Pipeline 順序導致數據消失」的鬼故事,我用 Docker Compose 搞了個實驗。 (詳細配置略,核心概念是用 60 個 telemetrygen 實例去衝撞 max_streams: 50 的限制)
┌─────────────────┐ ┌─────────────────────────────────────┐ ┌────────────┐
│ telemetrygen │────▶│ OTel Collector │────▶│ Prometheus │
│ (60 instances) │ │ ┌─────────────────────────────┐ │ │ :9090 │
│ app-01 ~ 60 │ │ │ Pipeline: │ │ └────────────┘
└─────────────────┘ │ │ receiver → cumulativetodelta│ │
│ │ → batch │ │
│ │ → deltatocumulative│ │
│ │ → exporter │ │
│ │ │ │
│ │ max_streams: 50 (< 60) │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
設計理念:
60 個 telemetrygen 實例,每個發送不同的 service.name (app-01 ~ app-60)
max_streams 設為 50,刻意製造 Stream 超限
Pipeline 順序為 [cumulativetodelta, batch, deltatocumulative],重現「先 batch 後過濾」的問題
poc-lab/
├── docker-compose.yaml # Docker Compose 配置
├── otel-config.yaml # OTel Collector 配置
└── prometheus.yaml # Prometheus scrape 配置
otel-config.yamlreceivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
# 將 telemetrygen 產生的 Cumulative 轉成 Delta
cumulativetodelta:
batch:
send_batch_size: 100
timeout: 1s
deltatocumulative:
max_stale: 1m
# 【關鍵設定】設定極低的上限,強迫發生 Drop
max_streams: 50
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
namespace: "poc_app"
debug:
verbosity: normal
service:
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: "0.0.0.0"
port: 8888
pipelines:
metrics:
receivers: [otlp]
# 【關鍵錯誤順序】重現問題
processors: [cumulativetodelta, batch, deltatocumulative]
exporters: [prometheus, debug]
prometheus.yamlglobal:
scrape_interval: 5s
evaluation_interval: 5s
scrape_configs:
# Job 1: 監控 Collector 本身
- job_name: 'otel-collector-internal'
static_configs:
- targets: ['otel-collector:8888']
# Job 2: 監控實際輸出的數據
- job_name: 'app-metrics'
static_configs:
- targets: ['otel-collector:8889']
docker-compose.yaml (精簡版)services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.142.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-config.yaml:/etc/otel-collector-config.yaml:ro
ports:
- "4317:4317"
- "8888:8888" # Collector Internal Metrics
- "8889:8889" # Prometheus Exporter
networks:
- otel-network
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro
ports:
- "9090:9090"
depends_on:
- otel-collector
networks:
- otel-network
# 使用 YAML anchor 定義 60 個 telemetrygen 實例
telemetrygen-01: &telemetrygen-base
image: ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest
command: ["metrics", "--otlp-insecure", "--otlp-endpoint=otel-collector:4317",
"--rate=5", "--duration=1000h", "--metric-type=Sum", "--service=app-01"]
depends_on: [otel-collector]
networks: [otel-network]
telemetrygen-02: { <<: *telemetrygen-base, command: [..., "--service=app-02"] }
# ... (app-03 ~ app-60)
networks:
otel-network:
driver: bridge
cd poc-lab
docker compose up -d
docker compose ps # 確認所有容器運行中
打開 Prometheus UI (http://localhost:9090) 或使用 curl:
查詢 1: Streams 追蹤數量 (應該達到上限 50)
otelcol_deltatocumulative_streams_tracked
查詢 2: 數據點處理結果 (按 error 分類)
otelcol_deltatocumulative_datapoints_total
查詢 3: 比較 Batch vs Exporter
# Batch 處理量
otelcol_processor_batch_batch_send_size_sum
# Exporter 發送量
otelcol_exporter_sent_metric_points_total
使用錯誤的 Pipeline 順序 [cumulativetodelta, batch, deltatocumulative] 運行後:
| 指標 | 數值 | 說明 |
|---|---|---|
streams_tracked |
50 | 達到 max_streams 上限 |
streams_limit |
50 | 配置值 |
receiver_accepted |
13,000 | Receiver 接收的總數據點 |
batch_send_size_sum |
13,000 | Batch 處理的數據點 |
exporter_sent |
11,000 | Exporter 實際發送的數據點 |
數據點處理詳情:
| error 標籤 | 數值 | 說明 |
|---|---|---|
error="none" |
16,000 | 成功處理 |
error="limit" |
3,000 | 因達到 Stream 上限而丟棄 |
關鍵發現:
發送端: 60 個 telemetrygen 實例
限制: max_streams = 50
被完全丟棄的實例: 10 個 (60 - 50)
Batch (13,000) ≠ Exporter (11,000):差異 2,000 個數據點「憑空消失」
丟棄僅標記 error="limit",無 Error Log,難以察覺
修改 otel-config.yaml 中的 processors 順序:
# 修正前 (問題配置)
processors: [cumulativetodelta, batch, deltatocumulative]
# 修正後 (正確配置)
processors: [cumulativetodelta, deltatocumulative, batch]
修正後驗證結果:
| 指標 | 數值 | 說明 |
|---|---|---|
streams_tracked |
50 | 仍達到 max_streams 上限 |
receiver_accepted |
17,500 | Receiver 接收的總數據點 |
batch_send_size_sum |
14,950 | Batch 處理的數據點 |
exporter_sent |
14,950 | Exporter 實際發送的數據點 |
數據點處理詳情:
| error 標籤 | 數值 | 說明 |
|---|---|---|
error="none" |
14,950 | 成功處理 |
error="limit" |
2,490 | 因達到 Stream 上限而丟棄 |
| 指標 | 修正前 | 修正後 | 結論 |
|---|---|---|---|
| Batch 處理量 | 13,000 | 14,950 | - |
| Exporter 發送量 | 11,000 | 14,950 | - |
| Batch = Exporter? | 否 (差 2,000) | 是 (完全一致) | 問題解決 |
| error="limit" | 3,000 | 2,490 | 仍有丟棄 (預期行為) |
實驗結論:
修正前: Batch (13,000) ≠ Exporter (11,000) ← 數據「消失」,難以排查
修正後: Batch (14,950) = Exporter (14,950) ← 指標一致,問題可追溯
Pipeline 順序修正有效 - Batch 和 Exporter 的數值現在完全一致
丟棄仍然發生 (error="limit"),但這是預期行為,因為來源數 (60) > max_streams (50)
監控指標正確反映實際狀態 - 運維人員可直接從 error="limit" 指標看到丟棄量
docker compose down
你可能會問,為什麼我們需要这么麻煩的轉 Delta?
對於 Java、Go 這種 Long-running process 來說,維護一個全域計數器(Cumulative)是輕而易舉的事。它們在記憶體中有一個全域的計數器,可以一直累加數值:
00:01: 累計 10 次
00:02: 累計 15 次 (累加了 5 次)
00:03: 累計 20 次 (又累加了 5 次)
這就是 Cumulative (累積) 模式,也是 Prometheus 等後端系統最喜歡的格式。
但是,PHP 通常運行在 PHP-FPM 或 CGI 模式下。其生命週期是「一個請求一個進程」 (Per-request process):
PHP (在 FPM 模式下) 是 Share-Nothing 架構。一個請求進來,Process 啟動,處理完,記憶體釋放。 PHP 進程沒辦法簡單地告訴下一個進程:「嘿,我剛剛處理了 1 個,現在總數是 100 喔。」(除非你用 Redis or SharedMemory等外部儲存)。
因此,PHP 最自然的做法是只回報「這一次請求發生了什麼」:
所以 PHP 最自然的行為是:「這次請求我處理了 1 個 DB Query (Delta)」,這就是 Delta (增量) 模式。
至於加總的工作?就交給 OTel Collector 的 deltatocumulative processor 來扛吧。這就是它存在的最大意義。
當您的後端資料庫(如 Prometheus)只接受 Cumulative 資料,但您的應用程式(如 PHP 或 Serverless
Functions)只能提供 Delta 資料時,就會發生格式不相容。
這時候就需要 deltatocumulative processor 擔任「狀態管理者」 (Stateful Intermediary) 的角色:
雖然 Serverless (如 AWS Lambda) 或 CLI 工具也有類似需求,但 PHP的廣泛使用以及其標準的運行模式,使其成為這個 Processor 最常見的使用案例。
Pipeline 順序至關重要:請務必把 deltatocumulative 放在 batch 之前。
監控不能少:一定要針對 error="limit" 和 error="out_of_order" 設告警。
容量規劃: 根據預期的 Stream 數量合理設置 max_streams
Stale 設定: max_stale 應大於最大的 push/scrape interval (建議 2-3 倍)