這邊會先介紹 Ingest Pipeline 當中,常用到的幾個 Processor,並且說明他們的能力與效果。
Grok 怎麼使用我就不多介紹了,如果不知道 Grok 要怎麼使用的,可以上網查詢,有非常多的資料。
Grok 在針對『非結構化的文字資料,整理成結構化的資料』會是非常常用的一個重要工具,這邊要特別介紹的是, Elasticsearch 在 Grok 的支援上,有許多內建好的 Patterns 可以直接拿來使用,至於有支援哪些,請直接從 GitHub - Elasticsearch Grok Patterns 查看,特別是裡面的 grok-patterns
檔案,記錄了一些通用型的 patterns。
至於 Grok processor 的使用方式如下:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description" : "parse multiple patterns",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}"],
"pattern_definitions" : {
"FAVORITE_DOG" : "beagle",
"FAVORITE_CAT" : "burmese"
},
"trace_match": true
}
}
]
},
"docs":[
{
"_source": {
"message": "I love burmese cats!"
}
}
]
}
field
:從哪個欄位中讀取資料。patterns
:Grok patterns 的描述字串,可以使用已經定義好的 patterns,也可以自己定義。(上例是將比對到的結果,存放到 pet
欄位中)pattern_definitions
:可以自行指定相同字串比對的規則,值的宣告中,也可以使用 |
來定義多個值。trace_match
:是否要回傳 _grok_match_index
這個 grok 執行結果的資訊。回傳結果為:
{
"docs": [
{
"doc": {
"_type": "_doc",
"_index": "_index",
"_id": "_id",
"_source": {
"message": "I love burmese cats!",
"pet": "burmese"
},
"_ingest": {
"_grok_match_index": "1",
"timestamp": "2016-11-08T19:43:03.850+0000"
}
}
}
]
注意:為了避免某些 Grok 的處理花太久的時間、佔用太多系統資源,Elasticsearch 當中有定義
ingest.grok.watchdog.interval
與ingest.grok.watchdog.max_execution_time
(預設值都是 1 秒),來檢查及限制 Grok 任務的處理。
這個是與 Grok processor 類似的功能,但功能較單純一些,可以說在 Elasticsearch Ingest Pipeline 當中,更值得被優先選擇用來處理『非結構化的文字資料,整理成結構化的資料』的 processor,因為處理的方式較單純,不支援正規表示式 (Regular Expression),也因此執行速度在不少情境下比 Grok 快非常多。
注意:效能考量,基本上可以當作如果能使用 Dissect 做到的,就不要用 Grok。
Dissect processor 的使用方式
{
"dissect": {
"field": "message",
"pattern" : "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}"
}
}
針對以下的文件內容
{
"message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171"
}
可以拆解並得到以下結構化的結果
"doc": {
"_index": "_index",
"_type": "_type",
"_id": "_id",
"_source": {
"request": "/english/venues/cities/images/montpellier/18.gif",
"auth": "-",
"ident": "-",
"verb": "GET",
"@timestamp": "30/Apr/1998:22:00:52 +0000",
"size": "3171",
"clientip": "1.2.3.4",
"httpversion": "1.0",
"status": "200"
}
}
使用 Dissert 時,主要是使用 %{keyname}
並在比對到值的時候,將抓到的值存放到對應 keyname
的欄位中,
另外可以配合使用以下這些主要的修飾符 (Modifier):
修飾符 | 使用方式 | 描述 | 文件 |
---|---|---|---|
-> |
%{keyname1->} |
右方的字元不論重覆多少次,都忽略它,常用在右方有很多空格時,要一口氣忽略,就可以使用。 | link |
+ |
%{+keyname} %{+keyname} |
將多個比對到的欄位結果,合併在一起,可以透過 append_separator 指定這些值合併時的分隔符號,預設是空格。 |
link |
+ with /n |
%{+keyname/2} %{+keyname/1} |
和 + 一樣是合併多個結果,但多透過 /n 來指定這些值合併時的順序,n 從 1 開始。 |
link |
? |
%{?ignoreme} |
忽略比對到的這個結果,其實效果和 %{} 是一樣的,不過指定名字會更利於 Dissert pattern 的閱讀。 |
link |
* and & |
%{*r1} %{&r1} |
使用 * 將從比對到的結果當成欄位的名稱,並且欄位的值是 & 比對到的結果,例子中的 r1 代表的只是相同名字的 * 與 & 是同一組。 |
link |
由於 Elastic Common Schema 的規範之中,每個 event 都會需要有 @timestamp
的欄位,並且實務中要將日期的資料使用日期的格式儲存在 Elasticsearch 中也是很常見的需求,因此要先介紹 Date processor。
{
"description" : "...",
"processors" : [
{
"date" : {
"field" : "initial_date",
"target_field" : "timestamp",
"formats" : ["ISO8601"],
"timezone" : "{{{my_timezone}}}",
"locale" : "{{{my_locale}}}"
}
}
]
}
Date Processor 擁有幾個重要的設定:
field
:從哪個欄位中讀取資料。format
:依照哪一種日期格式來解析資料,支援常見在 JSON 中使用的日期格式 ISO8601
或是 timestamp 的數字表示方式 UNIX
、UNIX_MS
或是使用 Java 的 date format 定義時間格式。timezone
:支援指定時區。locale
:支援日期格式中與地區語言相關的表示法,例如日星期、月份有些格式會用英文來表示。如果要將某個欄位的型態,轉換成另種型態,就可以使用 Convert processor。
PUT _ingest/pipeline/my-pipeline-id
{
"description": "converts the content of the id field to an integer",
"processors" : [
{
"convert" : {
"field" : "id",
"type": "integer"
}
}
]
}
field
:從哪個欄位中讀取資料。type
:轉換成哪個指定的型態。上述的例子就是將原本可能是字串型態的欄位,指定轉換成為 integer
的型態。
有時我們放入 Elasticsearch 的文件沒有能當作識別的唯一主鍵 (Primary Key),但是有多個欄位合併在一起就能代表是唯一的複合鍵 (Composite Key),當我們想避免資料重覆被 indexing 進入 Elasticsearch 時,又或是進入 Elasticsearch 之後,想要更有效的找出這些重覆的資料時,就是在事前加工,將這些欄位合併在一起產生出一個唯一的 Fingerprint (指紋),就能當成識別使用。
Finterprint processor 的使用方式:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"fingerprint": {
"fields": ["user"],
"target_field": "unique_key",
"method": "SHA-1"
}
}
]
},
"docs": [
{
"_source": {
"user": {
"last_name": "Smith",
"first_name": "John",
"date_of_birth": "1980-01-15",
"is_active": true
}
}
}
]
}
fields
:主要在這裡提供多個值,甚至如上方的例子能直接給予一個物件。target_field
:產生的 Fingerprint 要存在哪個欄位中,預設是 fingerprint
的欄位。method
:使用哪種 Hash Function,有支援 MD5
、SHA-1
、SHA-256
、SHA-512
、MurmurHash3
。salt
:有需要的話甚至可以另外指定 salt
的值加在 Hash Function 的運算之中。當我們有 IP 的資料,想要轉換成地理位置的資訊,在查詢時甚至能使用地圖的方式來呈現時,我們就可以使用 GeoIP processor:
{
"description" : "Add geoip info",
"processors" : [
{
"geoip" : {
"field" : "ip"
}
}
]
}
當我們指定某個 IP 值,要透過 geoip
processor 進行處理:
PUT my-index-00001/_doc/my_id?pipeline=geoip
{
"ip": "8.8.8.8"
}
產生出來的結果就會是
{
"found": true,
"_index": "my-index-00001",
"_type": "_doc",
"_id": "my_id",
"_version": 1,
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"ip": "8.8.8.8",
"geoip": {
"continent_name": "North America",
"country_name": "United States",
"country_iso_code": "US",
"location": { "lat": 37.751, "lon": -97.822 }
}
}
}
GeoIP processor 預設是使用 MaxMind 這間公司所提供免費版的資料庫,如果有要使用其他特定的資料庫來源,可以透過 database_file
來指定。
另外針對產生出來的欄位,可以透過 target_field
來指定名稱,甚至透過 properties
來決定產生出來的物件裡面要包含哪些 Geo 屬性。
KV 是指 Key/Value,能將在字串中同樣規則的 Key/Value 值給解析出來。
例如最常使用在 URL 的 Query String 當中,如果我們想把 Query String 裡的值都結構化,可以使用:
{
"kv": {
"field": "query_string",
"field_split": "&",
"value_split": "="
}
}
就能將 URL 的格式
{
"query_string": "q=elasticsearch&locale=en"
}
轉變成
"doc": {
"_index": "_index",
"_type": "_type",
"_id": "_id",
"_source": {
"query_string": "q=elasticsearch&locale=en",
"q": "elasticsearch",
"locale": "en"
}
}
也能透過 target_field
將所以拆解出來的結果收集在某一個欄位之中。
如果我們定義了許多的 Pipeline,但希望能『模組化』的方式,將某些宣告重覆使用,就可以透過這個 Pipeline Processor 的方式,來組合其他定義好的 Processor。
例如我們先定義一個 pipelineA
PUT _ingest/pipeline/pipelineA
{
"description" : "inner pipeline",
"processors" : [
{
"set" : {
"field": "inner_pipeline_set",
"value": "inner"
}
}
]
}
在另外的 pipelineB
有其他的處理要進行,但又希望使用到 pipelineA
所定義的部份,就可以像以下的方試來宣告:
PUT _ingest/pipeline/pipelineB
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"name": "pipelineA"
}
},
{
"set" : {
"field": "outer_pipeline_set",
"value": "outer"
}
}
]
}
其他還有許多好用的 Processors,例如:
drop
:刪除某個欄位。set
:增加一個新欄位並填入指定的值。urldecode
:常常收集到的 log 裡面的 URL 是有 encode 過的,當裡面有非英文的語言、或是特殊符號時,在 Log 的解讀上會很辛苦,可以先透過 urldecode
processor 進行 URL decode。uri_parts
:直接依照 URI 的標準,拆解出 scheme
、domain
、port
、path
、query
、extension
、fragment
…等欄位。user_agent
:將 User Agent 的字串,拆解出 name
、version
、os
、device
…等欄位。script
:透過 painless 的語言,編寫處理的邏輯,很強大的 processor。fail
:配合 if
的條件設定,可以在 ingest pipeline 時進行檢查,針對不符合要求的文件,拋出錯誤。查看最新 Elasticsearch 或是 Elastic Stack 教育訓練資訊: https://training.onedoggo.com
歡迎追蹤我的 FB 粉絲頁: 喬叔 - Elastic Stack 技術交流
不論是技術分享的文章、公開線上分享、或是實體課程資訊,都會在粉絲頁通知大家哦!