“Paths are made by walking”
― Franz Kafka
千里之行,始於足下
KafkaJS 為了要盡量降低關聯的套件數量,預設只有包含 GZIP,但是未來是有考慮要包含其他壓縮方式進來
接下來我們在生產者的 send 加上可選選項壓縮方式(optional compression),並且指定為 GZIP
const { CompressionTypes } = require('kafkajs')
async () => {
await producer.send({
topic: 'topic-name',
compression: CompressionTypes.GZIP,
messages: [
{ key: 'key1', value: 'hello world' },
{ key: 'key2', value: 'hey hey!' }
],
})
}
消費者是可以自己解開zip壓縮,不需要額外的動作,在之後的消費者篇章會提到
安裝套件 kafkajs-snappy 就可以使用 snappy 來壓縮
Snappy是用C++開發的壓縮和解壓縮開發包,旨在提供高速壓縮速度和合理的壓縮率。雖然生成的壓縮檔案可能會比其他壓縮庫的要大上20%至100%,但是,相比其他的壓縮庫,Snappy卻能夠在特定的壓縮率下擁有驚人的壓縮速度。
npm install --save kafkajs-snappy
# yarn add kafkajs-snappy
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
安裝套件 kafkajs-lz4 就可以使用 LZ4
npm install --save kafkajs-lz4
# yarn add kafkajs-lz4
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const LZ4 = require('kafkajs-lz4')
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec
kafkajs-lz4 套件還有支援一些可選選項去調整 LZ4 的壓縮程度
如下,可以設定最高壓縮度(16)
const lz4Codec = new LZ4Codec({
preferences: {
compressionLevel: 16
}
}).codec;
CompressionCodecs[CompressionTypes.LZ4] = lz4Codec;
Compression level 設定範圍是從 1 ~ 16,數字越大代表壓縮比越大,如果設定大於16也會同樣視為16,壓縮比是用壓縮速度交換而來,通常建議的設定值是1、代表最快的壓縮速度,同時也是預設值,如果想要高壓縮比建議設定為9,當然,壓縮速度和壓縮比率的關係會依據要壓縮資料的而有所不同、所以需要視情況調整。
解壓縮速度的部分,不管 Compression level 設定為何速度都會一樣快。
要使用 Zstandard 要安裝套件 @kafkajs/zstd
npm install --save @kafkajs/zstd
# yarn add @kafkajs/zstd
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const ZstdCodec = require('@kafkajs/zstd')
CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec()
帶入參數一樣可以設定壓縮比,壓縮速度最高是 level 1
const compressionParams = { level: 1 }
const decompressionParams = {}
CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec(compressionParams, decompressionParams)
客製化編碼解碼器
可以簡單地用現成的 lib 去實作編碼解碼器 ( codec ),一個編碼解碼器是一個有兩個 async 方法的物件,分別是壓縮 ( compress ) 和解壓縮 ( decompress )
const MyCustomSnappyCodec = {
async compress(encoder) {
return someCompressFunction(encoder.buffer)
},
async decompress(buffer) {
return someDecompressFunction(buffer)
}
}
現在將它實作
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
CompressionCodecs[CompressionTypes.Snappy] = () => MyCustomSnappyCodec
新的編碼解碼器 ( codec ) 現在可以使用了
await producer.send({
topic: 'topic-name',
compression: CompressionTypes.Snappy,
messages: [
{ key: 'key1', value: 'hello world' },
{ key: 'key2', value: 'hey hey!' }
],
})
p.s. 編解碼器對應的英文「codec」(compress和decompress簡化而成的合成詞語)
資料來源:
https://github.com/indix/kafkajs-lz4
https://github.com/tulios/kafkajs-snappy
https://github.com/kafkajs/zstd