iT邦幫忙

2021 iThome 鐵人賽

DAY 24
1
DevOps

喬叔帶你上手 Elastic Stack - 探索與實踐 Observability系列 第 24

24 - 建立結構化的 Log (2/4) - Elasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手 - 基本介紹

建立結構化的 Log 系列文章


本篇學習重點

  • Elasticserach 內建的 Ingest Pipeline 基本介紹
  • 如何使用 Ingest Pipeline
  • 使用 Ingest Pipeline 時的注意事項

Ingest Pipeline 的功用

Ingest Pipeline (擷取管道) 是一個內建在 Elasticsearch 中,文件在進入 Index 前的資料轉換 (Transformation) 的工具,主要的任務就是針對透過 Indexing RESTful API 傳入的文件,在真正進入 Elasticsearch Indexing 處理之前,先進行前處理,這個前處理可以像是以下幾種例子:

  • 將原始的資料豐富化 (enrich),透過查找 Elasticsearch 裡存放在別的 Index 裡的相關資料,加入到文件之中,來豐富原有的文件。
  • 將日期格式正確的從文件中的某個欄位擷取出來,讓 Elasticsearch 的 @timestamp 有正確的時間。
  • 將 IP 的欄位,透過反查 GeoIP 的資料庫,加入 GeoLocation 的資訊在文件中,以利於之後能使用地圖檢視資料。
  • 將文字內容,依照特定的格式,拆解成結構化的 JSON 欄位與值。
  • 將 URL 拆解成包含 pathschemeportdomainquery…各個欄位。
  • 將 URL 中 Query String 裡的 Key / Value,轉成結構化的 JSON 欄位與值。
  • 當某個條件成立時,填加某個固定值。

Ingest Pipeline 的運作及使用方式

24-ingest-pipeline-flow

上圖是 Ingest Pipeline 運作時的主要流程,在這過程中,我們分別解釋每個階段的運作及操作方式:

  • Incoming Documents (傳入的文件): 在使用 Indexing API、或是 _bulk API 將文件傳入至 Elasticsearch 準備進行 Indexing 時,可以指定要使用哪一組預先定義好的 Ingest Pipeline。
  • Ingest Pipeline (擷取管道): Elasticsearch 在到 Indexing 的請求時,如果有指定 Ingest Pipeline,Coordinator (協調者) Node 會把這個請求,交給 ingest node,透過定義好的 Pipeline 設定,經由當中指定的各種 Processor (處理器) 一步一步的將資料進行處理。
  • Target Index (目標索引): Ingest Pipeline 在處理完之後,會將最終的文件,透過 Coordinator 傳送到 Primary Shard 所在的 Node,進行 Indexing 後續的處理。

接下來將會依照實際準備時的步驟,進行說明。

定義 Ingest Pipeline

使用 Ingest APIs

在文件進入 Elasticsearch 之前,我們必須先準備好 Ingest Pipeline 的定義,這邊主要是透過 _ingest 的 API 進行設定:

PUT /_ingest/pipeline/<pipeline>
  • <pipeline>:是自己取的 pipeline 名稱

提供一個實際的範例如下:

PUT /_ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "description" : "My optional pipeline description",
  "processors" : [
    {
      "set" : {
        "description" : "My optional processor description",
        "field": "my-keyword-field",
        "value": "foo"
      }
    }
  ],
  "_meta": {
    "reason": "set my-keyword-field to foo",
    "serialization": {
      "class": "MyPipeline",
      "id": 10
    }
  }
}
  • Pipeline 的名字是 my-pipeline-id
  • version 是提供使用者自己記錄及參考,與 ingest pipeline 本身的運作功能無關,是選用的欄位。
  • processors 裡面指定 1 至多個 processors,這部份會是資料 Transform (轉換) 的主要處理,每個 processors 會依照先後順序來執行,一個 processor 做完後會將輸出交給下一個 processor 進行處理,也就是這個功能取名 pipeline 的原因,Elasticsearch 中內建 30 多個 processors,詳細可參考 官方文件 - Ingest Processor Reference
  • _meta 是提供給使用者自己存放自己想要額外加入的資訊所使用的,如果 pipeline 是由程式或其他機制在管理時,可以額外記錄一些參考的資訊。

除了建立 pipeline 的這個 API,Ingest API 總共有提供:

  • Create or update pipeline
  • Get pipeline
  • Delete pipeline

可以參考 官方文件 - Ingest APIs 的使用說明。

使用 Kibana Ingest Node Pipeline

另外 Kibana 也有提供 UI 的設定畫面,可以在 Kibana > Stake Management > Ingest > Ingest Node Pipelines 建立或管理 Ingest Pipeline:

24-kibana-ingest-pipeline

