這邊會先介紹 Ingest Pipeline 當中,常用到的幾個 Processor,並且說明他們的能力與效果。
Grok 怎麼使用我就不多介紹了,如果不知道 Grok 要怎麼使用的,可以上網查詢,有非常多的資料。
Grok 在針對『非結構化的文字資料,整理成結構化的資料』會是非常常用的一個重要工具,這邊要特別介紹的是, Elasticsearch 在 Grok 的支援上,有許多內建好的 Patterns 可以直接拿來使用,至於有支援哪些,請直接從 GitHub - Elasticsearch Grok Patterns 查看,特別是裡面的 grok-patterns 檔案,記錄了一些通用型的 patterns。
至於 Grok processor 的使用方式如下:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
  "description" : "parse multiple patterns",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}"],
        "pattern_definitions" : {
          "FAVORITE_DOG" : "beagle",
          "FAVORITE_CAT" : "burmese"
        },
        "trace_match": true
      }
    }
  ]
},
"docs":[
  {
    "_source": {
      "message": "I love burmese cats!"
    }
  }
  ]
}
field:從哪個欄位中讀取資料。patterns:Grok patterns 的描述字串,可以使用已經定義好的 patterns,也可以自己定義。(上例是將比對到的結果,存放到 pet 欄位中)pattern_definitions:可以自行指定相同字串比對的規則,值的宣告中,也可以使用 | 來定義多個值。trace_match:是否要回傳 _grok_match_index 這個 grok 執行結果的資訊。回傳結果為:
{
  "docs": [
    {
      "doc": {
        "_type": "_doc",
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "message": "I love burmese cats!",
          "pet": "burmese"
        },
        "_ingest": {
          "_grok_match_index": "1",
          "timestamp": "2016-11-08T19:43:03.850+0000"
        }
      }
    }
  ]
