Ingest Pipeline (擷取管道) 是一個內建在 Elasticsearch 中,文件在進入 Index 前的資料轉換 (Transformation) 的工具,主要的任務就是針對透過 Indexing RESTful API 傳入的文件,在真正進入 Elasticsearch Indexing 處理之前,先進行前處理,這個前處理可以像是以下幾種例子:
@timestamp
有正確的時間。path
、scheme
、port
、domain
、query
…各個欄位。上圖是 Ingest Pipeline 運作時的主要流程,在這過程中,我們分別解釋每個階段的運作及操作方式:
_bulk
API 將文件傳入至 Elasticsearch 準備進行 Indexing 時,可以指定要使用哪一組預先定義好的 Ingest Pipeline。ingest
node,透過定義好的 Pipeline 設定,經由當中指定的各種 Processor (處理器) 一步一步的將資料進行處理。接下來將會依照實際準備時的步驟,進行說明。
在文件進入 Elasticsearch 之前,我們必須先準備好 Ingest Pipeline 的定義,這邊主要是透過 _ingest
的 API 進行設定:
PUT /_ingest/pipeline/<pipeline>
<pipeline>
:是自己取的 pipeline 名稱提供一個實際的範例如下:
PUT /_ingest/pipeline/my-pipeline-id
{
"version": 1,
"description" : "My optional pipeline description",
"processors" : [
{
"set" : {
"description" : "My optional processor description",
"field": "my-keyword-field",
"value": "foo"
}
}
],
"_meta": {
"reason": "set my-keyword-field to foo",
"serialization": {
"class": "MyPipeline",
"id": 10
}
}
}
my-pipeline-id
。version
是提供使用者自己記錄及參考,與 ingest pipeline 本身的運作功能無關,是選用的欄位。processors
裡面指定 1 至多個 processors,這部份會是資料 Transform (轉換) 的主要處理,每個 processors 會依照先後順序來執行,一個 processor 做完後會將輸出交給下一個 processor 進行處理,也就是這個功能取名 pipeline 的原因,Elasticsearch 中內建 30 多個 processors,詳細可參考 官方文件 - Ingest Processor Reference。_meta
是提供給使用者自己存放自己想要額外加入的資訊所使用的,如果 pipeline 是由程式或其他機制在管理時,可以額外記錄一些參考的資訊。除了建立 pipeline 的這個 API,Ingest API 總共有提供:
可以參考 官方文件 - Ingest APIs 的使用說明。
另外 Kibana 也有提供 UI 的設定畫面,可以在 Kibana > Stake Management > Ingest > Ingest Node Pipelines 建立或管理 Ingest Pipeline:
在建立 Create Pipeline 時,就能夠使透過 Add a processor 進入以下的畫面選擇要使用的 processor 並進行相關的設定。
當我們依照需求定義好 Ingest Pipeline 之後,我們可以透過 _simulate
API,並提供測試的文件,確認一下 Ingest Pipeline 的執行結果。
_simulate
API 有提供兩種模式,第一種是針對還沒有建立 Pipeline 時,一併透過 API 模擬指定的 Pipeline 定義 + 測試文件,另一種是已經建立好 Pipelien 的定義,提供測試文件來試用。
我們針對 my-pipeline-id
這個 pipeline 來測試,並提供兩份文件:
POST /_ingest/pipeline/my-pipeline-id/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
可以得到回傳結果:
{
"docs": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.187Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.188Z"
}
}
}
]
}
我們不用指定 Pipeline 名稱,直接使用 _ingest/pipeline/_simulate
的 API,並且在 Request Body 中帶入 pipeline
的定義:
POST /_ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
以下是 Simulate 的回傳結果:
{
"docs": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.187Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.188Z"
}
}
}
]
}
在 Kibana 之中,也有對應的 Test Pipeline 功能,可以直接填入測試的文件,來檢驗 Pipeline 的運作結果。
當上述的方式建立好 Ingest Pipeline 之後,我們將 Document Indexing 進入 Elasticsearch 時,就可以指定要使用 Ingest Pipeline,以下是常用的幾種方式:
使用 Index API 時,指定 pipeline
的名字
POST my-data-stream/_doc?pipeline=my-pipeline
{
"@timestamp": "2099-03-07T11:04:05.000Z",
"my-keyword-field": "foo"
}
使用 Bulk API 時,同樣也可以指定 pipeline
的名字
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
使用 Update by Query 時,也可以指定 pipeline
的名字
POST my-data-stream/_update_by_query?pipeline=my-pipeline
Reindex 時也可以指定 pipeline
的名字
POST _reindex
{
"source": {
"index": "my-data-stream"
},
"dest": {
"index": "my-new-data-stream",
"op_type": "create",
"pipeline": "my-pipeline"
}
}
在 Index Setting 中,可以透過以下的設定,來決定資料寫入這個 Index 時,要透過 ingest pipeline 來處理:
index.default_pipeline
:如果 request 沒有帶入指定的 pipeline
,就會依照這個設定來執行。index.final_pipeline
:這是不論有沒有其他經由 request 帶入的 pipeline
或是 default_pipeline
的設定,在最後都一定會執行的的 pipeline 設定。(也就是如果有其他指定的 pipeline,這兩種 pipeline 都會被執行)因此也可以透過 Index Template 指定 Index setting 中的這兩個設定值。
其他 Beats、Logstash,也都會有對應的設定,能夠在資料透過 Index API 或是 Bulk API 寫入時,指定要使用的 Ingest Pipeline,詳細請參考這些產品的官方文件說明。
ingest
角色的 Node,如果 Ingest 的工作量很繁重的話,建議安排專門處理 Ingest 的 Node 來進行 Ingest 任務的處理,又或是使用 Logstash 等其他 ETL 工具,避免佔用資源而影響 Elasticsearch 其他功能的運作。manage_pipeline
的權限,如果要從 Kibana 的 Ingest Node Pipeline 畫面來操作 Ingest Pipeline 的功能的話,另外還會需要 cluster:monitor/nodes/info
的權限。version
的版本號碼,並且將 Pipeline 的定義進行版控管理,在佈署或除錯時,也能多透過確認 Pipeline 的 version
來確保版本的正確性。以上介紹了 Elasticsearch Ingest Pipeline 的基本說明,接下來會以實際的例子進行介紹,說明 Ingest Pipeline 如何協助我們將 Log 結構化。
查看最新 Elasticsearch 或是 Elastic Stack 教育訓練資訊: https://training.onedoggo.com
歡迎追蹤我的 FB 粉絲頁: 喬叔 - Elastic Stack 技術交流
不論是技術分享的文章、公開線上分享、或是實體課程資訊,都會在粉絲頁通知大家哦!