iT邦幫忙

2023 iThome 鐵人賽

DAY 14
0
Software Development

由淺入深來探討Elasticsearch,從基礎語法到底層相關原理系列 第 14

【Day 14】由淺入深來探討Elasticsearch - Ingest Pipeline

  • 分享至 

  • xImage
  •  

昨天介紹了runtime field,也就是schema on read,相當於將存在索引的資料在讀取前進行處理
如果我們對於數據處理有想法了
有辦法在進入索引前透過ES內建的功能處理嗎?
今天要介紹的ingest pipeline就有這樣數據預處理的功能

Ingest Pipeline
可以在資料進入索引前進行預處理,資料經過不同processors的處理後,再將處理過的資料存入索引中。如果要使用的話,叢集設定中,需要有至少一個node有設置以下配置

node.roles: [ ingest ]

而其中processors的功能又有以下幾種:

1. grok
像是log出來的message,裡面包含ip、ua等等,使用grok可以將其分割來再給下面其他processors做進一步處理

{
	"grok": {
		"field": "message",
    "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]
	}
}

2. date
將資料來源欄位,根據設定進行解析最終傳至目標欄位,並且也能調整解析後的格式

{
	"date" : {
		"field" : "initial_date", // 資料來源
    "target_field" : "timestamp", // option 解析後儲存欄位
    "formats" : ["dd/MM/yyyy HH:mm:ss"], // 資料來源的時間格式
    "timezone" : "Europe/Amsterdam" // option 用來輔助分析時間
		"output_format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX" // option 決定儲存時的格式
	}
}

3. GeoIP
默認使用MaxMind的資料,根據IPv4或是IPv6的ip位置,取得相關的地理位置資訊

{
	"geoip" : {
	  "field" : "ip", // 取得ip位置的欄位
    "target_field" : "geo", // option 從MaxMind資料庫取得的地理資訊
  }
}

還有像是script、user-agent等等這邊就不一一列舉了~
這邊舉幾個讓大家有個初步的概念就好,很好奇自己的資料有沒有可以先ingest的可以看以下文檔:
(進去連接後看左邊的下拉選單有很多欄位可以看)
https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html

製作與更新pipeline

PUT /_ingest/pipeline/my_pipline-name
{
  "version": 1,
  "description" : "pipeline description",
  "processors" : [
			{
			"lowercase" : {
			        "field": "my-field",
					"ignore_failure": true,
					"on_failure": [
                        {
                            "set": {
                              "description": "error-message",
				              "field": "error.message",
				              "value": "contain Chinese. Cannot lowercase",
				              "override": false
				            }
				          }
				     ]
			    }
			} 
  ],
  "_meta": {
    "reason": "your reason",
		"serialization": "lowercase-type-pipeline"
  }

	
}
  • _meta可以放一些資料,用於描述這個pipeline的用途等等,不過因為是存放在叢集中,所以不要太冗長
  • 更新的話也是一樣的的API,只要記得名字或是你取的id。更改內容後直接調用就好
  • version是一個可選項,讓你在使用query去查詢時,能使用if_version更新特定pipeline
  • 如果是想對object field做篩選,使用下面語法
"processors": [
    {
      "dot_expander": {
        "description": "using for object field",
        "field": "object-field.property"
      }
    }
]
  • ignore_failure:如果有多個processor處理,可能一遇到錯誤就會跳出,可以設置true讓processor失敗也能繼續往下跑
  • 可以設置on_failure參數(圖中是針對單一processor),在error發生時馬上調用裡面的processors
    • 觸發on_failure的話,即使發生錯誤也會繼續跑剩餘的processors
    • 也可以設置成巢狀結構,這個失敗裡面還能繼續包on_failure
    • 或是在pipeline層級設置(如下圖),但是跟針對單一processor不同,觸發的話不會跑剩餘沒跑的processor
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

測試pipeline
先創建好pipeline,再調用simulate pipeline API做測試

// 創建pipeline
PUT /_ingest/pipeline/my_log_pipeline
{
  "version": 1,
  "description" : "manage log doc",
  "processors" : [
			{
        "grok": {
          "field": "message",
          "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]
        }
      }
  ]
}

// 模擬測試
POST _ingest/pipeline/my_log_pipeline/_simulate
{
  "docs":[
    {
      "_source": {
        "message": "55.3.244.1 GET /index.html 15824 0.043"
      }
    }
  ]
}