注意:為了避免某些 Grok 的處理花太久的時間、佔用太多系統資源,Elasticsearch 當中有定義
ingest.grok.watchdog.interval與ingest.grok.watchdog.max_execution_time(預設值都是 1 秒),來檢查及限制 Grok 任務的處理。
這個是與 Grok processor 類似的功能,但功能較單純一些,可以說在 Elasticsearch Ingest Pipeline 當中,更值得被優先選擇用來處理『非結構化的文字資料,整理成結構化的資料』的 processor,因為處理的方式較單純,不支援正規表示式 (Regular Expression),也因此執行速度在不少情境下比 Grok 快非常多。
注意:效能考量,基本上可以當作如果能使用 Dissect 做到的,就不要用 Grok。
Dissect processor 的使用方式
{
  "dissect": {
    "field": "message",
    "pattern" : "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}"
   }
}
針對以下的文件內容
{
  "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171"
}
可以拆解並得到以下結構化的結果
"doc": {
  "_index": "_index",
  "_type": "_type",
  "_id": "_id",
  "_source": {
    "request": "/english/venues/cities/images/montpellier/18.gif",
    "auth": "-",
    "ident": "-",
    "verb": "GET",
    "@timestamp": "30/Apr/1998:22:00:52 +0000",
    "size": "3171",
    "clientip": "1.2.3.4",
    "httpversion": "1.0",
    "status": "200"
  }
}
使用 Dissert 時,主要是使用 %{keyname} 並在比對到值的時候,將抓到的值存放到對應 keyname 的欄位中,
另外可以配合使用以下這些主要的修飾符 (Modifier):
| 修飾符 | 使用方式 | 描述 | 文件 | 
|---|---|---|---|
| -> | %{keyname1->} | 右方的字元不論重覆多少次,都忽略它,常用在右方有很多空格時,要一口氣忽略,就可以使用。 | link | 
| + | %{+keyname} %{+keyname} | 將多個比對到的欄位結果,合併在一起,可以透過 append_separator指定這些值合併時的分隔符號,預設是空格。 | link | 
| +with/n | %{+keyname/2} %{+keyname/1} | 和 +一樣是合併多個結果,但多透過/n來指定這些值合併時的順序,n從 1 開始。 | link | 
| ? | %{?ignoreme} | 忽略比對到的這個結果,其實效果和 %{}是一樣的,不過指定名字會更利於 Dissert pattern 的閱讀。 | link | 
| *and& | %{*r1} %{&r1} | 使用 *將從比對到的結果當成欄位的名稱,並且欄位的值是&比對到的結果,例子中的r1代表的只是相同名字的*與&是同一組。 | link | 
由於 Elastic Common Schema 的規範之中,每個 event 都會需要有 @timestamp 的欄位,並且實務中要將日期的資料使用日期的格式儲存在 Elasticsearch 中也是很常見的需求,因此要先介紹 Date processor。
{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "field" : "initial_date",
        "target_field" : "timestamp",
        "formats" : ["ISO8601"],
        "timezone" : "{{{my_timezone}}}",
        "locale" : "{{{my_locale}}}"
      }
    }
  ]
}
Date Processor 擁有幾個重要的設定:
field:從哪個欄位中讀取資料。format:依照哪一種日期格式來解析資料,支援常見在 JSON 中使用的日期格式  ISO8601 或是 timestamp 的數字表示方式 UNIX、UNIX_MS 或是使用 Java 的 date format 定義時間格式。timezone:支援指定時區。locale:支援日期格式中與地區語言相關的表示法,例如日星期、月份有些格式會用英文來表示。如果要將某個欄位的型態,轉換成另種型態,就可以使用 Convert processor。
PUT _ingest/pipeline/my-pipeline-id
{
  "description": "converts the content of the id field to an integer",
  "processors" : [
    {
      "convert" : {
        "field" : "id",
        "type": "integer"
      }
    }
  ]
}
field:從哪個欄位中讀取資料。type:轉換成哪個指定的型態。上述的例子就是將原本可能是字串型態的欄位,指定轉換成為 integer 的型態。
有時我們放入 Elasticsearch 的文件沒有能當作識別的唯一主鍵 (Primary Key),但是有多個欄位合併在一起就能代表是唯一的複合鍵 (Composite Key),當我們想避免資料重覆被 indexing 進入 Elasticsearch 時,又或是進入 Elasticsearch 之後,想要更有效的找出這些重覆的資料時,就是在事前加工,將這些欄位合併在一起產生出一個唯一的 Fingerprint (指紋),就能當成識別使用。
Finterprint processor 的使用方式:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "fingerprint": {
          "fields": ["user"],
          "target_field": "unique_key",
          "method": "SHA-1"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "user": {
          "last_name": "Smith",
          "first_name": "John",
          "date_of_birth": "1980-01-15",
          "is_active": true
        }
      }
    }
  ]
}
fields:主要在這裡提供多個值,甚至如上方的例子能直接給予一個物件。target_field:產生的 Fingerprint 要存在哪個欄位中,預設是 fingerprint 的欄位。method:使用哪種 Hash Function,有支援 MD5、SHA-1、SHA-256、SHA-512、MurmurHash3。salt:有需要的話甚至可以另外指定 salt 的值加在 Hash Function 的運算之中。當我們有 IP 的資料,想要轉換成地理位置的資訊,在查詢時甚至能使用地圖的方式來呈現時,我們就可以使用 GeoIP processor:
{
  "description" : "Add geoip info",
  "processors" : [
    {
      "geoip" : {
        "field" : "ip"
      }
    }
  ]
}
當我們指定某個 IP 值,要透過 geoip processor 進行處理:
PUT my-index-00001/_doc/my_id?pipeline=geoip
{
  "ip": "8.8.8.8"
}
產生出來的結果就會是
{
  "found": true,
  "_index": "my-index-00001",
  "_type": "_doc",
  "_id": "my_id",
  "_version": 1,
  "_seq_no": 55,
  "_primary_term": 1,
  "_source": {
    "ip": "8.8.8.8",
    "geoip": {
      "continent_name": "North America",
      "country_name": "United States",
      "country_iso_code": "US",
      "location": { "lat": 37.751, "lon": -97.822 }
    }
  }
}
GeoIP processor 預設是使用 MaxMind 這間公司所提供免費版的資料庫,如果有要使用其他特定的資料庫來源,可以透過 database_file 來指定。
另外針對產生出來的欄位,可以透過 target_field 來指定名稱,甚至透過 properties 來決定產生出來的物件裡面要包含哪些 Geo 屬性。
KV 是指 Key/Value,能將在字串中同樣規則的 Key/Value 值給解析出來。
例如最常使用在 URL 的 Query String 當中,如果我們想把 Query String 裡的值都結構化,可以使用:
{
  "kv": {
    "field": "query_string",
    "field_split": "&",
    "value_split": "="
  }
}
就能將 URL 的格式
{
  "query_string": "q=elasticsearch&locale=en"
}
轉變成
"doc": {
  "_index": "_index",
  "_type": "_type",
  "_id": "_id",
  "_source": {
    "query_string": "q=elasticsearch&locale=en",
    "q": "elasticsearch",
    "locale": "en"
  }
}
也能透過 target_field 將所以拆解出來的結果收集在某一個欄位之中。
如果我們定義了許多的 Pipeline,但希望能『模組化』的方式,將某些宣告重覆使用,就可以透過這個 Pipeline Processor 的方式,來組合其他定義好的 Processor。
例如我們先定義一個 pipelineA
PUT _ingest/pipeline/pipelineA
{
  "description" : "inner pipeline",
  "processors" : [
    {
      "set" : {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}
在另外的 pipelineB 有其他的處理要進行,但又希望使用到 pipelineA 所定義的部份,就可以像以下的方試來宣告:
PUT _ingest/pipeline/pipelineB
{
  "description" : "outer pipeline",
  "processors" : [
    {
      "pipeline" : {
        "name": "pipelineA"
      }
    },
    {
      "set" : {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}
其他還有許多好用的 Processors,例如:
drop:刪除某個欄位。set:增加一個新欄位並填入指定的值。urldecode:常常收集到的 log 裡面的 URL 是有 encode 過的,當裡面有非英文的語言、或是特殊符號時,在 Log 的解讀上會很辛苦,可以先透過 urldecode processor 進行 URL decode。uri_parts:直接依照 URI 的標準,拆解出 scheme、domain、port、path、query、extension、fragment…等欄位。user_agent:將 User Agent 的字串,拆解出 name、version、os、device…等欄位。script:透過 painless 的語言,編寫處理的邏輯,很強大的 processor。fail:配合 if 的條件設定,可以在 ingest pipeline 時進行檢查,針對不符合要求的文件,拋出錯誤。查看最新 Elasticsearch 或是 Elastic Stack 教育訓練資訊: https://training.onedoggo.com
歡迎追蹤我的 FB 粉絲頁: 喬叔 - Elastic Stack 技術交流
不論是技術分享的文章、公開線上分享、或是實體課程資訊,都會在粉絲頁通知大家哦!