Enrich (充實、使豐富),指的是在 Ingest Pipeline 中,透過其他地方取得相關的資料,並加在原來的資料當中,讓資料更為豐富。
這種做法在資料處理 ETL (Extract, Transform, Load) 的過程中蠻常使用,也很重要的一種做法,能讓我們能做到『空間換時間』或是『先苦後甘』這樣的目的。
由於 Elasticsearch 不是關聯式資料庫,而是 Document Based (文件型) 的 NoSQL 資料庫,所以文件在存入 Elasticsearch 之前應該要視情況去正規化,同時為了追求查詢時能有較快的執行速度,會在文件存入時,盡可能將文件查詢時會使用到的資訊先一併寫入在文件之中,避免後續執行時要另外透過 Elasticsearch 的 Join 或是 Application 端另外處理資料查詢及合併等動作。
例如以下幾種情境:
在 Elasticsearch Ingest Pipeline 的處理過程中,有定義一個 Enrich Processor
,就是專門提供資料 Enrich 的處理,接著將介紹這個 Enrich Processor 的運作方式。
先摘錄 Enrich Processor 的運作重點:
接下來我們針對運作的架構與流程進行較細部的說明。
上圖的運作架構,在 Ingest Pipeline 的處理過程中,加上了 enrich
processor ,這個 enrich
的背後,共有三個不同的角色:
首先 Enrich Policy 是一組需要另外建立的設定,其中定義了 Enrich 的操作應該如何進行,包含
policy_type
定義找資料時要用哪一種比對方式。match
欄位,表示要從 Source Index 中的哪個欄位來進行查尋。enrich_fields
,要將從 Source Index 中查尋到文件裡的哪些欄位,加入到原來的文件中。Enrich Policy 是要經過 Execute
(執行) 的 API 來觸發運作,並不是自動會在背景執行的機制,在執行時,會將 Source Index 裡符合條件的資料找出,並寫入到 Enrich Index 當中進行獨立的儲存。
注意,Enrich Policy 建立後不能修改,只能刪除並建立新的 Enrich Policy。
Enrich 的處理過程中,會透過某個資料的來源進行查詢以取得額外的資料,這個資料來源必須是 Elasticsearch 中的 Index,也就所謂的 Source Index。
Source Index 可以是一個或多個 Elasticsearch 的 Index,而這個 Index 其實就是一般 Elasticsearch 的 Index,並沒有不同,所以能用一般存取的方式進行資料的維護,並且一個 Elasticsearch 的 Index 可以同時當作多個不同 Enrich 處理的 Source Index。
由於每次 Enrich Processor 在處理 Indexing 的文件時,若當下直接從 Source Index 查找資料時,因為較花資源,另外也可能因為查詢條件較複雜會執行較久,所以 Enrich 的運作機制中,有定義了 Enrich Index,讓 Enrich Policy 執行時,透過 Elasticsearch 所建立一個系統層級的 Index,並且會與 Enrich Policy 綁定,裡面存放著在 Source Index 裡找到的文件,也是 Enrich Processor 在處理 Indexing 文件時,實際會用來查找資料的資料來源。
Enrich Index 有以下幾個特性:
.enrich-*
開頭。在了解 Enrich Processor 的運作方式之後,這邊來介紹要使用時的完整步驟:
enrich
processor:可以將 enrich
processor 添加到現有的 Ingest Pipeline 之中,或是建立新的 Ingest Pipeline。enrich
processor 使用新的 Enrich Policy,再刪除舊的 Enrich Policy。依照上述的步驟,我們首先準備 Source Index users
:
PUT /users/_doc/1?refresh=wait_for
{
"email": "mardy.brown@asciidocsmith.com",
"first_name": "Mardy",
"last_name": "Brown",
"city": "New Orleans",
"county": "Orleans",
"state": "LA",
"zip": 70116,
"web": "mardy.asciidocsmith.com"
}
接著我們定義 Enrich Policy - users-policy
,並且指定使用 email
欄位來進行查閱,若有查到,我們要將 first_name
、last_name
、city
、zip
、state
的資料增加到 indexing 的文件中。
PUT /_enrich/policy/users-policy
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
}
}
執行 Enrich Policy,以建立 Enrich Index。
POST /_enrich/policy/users-policy/_execute
這時可以先使用 _cat/indices
查看 Enrich Index 是否有正確建立:
GET _cat/indices/.enrich-users-policy*?v
並使用 _search
查看 Enrich Index 裡的內容:
GET .enrich-users-policy-*/_search
接著我們建立 Ingest Pipeline 並且使用 enrich
processor
PUT /_ingest/pipeline/user_lookup
{
"processors" : [
{
"enrich" : {
"description": "Add 'user' data based on 'email'",
"policy_name": "users-policy",
"field" : "email",
"target_field": "user",
"max_matches": "1"
}
}
]
}
我們可以 Indexing 文件,並指定 Ingest Pipeline 來確認是否正常運作
PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
"email": "mardy.brown@asciidocsmith.com"
}
最後確認 Indexing 進入 Elasticsearch 的文件有正確的如我們的預期被 Enrich。
GET /my-index-000001/_doc/my_id
除了上述的 term
查閱的 Enrich 方式,Enrich Processor 也有提供 geo_shape
查閱方式,可以參考 官方文件 - Enrich you data based on geolocation。
使用 Ingest Pipeline 時,如果發生錯誤,預設的處理行為會丟出 Exception (例外狀況) 的錯誤,並且停止這筆資料的 Indexing 處理。
如果我們希望在某一個特定 Ingest Processor 的處理發生錯誤時,能忽略這個錯誤,繼續的向下執行,我們可以有三種作法:
ignore_failure
的屬性,並設定成 true
,讓錯誤發生時,直接略過當前的 processor,進入下一個 processor 的處理。on_failure
的設定,讓錯誤發生時,執行另外一系列的 processors。(裡面的 processor 也可以再指定錯誤發生時的 on_failure
,型成巢狀的設定)PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false
}
}
]
}
}
]
}
on_failure
,將整個 pipeline 最終會發生的錯誤,給抓住。(以下的例子配合 set
processor,將這筆發生錯誤的資料,另外寫到指定的 index 中。)PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}
在使用 on_failure
時,也可以使用以下的屬性,取得錯誤相關的資訊:
on_failure_message
on_failure_processor_type
on_failure_processor_tag
on_failure_pipeline
使用方式如下:
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Record error information",
"field": "error_information",
"value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
}
}
]
}
查看最新 Elasticsearch 或是 Elastic Stack 教育訓練資訊: https://training.onedoggo.com
歡迎追蹤我的 FB 粉絲頁: 喬叔 - Elastic Stack 技術交流
不論是技術分享的文章、公開線上分享、或是實體課程資訊,都會在粉絲頁通知大家哦!