iT邦幫忙

2021 iThome Ironman Contest (鐵人賽)

DAY 3
Modern Web

Kafka's Library (『卡夫卡的藏書閣』) - Kafka Development and Practice Every Code Monkey Should Know, Part 3

Kafka's Library【Book3】- Kafka Messages and Topics: Record and Topic

Watching Apple's keynote in the middle of the night, I thought of Steve Jobs's line “Stay Hungry. Stay Foolish.”
“Youth is happy because it has the capacity to see beauty. Anyone who keeps the ability to see beauty never grows old.”
Franz Kafka

Time flies. I'm almost thirty.


3.1 A Single Message in Kafka (Record)

https://ithelp.ithome.com.tw/upload/images/20210907/20140255DCsxmBw5zg.png

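Let's start with the smallest unit, a single message. Every message in a Kafka topic contains a key, a value (key-value), and a timestamp, plus optional headers (shown as headerKeys in the dumps below).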

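  • The key is optional. If no key is set, the producer spreads messages across the partitions by default (round-robin in older clients; since Kafka 2.4 the Java client defaults to the "sticky" partitioner, which fills a batch for one partition before moving to the next). If a key is set, it decides which partition the message is written to. The most common approach is to hash the key, divide by the number of partitions, and send the message to the partition whose number equals the remainder, so records with the same key always land in the same partition as long as the partition count does not change.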

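  • Timestamp (timestamp), which serves as:

    1. An index for fetching data: usually Kafka is consumed like a traditional MQ, taking new data in near real time as it arrives (although, unlike a queue's pop, reading does not remove anything). When needed, you can also use a timestamp to look up the offset at a given point in time and start fetching from there.
    2. A basis for deletion: by default Kafka deletes data older than seven days (log.retention.hours=168). Deletion happens one whole log segment at a time, once the newest timestamp in that segment is older than the retention period.
    3. A basis for splitting: by default Kafka also rolls the log into a new segment file every seven days (log.roll.hours=168), or sooner when the segment reaches log.segment.bytes (1 GiB by default).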
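  • The value is commonly stored as JSON (JavaScript Object Notation). Kafka itself treats it as plain bytes, so the format is a convention between producers and consumers. It's usually a good idea to consistently include some identifying fields or descriptions so messages are more readable and easier to understand.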
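
To make the key-to-partition rule concrete, here is a minimal Python sketch. As far as I understand it, the Java client's default partitioner hashes the serialized key with murmur2, clears the sign bit, and takes the remainder by the partition count; the code below ports that idea. The names murmur2 and partition_for_key are hypothetical, and the port has not been checked against real partition assignments, so compare it with the client source (Utils.murmur2) before relying on it. Keyless records do not go through this path.

```python
# Sketch of key -> partition mapping, modeled on the Java client's default
# partitioner: murmur2(serialized key), sign bit cleared, modulo partitions.
# murmur2 and partition_for_key are hypothetical names for this illustration.


def murmur2(data: bytes) -> int:
    """32-bit murmur2 with Kafka's seed, returned as an unsigned int."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Mix the input four bytes (little-endian) at a time.
    for i in range(0, length - length % 4, 4):
        k = data[i] | data[i + 1] << 8 | data[i + 2] << 16 | data[i + 3] << 24
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the last 1-3 bytes (mirrors the fall-through switch in Java).
    tail, rest = length & ~3, length % 4
    if rest == 3:
        h ^= data[tail + 2] << 16
    if rest >= 2:
        h ^= data[tail + 1] << 8
    if rest >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Clear the sign bit (like Utils.toPositive) and take the remainder."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    # The same key always maps to the same partition of a 3-partition topic.
    for key in (b"user-1", b"user-2", b"user-1"):
        print(key, "->", partition_for_key(key, 3))
```

Because the mapping depends on num_partitions, adding partitions to a topic later moves existing keys to different partitions.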

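Messages can be sent one at a time, but at large volumes that costs a lot of network overhead, so Kafka writes in batches. Batching, however, inevitably adds some write latency, so weigh it against your use case: does I/O efficiency (throughput) matter more, or low latency?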
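
On the producer, this trade-off is controlled mainly by batch.size (bytes per batch) and linger.ms (how long to wait for more records). The sketch below is a deliberately simplified model of that behavior, not Kafka's actual RecordAccumulator: the class BatchModel is hypothetical, sizes count only value bytes, and time is passed in explicitly so the behavior is easy to follow. The sample payloads are taken from the dump in 3.2.

```python
# Simplified model of producer batching (not Kafka's RecordAccumulator).
# BatchModel is hypothetical; batch_size/linger_ms mirror batch.size/linger.ms,
# but sizes here count only value bytes, not record or batch overhead.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BatchModel:
    batch_size: int                      # max bytes per batch (like batch.size)
    linger_ms: int                       # max wait before sending (like linger.ms)
    records: List[bytes] = field(default_factory=list)
    bytes_used: int = 0
    opened_at_ms: Optional[int] = None   # when the current batch got its first record
    sent: List[List[bytes]] = field(default_factory=list)

    def _send(self) -> None:
        """Hand the current batch to the 'network' and start an empty one."""
        if self.records:
            self.sent.append(self.records)
        self.records, self.bytes_used, self.opened_at_ms = [], 0, None

    def append(self, value: bytes, now_ms: int) -> None:
        # If this record would overflow the batch, send the full batch first.
        if self.records and self.bytes_used + len(value) > self.batch_size:
            self._send()
        if self.opened_at_ms is None:
            self.opened_at_ms = now_ms
        self.records.append(value)
        self.bytes_used += len(value)

    def poll(self, now_ms: int) -> None:
        """Sender-loop check: send a batch that has lingered long enough."""
        if self.opened_at_ms is not None and now_ms - self.opened_at_ms >= self.linger_ms:
            self._send()


if __name__ == "__main__":
    acc = BatchModel(batch_size=10, linger_ms=5)
    acc.append(b"asdf as", now_ms=0)   # 7 bytes
    acc.append(b"sdf", now_ms=1)       # 10 bytes, still fits
    acc.append(b"asdf", now_ms=2)      # would make 14, so the first batch is sent
    acc.poll(now_ms=7)                 # b"asdf" has waited 5 ms, so it is sent
    print(acc.sent)  # [[b'asdf as', b'sdf'], [b'asdf']]
```

A larger linger_ms lets more records share a batch (fewer requests) but makes each record wait longer, which is exactly the I/O-versus-latency decision described above.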

3.2 Let's See What Messages (the Log) Actually Look Like

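Kafka's built-in kafka-run-class kafka.tools.DumpLogSegments command lets you inspect the contents of a log file. --files is required, and you can query several files at once by separating them with commas. These commands are the key part and will be covered in detail later.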

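The main goal here is to show what metadata a message carries. In general, what you need to know are the key, value, and timestamp mentioned above, plus the offset.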

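As you can see, the producer first accumulates messages into a batch and sends it once enough have built up. Take this first line as an example: three records were accumulated (count: 3) and sent together.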

baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true

Below are the log files for the three partitions of topic topic_for_test_log.

However, this output does not show the actual message contents, only the batches that wrap them.

$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log,broker1/topic_for_test_log-1/00000000000000000000.log,broker2/topic_for_test_log-2/00000000000000000000.log

Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
baseOffset: 5 lastOffset: 10 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 179 CreateTime: 1631771625952 size: 128 magic: 2 compresscodec: NONE crc: 3083079563 isvalid: true
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true


Dumping broker1/topic_for_test_log-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771626645 size: 95 magic: 2 compresscodec: NONE crc: 1577038537 isvalid: true
baseOffset: 3 lastOffset: 9 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 95 CreateTime: 1631771633592 size: 135 magic: 2 compresscodec: NONE crc: 372605860 isvalid: true
baseOffset: 10 lastOffset: 15 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 230 CreateTime: 1631771635665 size: 122 magic: 2 compresscodec: NONE crc: 2359930090 isvalid: true


Dumping broker2/topic_for_test_log-2/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 5 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771620918 size: 124 magic: 2 compresscodec: NONE crc: 2581760372 isvalid: true
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 124 CreateTime: 1631771627429 size: 69 magic: 2 compresscodec: NONE crc: 3572829527 isvalid: true
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 193 CreateTime: 1631771635834 size: 69 magic: 2 compresscodec: NONE crc: 1038836103 isvalid: true
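
The batch lines fit together arithmetically: count equals lastOffset - baseOffset + 1, each baseOffset follows the previous lastOffset, and each position is the previous position plus its size (for partition 0: 0 + 98 = 98, 98 + 81 = 179, 179 + 128 = 307). The hypothetical helpers below parse the "key: value" layout shown above and report any line that breaks those relationships; the sample lines are the partition 0 batches, trimmed to the fields the check needs.

```python
# Hypothetical helpers: parse DumpLogSegments batch lines ("key: value" pairs)
# and check that offsets, counts and byte positions line up.
import re
from typing import Dict, List

PAIR = re.compile(r"(\w+): (\S+)")

# Partition 0 batches from the dump above, trimmed to the fields used here.
PARTITION_0 = [
    "baseOffset: 0 lastOffset: 2 count: 3 position: 0 CreateTime: 1631771619770 size: 98",
    "baseOffset: 3 lastOffset: 4 count: 2 position: 98 CreateTime: 1631771621294 size: 81",
    "baseOffset: 5 lastOffset: 10 count: 6 position: 179 CreateTime: 1631771625952 size: 128",
    "baseOffset: 11 lastOffset: 16 count: 6 position: 307 CreateTime: 1631771634662 size: 124",
]


def parse_batch_line(line: str) -> Dict[str, str]:
    """'baseOffset: 0 lastOffset: 2 ...' -> {'baseOffset': '0', 'lastOffset': '2', ...}"""
    return dict(PAIR.findall(line))


def find_problems(lines: List[str]) -> List[str]:
    """Return a description of every inconsistency; an empty list means OK."""
    batches = [parse_batch_line(line) for line in lines]
    problems = []
    for b in batches:
        if int(b["count"]) != int(b["lastOffset"]) - int(b["baseOffset"]) + 1:
            problems.append(f"count mismatch in batch {b['baseOffset']}")
    for prev, nxt in zip(batches, batches[1:]):
        if int(nxt["baseOffset"]) != int(prev["lastOffset"]) + 1:
            problems.append(f"offset gap before batch {nxt['baseOffset']}")
        # Each batch should start in the file exactly where the previous one ended.
        if int(nxt["position"]) != int(prev["position"]) + int(prev["size"]):
            problems.append(f"position mismatch at batch {nxt['baseOffset']}")
    return problems


if __name__ == "__main__":
    print(find_problems(PARTITION_0))  # []
```

On a compacted topic offsets can legitimately have gaps, so the offset check only fits an uncompacted segment like this one.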

To see the contents of every record in each batch (the payload field), add the --print-data-log flag, which shows the actual records:

$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log --print-data-log

Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
| offset: 0 isValid: true crc: null keySize: -1 valueSize: 7 CreateTime: 1631771618877 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf as
| offset: 1 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771619471 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: sdf
| offset: 2 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771619770 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
| offset: 3 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771621106 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: as
| offset: 4 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771621294 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: dfa
...
...
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true
| offset: 11 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771633791 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: asdf
...
...
| offset: 15 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771634475 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: fa
| offset: 16 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771634662 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: dsf
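
These per-record timestamps are what the timestamp-based lookup mentioned in 3.1 works with. The Java consumer's offsetsForTimes is documented to return, for each partition, the earliest offset whose timestamp is greater than or equal to the requested time, or null if there is none. The sketch below imitates that rule with a linear scan over offsets 0 to 4 of partition 0 shown above; offset_for_time and batch_max_timestamp are hypothetical names, and the broker itself uses each segment's time index (.timeindex) instead of scanning. It also shows that the batch-level CreateTime equals the largest record timestamp in the batch.

```python
# Sketch of a timestamp -> offset lookup following the rule documented for
# KafkaConsumer.offsetsForTimes: earliest offset with timestamp >= target.
# offset_for_time is hypothetical; the broker uses a time index, not a scan.
from typing import List, Optional, Tuple

# (offset, record CreateTime in ms) for offsets 0-4 of topic_for_test_log-0.
RECORDS: List[Tuple[int, int]] = [
    (0, 1631771618877),
    (1, 1631771619471),
    (2, 1631771619770),
    (3, 1631771621106),
    (4, 1631771621294),
]


def offset_for_time(records: List[Tuple[int, int]], target_ms: int) -> Optional[int]:
    """Return the first offset whose timestamp is >= target_ms, else None."""
    for offset, ts in records:  # records are in offset order
        if ts >= target_ms:
            return offset
    return None  # no such record; the Java API maps the partition to null


def batch_max_timestamp(records: List[Tuple[int, int]], base: int, last: int) -> int:
    """Largest record timestamp among offsets base..last (one batch)."""
    return max(ts for offset, ts in records if base <= offset <= last)


if __name__ == "__main__":
    print(offset_for_time(RECORDS, 1631771619000))  # 1
    print(batch_max_timestamp(RECORDS, 0, 2))       # 1631771619770, the batch CreateTime
```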

3.3 What Exactly Does the Message Metadata Store?

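Now that we've seen the records, what does the metadata stored deep inside actually mean? Below is a first introduction to each field, one by one. Later articles will use and tune these parameters with hands-on examples; for now, a rough impression is enough.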

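  • baseOffset: 0: the first offset in the batch
  • lastOffset: 2: the last offset in the batch
  • count: 3: the number of records in the batch
  • baseSequence: -1: the sequence number of the first record in the batch; only used when the producer (not the consumer) sets enable.idempotence to true, otherwise -1
  • lastSequence: -1: the sequence number of the last record in the batch; same condition as baseSequence, otherwise -1
  • producerId: -1: the producer's ID; it is -1 here because the data was added with the built-in shell tool, without idempotence or transactions enabled
  • producerEpoch: -1: the producer's epoch; -1 here for the same reason
  • partitionLeaderEpoch: 0: the partition's leader epoch when the batch was written, a counter that increases each time a new leader is elected (it identifies the leader's term, not which broker is the leader)
  • isTransactional: false: whether the batch belongs to a transaction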
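  • isControl: false: whether this is a control batch
    • A control batch holds a single control record (a COMMIT or ABORT transaction marker) that consumers use to tell whether a transaction's records were committed or aborted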
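  • CreateTime: 1631771619770: the label is the batch's timestamp type (CreateTime, the default) and the number is the batch's maximum timestamp, i.e. the largest record timestamp in the batch; in the --print-data-log output above it equals the CreateTime of offset 2. With message.timestamp.type=CreateTime the producer sets each record's timestamp when it creates the record; with LogAppendTime the broker stamps it when appending to the log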
  • size: 98: the size of the batch in bytes
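  • magic: 2: the message format is V2, introduced in Kafka 0.11.0 together with the RecordBatch layout. V0 was used before 0.10.0 and V1 (which added timestamps) in 0.10.x; current versions all write V2.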
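  • compresscodec: NONE
    • Compression is off by default (producer setting compression.type=none); enabling it is generally recommended
    • Codec IDs stored in the attributes field: 0: none, 1: gzip, 2: snappy, 3: lz4, 4: zstd (zstd since Kafka 2.1)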

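  • crc: 16374966
    • Used to detect corrupted data, for example when a write to disk was interrupted or bytes were damaged along the way
    • The 4-byte field is always part of the V2 format. What can be turned off is the consumer-side verification (consumer setting check.crcs, true by default), which saves a little CPU if you are chasing maximum performance, at the cost of not detecting corruption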
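
baseSequence, lastSequence, producerId and producerEpoch exist so the broker can recognize retried duplicates from an idempotent producer. The sketch below is a heavily simplified model of that check, under the assumption that the broker only remembers the last sequence per producer and partition (the real broker keeps metadata for the last five batches); SequenceTracker and its result strings are hypothetical.

```python
# Heavily simplified model of duplicate detection for an idempotent producer
# (enable.idempotence=true). Assumption: only the last sequence per
# (producerId, partition) is remembered; the real broker keeps more history.
# SequenceTracker and its result strings are hypothetical.
from typing import Dict, Tuple


class SequenceTracker:
    def __init__(self) -> None:
        # (producerId, partition) -> (producerEpoch, lastSequence)
        self._last: Dict[Tuple[int, int], Tuple[int, int]] = {}

    def append(self, producer_id: int, epoch: int, partition: int,
               base_seq: int, count: int) -> str:
        if producer_id == -1:
            # Non-idempotent producer (all -1, as in the dump): nothing to check.
            return "appended"
        key = (producer_id, partition)
        stored = self._last.get(key)
        if stored is None or epoch > stored[0]:
            expected = 0                  # first batch, or a new epoch restarts at 0
        elif epoch < stored[0]:
            return "fenced"               # an older producer instance
        else:
            expected = stored[1] + 1      # continue after the previous lastSequence
        if base_seq < expected:
            return "duplicate"            # e.g. a retried send that was already written
        if base_seq > expected:
            return "out_of_order"         # a gap: some batch went missing
        self._last[key] = (epoch, base_seq + count - 1)
        return "appended"


if __name__ == "__main__":
    t = SequenceTracker()
    print(t.append(1000, 0, 0, base_seq=0, count=3))  # appended (sequences 0-2)
    print(t.append(1000, 0, 0, base_seq=0, count=3))  # duplicate (a retry)
    print(t.append(1000, 0, 0, base_seq=3, count=2))  # appended (sequences 3-4)
```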

Below is the on-disk RecordBatch format:

		baseOffset: int64
		batchLength: int32
		partitionLeaderEpoch: int32
		magic: int8 (current magic value is 2)
		crc: int32
		attributes: int16
			bit 0~2:
				0: no compression
				1: gzip
				2: snappy
				3: lz4
				4: zstd
			bit 3: timestampType
			bit 4: isTransactional (0 means not transactional)
			bit 5: isControlBatch (0 means not a control batch)
			bit 6~15: unused
		lastOffsetDelta: int32
		firstTimestamp: int64
		maxTimestamp: int64
		producerId: int64
		producerEpoch: int16
		baseSequence: int32
		records: [Record]
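
As a sketch of how a reader might decode this header, the Python below unpacks the fixed 61-byte prefix (the fields above plus the int32 record count that precedes the records array) with the struct module in big-endian order, and splits attributes into its bits. The sample bytes are packed by hand from the first batch of partition 0 (baseOffset 0, lastOffsetDelta 2, and the dump's crc and timestamps), so they are not real bytes read from disk; parse_header and decode_attributes are hypothetical names.

```python
# Sketch: decode the fixed 61-byte RecordBatch header (big-endian) and the
# attributes bits. parse_header/decode_attributes are hypothetical helpers.
import struct
from typing import Any, Dict

# baseOffset, batchLength, partitionLeaderEpoch, magic, crc, attributes,
# lastOffsetDelta, firstTimestamp, maxTimestamp, producerId, producerEpoch,
# baseSequence, then the int32 record count that precedes the records.
HEADER = struct.Struct(">qiibIhiqqqhii")
CODECS = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}


def decode_attributes(attr: int) -> Dict[str, Any]:
    return {
        "compression": CODECS.get(attr & 0x07, "unknown"),                  # bits 0-2
        "timestampType": "LogAppendTime" if attr & 0x08 else "CreateTime",  # bit 3
        "isTransactional": bool(attr & 0x10),                                # bit 4
        "isControlBatch": bool(attr & 0x20),                                 # bit 5
    }


def parse_header(buf: bytes) -> Dict[str, Any]:
    (base_offset, batch_length, leader_epoch, magic, crc, attributes,
     last_offset_delta, first_ts, max_ts, producer_id, producer_epoch,
     base_sequence, record_count) = HEADER.unpack_from(buf)
    if magic != 2:
        # Other magic values use a different layout, so this decoding is invalid.
        raise ValueError(f"unsupported magic {magic}")
    return {
        "baseOffset": base_offset,
        "lastOffset": base_offset + last_offset_delta,
        "batchLength": batch_length,
        "partitionLeaderEpoch": leader_epoch,
        "magic": magic,
        "crc": crc,
        "attributes": decode_attributes(attributes),
        "firstTimestamp": first_ts,
        "maxTimestamp": max_ts,
        "producerId": producer_id,
        "producerEpoch": producer_epoch,
        "baseSequence": base_sequence,
        "count": record_count,
    }


if __name__ == "__main__":
    # Hand-packed header echoing the first batch of partition 0 (not bytes from
    # disk). batchLength 86 = the 98-byte batch minus the 12-byte
    # baseOffset + batchLength prefix.
    sample = HEADER.pack(0, 86, 0, 2, 16374966, 0, 2,
                         1631771618877, 1631771619770, -1, -1, -1, 3)
    print(HEADER.size, parse_header(sample))
```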

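The CRC covers the data from attributes to the end of the batch. It sits after magic because a reader must parse magic first to decide how to interpret the bytes between batch length and magic. The partition leader epoch is also excluded from the CRC so that the CRC does not have to be recomputed when the broker assigns this field to every batch it receives. The CRC is computed with CRC-32C (Castagnoli).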
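
CRC-32C is not in Python's standard library (zlib.crc32 uses the IEEE polynomial), so the sketch below implements it bit by bit with the reflected Castagnoli polynomial 0x82F63B78 and checks it against the standard check value for "123456789". It then builds a toy batch with zero records to illustrate the paragraph above: changing partitionLeaderEpoch changes the bytes but not the CRC, because the CRC only covers attributes onward. build_toy_batch and crc_matches are hypothetical helpers, and the toy bytes are not a batch Kafka would write.

```python
# CRC-32C (Castagnoli) computed bit by bit, plus a toy demonstration that the
# batch CRC covers only attributes..end. build_toy_batch is hypothetical and
# produces a header with zero records, not a batch Kafka would write.
import struct

CASTAGNOLI_REFLECTED = 0x82F63B78


def crc32c(data: bytes) -> int:
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            # Shift right; XOR in the polynomial when the dropped bit was 1.
            crc = (crc >> 1) ^ (CASTAGNOLI_REFLECTED if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


# baseOffset(8) + batchLength(4) + partitionLeaderEpoch(4) + magic(1) = 17,
# so crc occupies bytes 17..20 and attributes start at byte 21.
CRC_OFFSET, ATTRIBUTES_OFFSET = 17, 21


def build_toy_batch(leader_epoch: int) -> bytes:
    # attributes, lastOffsetDelta, firstTimestamp, maxTimestamp,
    # producerId, producerEpoch, baseSequence, record count (0)
    body = struct.pack(">hiqqqhii", 0, 0, 1631771619770, 1631771619770, -1, -1, -1, 0)
    batch_length = 4 + 1 + 4 + len(body)  # bytes after the batchLength field
    prefix = struct.pack(">qiib", 0, batch_length, leader_epoch, 2)
    return prefix + struct.pack(">I", crc32c(body)) + body


def crc_matches(batch: bytes) -> bool:
    stored = struct.unpack_from(">I", batch, CRC_OFFSET)[0]
    return stored == crc32c(batch[ATTRIBUTES_OFFSET:])


if __name__ == "__main__":
    print(hex(crc32c(b"123456789")))  # 0xe3069283, the standard CRC-32C check value
    a, b = build_toy_batch(0), build_toy_batch(7)
    print(a != b, a[CRC_OFFSET:ATTRIBUTES_OFFSET] == b[CRC_OFFSET:ATTRIBUTES_OFFSET])  # True True
    print(crc_matches(a), crc_matches(b))  # True True
```

A production implementation would use a lookup table or hardware instructions; the bitwise loop is only chosen here for readability.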


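  • isvalid: true: whether the batch passed validation, mainly whether the stored CRC matches the one recomputed from the batch contents

This field was added in Kafka 2.4.0, mainly to show the cause of an error more clearly, such as a version (magic) mismatch or a CRC checksum error; the detailed reason is returned to the producer through the Future<RecordMetadata> that send() returns.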

