昨天我們講解了ingest的用途,以及簡單的先使用ingest的pipeline,也透過調用這個pipeline上傳document。今天我們進一步講解ingest的進階使用,像是昨天提到的條件式,以及如何處理失敗的pipeline。
按照昨天的範例,我們先創造一個條件式的processors,接著我們引用這個pipeline去上傳資料,檢視看看效果。
首先我們創造一個drop_guests_network,並且設定條件式如果名稱="Guest",我們就自動把它drop掉
PUT _ingest/pipeline/drop_guests_network
{
"processors": [
{
"drop": {
"if": "ctx.network?.name == 'Guest'"
}
}
]
}
接著我們在demo-index中加入document,名稱是Guest,並且引用這個pipeline
POST demo-index/_doc/1?pipeline=drop_guests_network
{
"network": {
"name": "Guest"
}
}
會發現document沒有被加入,因為它的名稱是Guest,因此就被drop掉。
而若在processors中加入以下這個設定-dot_expander,可以讓nested的資料被判斷到。
{
"dot_expander": {
"field": "network.name"
}
}
在processors中,我們也可以使用更複雜的條件式,讓pipeline更加完整,如下:
(具體的processors使用context可以檢視 processors context)
PUT _ingest/pipeline/not_prod_dropper
{
"processors": [
{
"drop": {
"if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
}
}
]
}
可以從這個條件式忠看到,可以針對不同欄位中,甚至是list中的一個element去檢視,另外return如果是True,drop就會發生,去控制行為。
另外ingest也有提供,可以條件式引用其他pipeline的元件化寫法如下:
PUT _ingest/pipeline/logs_pipeline
{
"description": "A pipeline of pipelines for log files",
"version": 1,
"processors": [
{
"pipeline": {
"if": "ctx.service?.name == 'apache_httpd'",
"name": "httpd_pipeline"
}
},
{
"pipeline": {
"if": "ctx.service?.name == 'syslog'",
"name": "syslog_pipeline"
}
},
{
"fail": {
"if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
"message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
}
}
]
}
可以看到,我們可以先創立針對不同任務的pipeline,再透過一個main的pipeline,根據不同條件去調用不同的pipeline
processors中也可以調用regular expression,在條件判斷中,可以透過regexp去比對,如下範例。
PUT _ingest/pipeline/check_url
{
"processors": [
{
"set": {
"if": "ctx.href?.url =~ /^http[^s]/",
"field": "href.insecure",
"value": true
}
}
]
}
最後是關於失敗處理的部分,可以透過processors中的on failure,去定義失敗時處理方式,具體如下:
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"on_failure" : [
{
"set" : {
"field" : "error.message",
"value" : "field \"foo\" does not exist, cannot rename to \"bar\""
}
}
]
}
}
]
}
可以看到我們去執行rename的任務,如果失敗的話,可以提供一個error.message,告知為何會沒有比對到,如此一來我們可以透過index過的document中的error message去進行排錯。
除了這種用法外,on failure也可以執行別的動作
{
"description" : "my first pipeline with handled exceptions",
"processors" : [ ... ],
"on_failure" : [
{
"set" : {
"field" : "_index",
"value" : "failed-{{ _index }}"
}
}
]
}
這個動作就是當失敗時,我們讓它執行set,並且給予failed-{{index名稱}}的值,同樣也可以做為偵錯的紀錄
當然,如果錯誤並不值得紀錄,我們也可以直接透過except掉的方式
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"ignore_failure" : true
}
}
]
}
設定ignore,就不會有錯誤處理的部分