在建立 Create Pipeline 時,就能夠使透過 Add a processor 進入以下的畫面選擇要使用的 processor 並進行相關的設定。

24-kibana-ingest-pipeline-create

上線前使用 Simulate 模擬一下

當我們依照需求定義好 Ingest Pipeline 之後,我們可以透過 _simulate API,並提供測試的文件,確認一下 Ingest Pipeline 的執行結果。

_simulate API 有提供兩種模式,第一種是針對還沒有建立 Pipeline 時,一併透過 API 模擬指定的 Pipeline 定義 + 測試文件,另一種是已經建立好 Pipelien 的定義,提供測試文件來試用。

Simulate 已建立好的 Pipeline

我們針對 my-pipeline-id 這個 pipeline 來測試,並提供兩份文件:

POST /_ingest/pipeline/my-pipeline-id/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

可以得到回傳結果:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

Simulate 指定的 Pipeline 規則 + 測試的文件

我們不用指定 Pipeline 名稱,直接使用 _ingest/pipeline/_simulate 的 API,並且在 Request Body 中帶入 pipeline 的定義:

POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

以下是 Simulate 的回傳結果:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

使用 Kibana 的 Test Pipeline

在 Kibana 之中,也有對應的 Test Pipeline 功能,可以直接填入測試的文件,來檢驗 Pipeline 的運作結果。

24-kibana-ingest-pipeline-test

Indexing 資料時,使用 Ingest Pipeline 的方法

當上述的方式建立好 Ingest Pipeline 之後,我們將 Document Indexing 進入 Elasticsearch 時,就可以指定要使用 Ingest Pipeline,以下是常用的幾種方式:

Index API

使用 Index API 時,指定 pipeline 的名字

POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

Bulk API

使用 Bulk API 時,同樣也可以指定 pipeline 的名字

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

Update By Query

使用 Update by Query 時,也可以指定 pipeline 的名字

POST my-data-stream/_update_by_query?pipeline=my-pipeline

Reindex

Reindex 時也可以指定 pipeline 的名字

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

Index Setting 或透過 Index Tempate

在 Index Setting 中,可以透過以下的設定,來決定資料寫入這個 Index 時,要透過 ingest pipeline 來處理:

  • index.default_pipeline:如果 request 沒有帶入指定的 pipeline,就會依照這個設定來執行。
  • index.final_pipeline:這是不論有沒有其他經由 request 帶入的 pipeline 或是 default_pipeline 的設定,在最後都一定會執行的的 pipeline 設定。(也就是如果有其他指定的 pipeline,這兩種 pipeline 都會被執行)

因此也可以透過 Index Template 指定 Index setting 中的這兩個設定值。

其他 Elastic Stack

其他 Beats、Logstash,也都會有對應的設定,能夠在資料透過 Index API 或是 Bulk API 寫入時,指定要使用的 Ingest Pipeline,詳細請參考這些產品的官方文件說明。

使用 Ingest Pipeline 的注意事項

  • Elasticsearch Cluster 中,至少要有一個有啟動 ingest 角色的 Node,如果 Ingest 的工作量很繁重的話,建議安排專門處理 Ingest 的 Node 來進行 Ingest 任務的處理,又或是使用 Logstash 等其他 ETL 工具,避免佔用資源而影響 Elasticsearch 其他功能的運作。
  • 如果有啟用 Security 的功能,會需要擁有 manage_pipeline 的權限,如果要從 Kibana 的 Ingest Node Pipeline 畫面來操作 Ingest Pipeline 的功能的話,另外還會需要 cluster:monitor/nodes/info 的權限。
  • 在使用 Pipeline 時,版本管理也會是很重要的一件事,為了能更有效率的避免 Pipeline 的定義是舊版,而造成資料 Indexing 時是以非預期的方式處理,善用 version 的版本號碼,並且將 Pipeline 的定義進行版控管理,在佈署或除錯時,也能多透過確認 Pipeline 的 version 來確保版本的正確性。

以上介紹了 Elasticsearch Ingest Pipeline 的基本說明,接下來會以實際的例子進行介紹,說明 Ingest Pipeline 如何協助我們將 Log 結構化。

參考資料

  1. 官方文件 - Ingest Pipelines
  2. 官方文件 - Ingest Processor Reference
  3. 官方文件 - Ingest APIs

上一篇
23 - 建立結構化的 Log (1/4) - Elastic Common Schema 結構化 Log 的規範
下一篇
25 - 建立結構化的 Log (3/4) - Elasticsearch Ingest Pipeline 資料 Index 前的轉換好幫手 - 各種常用的 Processor
系列文
喬叔帶你上手 Elastic Stack - 探索與實踐 Observability31

尚未有邦友留言

立即登入留言