Yesterday I introduced runtime fields, i.e. schema on read, which effectively processes data already stored in the index at read time.
But what if we already know how we want to process our data —
is there a built-in Elasticsearch feature that can handle it before it enters the index?
Today's topic, the ingest pipeline, provides exactly this kind of data preprocessing.
Ingest Pipeline:
An ingest pipeline preprocesses data before it enters the index: documents pass through a series of processors, and the processed result is what gets stored. To use it, at least one node in the cluster must be configured with the ingest role:
node.roles: [ ingest ]
Some of the available processors include:
1. grok
For example, a log message containing an IP, UA, and so on can be split apart by grok so that other downstream processors can work on the pieces:
{
"grok": {
"field": "message",
"patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]
}
}
2. date
Parses a source field according to the configured formats and writes the result to a target field; the stored format can also be adjusted:
{
  "date" : {
    "field" : "initial_date",                         // source field
    "target_field" : "timestamp",                     // optional: field to store the parsed date
    "formats" : ["dd/MM/yyyy HH:mm:ss"],              // expected format(s) of the source field
    "timezone" : "Europe/Amsterdam",                  // optional: timezone used when parsing
    "output_format": "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"   // optional: format used when storing
  }
}
3. GeoIP
By default this uses MaxMind data to look up geographical information for an IPv4 or IPv6 address:
{
  "geoip" : {
    "field" : "ip",           // field containing the IP address
    "target_field" : "geo"    // optional: field to store the MaxMind lookup result
  }
}
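If your cluster has the GeoIP database available (recent Elasticsearch versions fetch it automatically), you can inspect the lookup result with the simulate API; the sample IP below is only an illustration:

```
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "geoip": { "field": "ip" } }
    ]
  },
  "docs": [
    { "_source": { "ip": "8.8.8.8" } }
  ]
}
```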
There are also processors such as script, user_agent, and so on, which I won't list one by one~
These few examples should give you a rough idea; if you're curious whether your own data could use some ingest preprocessing, take a look at the documentation below
(after opening the link, the dropdown menu on the left lists many more processors):
https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html
Creating and updating a pipeline:
PUT /_ingest/pipeline/my-pipeline-name
{
"version": 1,
"description" : "pipeline description",
"processors" : [
{
"lowercase" : {
"field": "my-field",
"ignore_failure": true,
"on_failure": [
{
"set": {
"description": "error-message",
"field": "error.message",
"value": "contain Chinese. Cannot lowercase",
"override": false
}
}
]
}
}
],
"_meta": {
"reason": "your reason",
"serialization": "lowercase-type-pipeline"
}
}
A processor such as dot_expander expands a field name containing dots into an object field:
"processors": [
  {
    "dot_expander": {
      "description": "expand dotted field name into an object field",
      "field": "object-field.property"
    }
  }
]
on_failure can also be declared at the pipeline level instead of per processor; the example below redirects documents that fail processing into a separate index:
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}
Testing a pipeline:
Create the pipeline first, then call the simulate pipeline API to test it.
// create the pipeline
PUT /_ingest/pipeline/my_log_pipeline
{
"version": 1,
"description" : "manage log doc",
"processors" : [
{
"grok": {
"field": "message",
"patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]
}
}
]
}
// simulate the pipeline
POST _ingest/pipeline/my_log_pipeline/_simulate
{
"docs":[
{
"_source": {
"message": "55.3.244.1 GET /index.html 15824 0.043"
}
}
]
}
Or call the simulate API directly, without creating the pipeline first:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors" : [
{
"grok": {
"field": "message",
"patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]
}
}
]
},
"docs":[
{
"_source": {
"message": "55.3.244.1 GET /index.html 15824 0.043"
}
}
]
}
Viewing pipelines:
// list all pipelines
GET /_ingest/pipeline
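A single pipeline can also be fetched by name (my_log_pipeline here is the pipeline created earlier; substitute your own):

```
GET /_ingest/pipeline/my_log_pipeline
```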
You can also use the nodes stats API;
its count and time_in_millis values reveal important information about how pipeline processing is going.
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
// sample output (excerpt)
"pipeline-name": {
"count": 0,
"time_in_millis": 0,
"current": 0,
"failed": 0,
"processors": [
{
"set": {
"type": "set",
"stats": {
"count": 0,
"time_in_millis": 0,
"current": 0,
"failed": 0
}
}
}
]
}
Deleting a pipeline:
DELETE /_ingest/pipeline/pipeline_name
Pipelines can also be wired in through index settings when creating an index:
index.default_pipeline: runs before indexing, but is overridden if the indexing request declares another pipeline
index.final_pipeline: runs after the default or request pipeline
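For example, these settings can be applied at index creation (index and pipeline names here are hypothetical):

```
PUT /my-index
{
  "settings": {
    "index.default_pipeline": "my-pipeline-name",
    "index.final_pipeline": "my-final-pipeline"
  }
}
```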
When importing data with the Beats family, a pipeline can also be specified in the yml file:
output.elasticsearch:
hosts: ["localhost:9200"]
pipeline: my-pipeline-name
The index, bulk, reindex, and update_by_query APIs can also invoke a pipeline via a parameter:
POST my-data-stream/_doc?pipeline=my-pipeline-name
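With the reindex API, the pipeline is declared on the destination instead; a sketch with hypothetical index names:

```
POST _reindex
{
  "source": { "index": "my-data-stream" },
  "dest": {
    "index": "my-new-index",
    "pipeline": "my-pipeline-name"
  }
}
```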
Finally, let's mention an important pipeline feature:
the enrich processor, which enriches the documents you are indexing with data from an existing index.
The steps are as follows:
Create an enrich policy:
PUT /_enrich/policy/policy_name
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
}
}
Execute the enrich policy:
PUT /_enrich/policy/policy_name/_execute
Set up the enrich processor in an ingest pipeline:
PUT /_ingest/pipeline/pipeline_name
{
"processors": [
{
"enrich": {
"description": "",
"policy_name": "your_policy_name",
"field": "",
"target_field": ""
}
}
]
}
Configure it in a processor and attach the policy_name.
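A filled-in sketch based on the users policy above (the pipeline name and target_field are hypothetical):

```
PUT /_ingest/pipeline/user_lookup
{
  "processors": [
    {
      "enrich": {
        "description": "Look up user details by email",
        "policy_name": "policy_name",
        "field": "email",
        "target_field": "user"
      }
    }
  ]
}
```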
The overall flow is illustrated in the diagram here: https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html
That wraps up ingest pipelines, but we'll stay on the broader topic of mapping for two more days.
Tomorrow I'll cover which parameters deserve attention when configuring mappings,
and which settings suit the development process~
References
ingest pipelines:
https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
grok processor:
https://www.elastic.co/guide/en/elasticsearch/reference/current/grok-processor.html
date processor:
https://www.elastic.co/guide/en/elasticsearch/reference/current/date-processor.html
index pipeline settings:
https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html#index-default-pipeline
enrich policy:
https://www.elastic.co/guide/en/elasticsearch/reference/current/enrich-setup.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/put-enrich-policy-api.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/execute-enrich-policy-api.html