https://ithelp.ithome.com.tw/upload/images/20230916/20161866Yx0IVa2XfG.png
或是直接調用

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors" : [
			{
        "grok": {
          "field": "message",
          "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]
        }
      }
  ]
  }, 
  "docs":[
    {
      "_source": {
        "message": "55.3.244.1 GET /index.html 15824 0.043"
      }
    }
  ]
}

查看pipeline

// 看全部pipeline
GET /_ingest/pipeline

也可以使用node state API
裡面的count以及time_in_millis可以幫助我們去看pipeline處理過程的重要資訊

GET _nodes/stats/ingest?filter_path=nodes.*.ingest

// 輸出
"pipline-name": {
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0,
            "processors": [
              {
                "set": {
                  "type": "set",
                  "stats": {
                    "count": 0,
                    "time_in_millis": 0,
                    "current": 0,
                    "failed": 0
                  }
                }
              }
            ]
          }

刪除pipeline

DELETE /_ingest/pipeline/pipeline_name

在創建索引時也能透過特定參數來調控:
index.default_pipeline:在資料索引前調用,但是如果在indexing的請求中有聲明其他pipeline會被蓋掉
index.final_pipeline:會在default跟request的pipeline之後才調用

在Beat家族匯入資料時,也能在yml檔中指定pipeline

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline-name

在使用index、bulk與reindex API以及update_by_query也能透過參數調用

POST my-data-stream/_doc?pipeline=my-pipeline-name

最後來提及一下pipeline的重要功能:
可以使用enrich processor去豐富你要存入的文檔,資料來源則是已經存在的索引

使用步驟如下:

  1. 製作enrich policy
  2. 執行enrich policy
  3. ingest pipeline設置enrich processor

製作enrich policy

PUT /_enrich/policy/policy_name
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}
  • 一旦建立了policy,就不能修改,只能採取以下措施
    • 建立以及執行新的policy,或是用其他建立好的enrich processor取代
    • 使用delete enrich policy API刪除policy
  • match代表使用term-level query,也可替換成range或geo_match,端看你的需求
  • indices可以放1或多個source index,但是如果是多個來源,要注意都要有同一個match_field
  • match_field表示source index中的哪個欄位當作搜尋條件的欄位
  • enrich_fields表示要從source index中的哪些欄位取資料加到incoming documents中
    • 不要把match_field跟enrich_fields搞混
    • 例如,我想取source index中message欄位的資料來添加我的incoming documents,而我想篩選id為100以後的資料。其中message就是enrich_field,而id就是match_field
  • query,為可選項,默認是match_all

執行enrich policy

PUT /_enrich/policy/policy_name/_execute
  • 會根據設置的policy來創建enrich index
  • 創建後的enrich index為只讀屬性,並且名稱通常帶有.enrich-*
  • enrich index只可以用來幫助enrich processor,不要用在其他用途
  • 不能更新或是添加文檔進enrich index,如果source index更新並且也想更新enrich index的話
    • 重新製作enrich index
    • 舊的enrich index會在默認時間內刪除

ingest pipeline設置enrich processor

PUT /_ingest/pipeline/pipeline_name
{
  "processors": [
    {
      "enrich": {
        "description": "",
        "policy_name": "your_policy_name",
        "field": "",
        "target_field": ""
      }
    }
  ]
}

在processor中設置,並且附上policy_name

整體流程如下:
https://ithelp.ithome.com.tw/upload/images/20230916/20161866k3K7nUjk28.png

構圖參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html

  • incoming document透過index等API要進入target index前,會藉由ingest pipeline中的processors們去過濾或是豐富資料
  • 可以透過設置enrich policy,來將已經有的source index中的資料製作成新的enrich index。再將特定欄位的資料取出,豐富化原有的資料

今天介紹完了ingest pipeline,但是我們還會在mapping的這個大主體待兩天
明天會介紹我們在設置mapping時,還有哪些參數是我們要注意的
以及在開發過程適合哪些設定等等~

參考資料
ingest pipelines:
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

grok processor:
https://www.elastic.co/guide/en/elasticsearch/reference/current/grok-processor.html

date processor:
https://www.elastic.co/guide/en/elasticsearch/reference/current/date-processor.html

index pipeline settings:
https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html#index-default-pipeline

enrich policy:
https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/put-enrich-policy-api.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/execute-enrich-policy-api.html


上一篇
【Day 13】由淺入深來探討Elasticsearch - Runtime fields
下一篇
【Day 15】由淺入深來探討Elasticsearch - Mapping parameters
系列文
由淺入深來探討Elasticsearch,從基礎語法到底層相關原理30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言