iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 19
0

Day 19 Ingest-2


前言

昨天我們講解了ingest的用途,以及簡單的先使用ingest的pipeline,也透過調用這個pipeline上傳document。今天我們進一步講解ingest的進階使用,像是昨天提到的條件式,以及如何處理失敗的pipeline。

Confitional Pipeline

按照昨天的範例,我們先創造一個條件式的processors,接著我們引用這個pipeline去上傳資料,檢視看看效果。

首先我們創造一個drop_guests_network,並且設定條件式如果名稱="Guest",我們就自動把它drop掉

PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}

接著我們在demo-index中加入document,名稱是Guest,並且引用這個pipeline

POST demo-index/_doc/1?pipeline=drop_guests_network
{
  "network": {
    "name": "Guest"
  }
}

會發現document沒有被加入,因為它的名稱是Guest,因此就被drop掉。

而若在processors中加入以下這個設定-dot_expander,可以讓nested的資料被判斷到。

    {
      "dot_expander": {
        "field": "network.name"
      }
    }

Complex Conditionals

在processors中,我們也可以使用更複雜的條件式,讓pipeline更加完整,如下:
(具體的processors使用context可以檢視 processors context)

PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
      }
    }
  ]
}

可以從這個條件式忠看到,可以針對不同欄位中,甚至是list中的一個element去檢視,另外return如果是True,drop就會發生,去控制行為。

Conditionals with the Pipeline Processoredit

另外ingest也有提供,可以條件式引用其他pipeline的元件化寫法如下:

PUT _ingest/pipeline/logs_pipeline
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}

可以看到,我們可以先創立針對不同任務的pipeline,再透過一個main的pipeline,根據不同條件去調用不同的pipeline

Processors with REGEXP

processors中也可以調用regular expression,在條件判斷中,可以透過regexp去比對,如下範例。

PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}

Failure Handling

最後是關於失敗處理的部分,可以透過processors中的on failure,去定義失敗時處理方式,具體如下:

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error.message",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

可以看到我們去執行rename的任務,如果失敗的話,可以提供一個error.message,告知為何會沒有比對到,如此一來我們可以透過index過的document中的error message去進行排錯。

除了這種用法外,on failure也可以執行別的動作

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}

這個動作就是當失敗時,我們讓它執行set,並且給予failed-{{index名稱}}的值,同樣也可以做為偵錯的紀錄

當然,如果錯誤並不值得紀錄,我們也可以直接透過except掉的方式

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}

設定ignore,就不會有錯誤處理的部分


上一篇
Day 18 Ingest-1
下一篇
Day 20 Collect Log with Filebeat
系列文
親愛的,我把ElasticSearch上雲了30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言