iT邦幫忙

2021 iThome 鐵人賽

DAY 14
0
Modern Web

『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作系列 第 14

卡夫卡的藏書閣【Book14】- KafkaJS 生產者 - 壓縮 2

  • 分享至 

  • twitterImage
  •  

“Paths are made by walking”
― Franz Kafka
千里之行,始於足下


KafkaJS 為了要盡量降低關聯的套件數量,預設只有包含 GZIP,但是未來是有考慮要包含其他壓縮方式進來

接下來我們在生產者的 send 加上可選選項壓縮方式(optional compression),並且指定為 GZIP

const { CompressionTypes } = require('kafkajs')

async () => {
  await producer.send({
    topic: 'topic-name',
    compression: CompressionTypes.GZIP,
    messages: [
        { key: 'key1', value: 'hello world' },
        { key: 'key2', value: 'hey hey!' }
    ],
  })
}

消費者是可以自己解開zip壓縮,不需要額外的動作,在之後的消費者篇章會提到

Snappy

安裝套件 kafkajs-snappy 就可以使用 snappy 來壓縮
Snappy是用C++開發的壓縮和解壓縮開發包,旨在提供高速壓縮速度和合理的壓縮率。雖然生成的壓縮檔案可能會比其他壓縮庫的要大上20%至100%,但是,相比其他的壓縮庫,Snappy卻能夠在特定的壓縮率下擁有驚人的壓縮速度。

npm install --save kafkajs-snappy

# yarn add kafkajs-snappy
const {  CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')

CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

LZ4

安裝套件 kafkajs-lz4 就可以使用 LZ4

npm install --save kafkajs-lz4
# yarn add kafkajs-lz4
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const LZ4 = require('kafkajs-lz4')

CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec

kafkajs-lz4 套件還有支援一些可選選項去調整 LZ4 的壓縮程度
如下,可以設定最高壓縮度(16)

const lz4Codec = new LZ4Codec({
    preferences: {
        compressionLevel: 16
    }
}).codec;
 
CompressionCodecs[CompressionTypes.LZ4] = lz4Codec;

Compression level 設定範圍是從 1 ~ 16,數字越大代表壓縮比越大,如果設定大於16也會同樣視為16,壓縮比是用壓縮速度交換而來,通常建議的設定值是1、代表最快的壓縮速度,同時也是預設值,如果想要高壓縮比建議設定為9,當然,壓縮速度和壓縮比率的關係會依據要壓縮資料的而有所不同、所以需要視情況調整。
解壓縮速度的部分,不管 Compression level 設定為何速度都會一樣快。

ZSTD

要使用 Zstandard 要安裝套件 @kafkajs/zstd

npm install --save @kafkajs/zstd
# yarn add @kafkajs/zstd
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const ZstdCodec = require('@kafkajs/zstd')

CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec()

帶入參數一樣可以設定壓縮比,壓縮速度最高是 level 1

const compressionParams = { level: 1 }
const decompressionParams = {}
CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec(compressionParams, decompressionParams)

客製化編碼解碼器
可以簡單地用現成的 lib 去實作編碼解碼器 ( codec ),一個編碼解碼器是一個有兩個 async 方法的物件,分別是壓縮 ( compress ) 和解壓縮 ( decompress )

const MyCustomSnappyCodec = {
    async compress(encoder) {
        return someCompressFunction(encoder.buffer)
    },

    async decompress(buffer) {
        return someDecompressFunction(buffer)
    }
}

現在將它實作

const { CompressionTypes, CompressionCodecs } = require('kafkajs')
CompressionCodecs[CompressionTypes.Snappy] = () => MyCustomSnappyCodec

新的編碼解碼器 ( codec ) 現在可以使用了

await producer.send({
    topic: 'topic-name',
    compression: CompressionTypes.Snappy,
    messages: [
        { key: 'key1', value: 'hello world' },
        { key: 'key2', value: 'hey hey!' }
    ],
})

p.s. 編解碼器對應的英文「codec」(compress和decompress簡化而成的合成詞語)


  • 壓縮比:LZ4 > GZIP > Snappy
  • 吞吐量:LZ4 > Snappy > GZIP

資料來源:
https://github.com/indix/kafkajs-lz4
https://github.com/tulios/kafkajs-snappy
https://github.com/kafkajs/zstd


上一篇
卡夫卡的藏書閣【Book13】- KafkaJS 生產者 1
下一篇
卡夫卡的藏書閣【Book15】- KafkaJS 生產者 - 參數設定 3
系列文
『卡夫卡的藏書閣』- 程序猿必須懂的Kafka開發與實作30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言