作為範例的 ELK 的版本是當前的 stable release 7.3.1。
由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。
Logstash 是開元的資料處理引擎,可以動態的將輸入的資料做大量的處裡。原先的目的是處理 log ,但目前以不限於處理 log ,各種 ELK beat 或是其他來源的不同監測數據,都能處理。
Logastash 內部的功能也大多模組化,因此可以組裝不同的 plugin,來快速處理不同來源資料。
基本上常見的資料來源,logstash 都能夠處理,並且有寫好的 plugin 可以直接使用,細節請見logstash 官方文件
在開始架設 logstash 要先考慮 pipeline 處理過後送的資料庫,可使用的資料庫非常多,這邊會展示的有:
Elasticsearch 在前幾篇已經架設好,GCP Big Query 的設定也事先開好。
kubernetes resource 的 yaml 請參考 我的 github elk-kubernetes
kubectl apply -f config-configmap.yaml
kubectl apply -f pipelines-configmap.yam
kubectl apply -f deployment.yaml
kubectl apply -f service.yaml
放上去的 resource
簡單講一下 kubernetes service 的負載均衡,關於 kubernetes service 細節這篇附上文件
$ kubectl get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
logstash ClusterIP 10.15.254.47 <none> 5044/TCP 182d
$ kubectl get endpoints
NAME ENDPOINTS AGE
logstash 10.12.0.132:5044,10.12.10.162:5044,10.12.9.167:5044 + 12 more... 182d
講到最白,就是 filebeat LOGSTASH URL 設定為 http://logstash 就會打到其中一台 logstash
更改 filebeat configmap
$ kubectl edit configmap filebeat-configmap
# Disable output to elasticsearch
output.elasticsearch:
enabled: false
# Output to logstash
output.logstash:
hosts: ["logstash:5044"]
protocol: "http"
username: "elastic"
password:
這邊要先說,logstash 也支援 centralized configuration,如果你的 logstash 不是跑在 Kubernetes 上,沒辦法配置一套 configmap 就應用到全部的 instance>,記的一定要使用。
Logastash 的運行設定 logstash.yml,這邊我們沒有做設定,都是預設值,有需求可以自行更改
當然之後要調整 batch size 或是 queue, cache 等等效能調校,也是來這邊改,改完 configmap ,rolling update logstash 就可以。
這邊主要是來講 pipeline 設定。
$ kubectl describe configmap pipelines-configmap
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-pipelines
namespace: elk
labels:
k8s-app: logstash
data:
# Nginx Template
# https://www.elastic.co/guide/en/logstash/7.3/logstash-config-for-filebeat-modules.html#parsing-nginx
nginx.conf: |
...
Configmap 裡面只有一個 pipeline,就是 nginx.conf
,我們這邊就只有一條,這邊一段一段看
input {
beats {
# The lisening port of logstash
port => 5044
host => "0.0.0.0"
}
}
設定 Input 來源,是 beat 從 5044 進來
接下來一大段是 filter,每個 filter 中間的 block 都是一個 plugin,logstash 支援非常多有趣的 plugin ,處理不同來源的工作,細節請看這篇
filter {
# Ignore data from other source in case filebeat input is incorrectly configured.
if [kubernetes][container][name] == "nginx-ingress-controller" {
# Parse message with grok
# Use grok debugger in kibana -> dev_tools -> grok_debugger
grok {
match => { "message" => "%{IPORHOST:[nginx][access][remote_ip]} - \[%{IPORHOST:[nginx][access][remote_ip_list]}\] - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][request_url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\" %{NUMBER:[nginx][access][request_length]} %{NUMBER:[nginx][access][request_time]} \[%{DATA:[nginx][access][proxy_upstream_name]}\] %{DATA:[nginx][access][upstream_addr]} %{NUMBER:[nginx][access][upstream_response_length]} %{NUMBER:[nginx][access][upstream_response_time]} %{NUMBER:[nginx][access][upstream_status]} %{DATA:[nginx][access][req_id]}" }
}
# Match url parameters if has params
grok {
match => { "[nginx][access][request_url]" => "%{DATA:[nginx][access][url]}\?%{DATA:[nginx][access][url_params]}" }
}
# Remove and add fields
mutate {
remove_field => "[nginx][access][request_url]"
add_field => { "read_timestamp" => "%{@timestamp}" }
# Add fileset.module:nginx to fit nginx dashboard
add_field => { "[fileset][module]" => "nginx"}
add_field => { "[fileset][name]" => "access"}
}
# Parse date string into timestamp
date {
match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "[nginx][access][time]"
}
# Split url_parameters with &
# /api?uuid=123&query=456
# become
# nginx.access.url_params.uuid=123 nginx.access.url_params.query=456
kv {
source => "[nginx][access][url_params]"
field_split => "&"
}
# Parse useragent
useragent {
source => "[nginx][access][agent]"
target => "[nginx][access][user_agent]"
remove_field => "[nginx][access][agent]"
}
# Search remote_ip with GeoIP database, output geoip information for map drawing
geoip {
source => "[nginx][access][remote_ip]"
target => "[nginx][access][geoip]"
#fields => ["country_name","city_name","real_region_name","latitude","longitude","ip","location"]
}
# ==============
# Remove message to reduce data
# ==============
if [nginx][access][url] {
mutate {
# source:/var/lib/docker/containers/6e608bfc0a437c038a1dbdf2e3d28619648b58a1d1ac58635f8178fc5f871109/6e608bfc0a437c038a1dbdf2e3d28619648b58a1d1ac58635f8178fc5f871109-json.log
remove_field => "[source]"
# Origin message
remove_field => "[message]"
#add_field => { "[nginx][access][message]" => "[message]"}
remove_field => "[nginx][access][message]"
# url_params:client_id=1d5ffd378296c154d3e32e5890d6f4eb×tamp=1546849955&nonce=9a52e3e6283f2a9263e5301b6724e2c0d723def860c4724c9121470152a42318
remove_field => "[nginx][access][url_params]"
}
}
} # nginx-ingress-controller
} # filter
Grok 本身的文件又是一大段,個人建議各路大德,如果要使用,請直接搜尋人家配置好的設定,不要自己寫
真的要寫的話要善用工具
YOUR_KIBANA_HOST/app/kibana#/dev_tools/grokdebugger
grok {
match => { "message" => "%{IPORHOST:[nginx][access][remote_ip]} - \[%{IPORHOST:[nginx][access][remote_ip_list]}\] - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][request_url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\" %{NUMBER:[nginx][access][request_length]} %{NUMBER:[nginx][access][request_time]} \[%{DATA:[nginx][access][proxy_upstream_name]}\] %{DATA:[nginx][access][upstream_addr]} %{NUMBER:[nginx][access][upstream_response_length]} %{NUMBER:[nginx][access][upstream_response_time]} %{NUMBER:[nginx][access][upstream_status]} %{DATA:[nginx][access][req_id]}" }
}
其實就是 nginx 的 access log
1.2.3.4 - [1.2.3.4] - - [21/Sep/2019:07:21:21 +0000] "GET /v1/core/api/list?type=queued×tamp=1569050481&nonce=d1e80e00381e0ba6e42d4601912befcf03fbf291748e77b178230c19cd1fdbe2 HTTP/1.1" 200 3 "-" "python-requests/2.18.4" 425 0.031 [default-chechiachang-server-80] 10.12.10.124:8003 3 0.031 200 f43db228afe66da67b2c7417d0ad2c04
預設的 log 送件來,格式是 text,經過 pattern matching 後變成 json-like format,也就是可以從資料結構取得 .nginx.access.remote_ip
這樣的欄位,讓原本的 access log 從 text 變成可以查找的內容。
原本的 text 送進 elasticsearch 當然也可以查找,但就會在 text 裡面做全文檢索,功能很侷限,效率很差。
logstash 支援的 output 以及設定在這邊
output {
elasticsearch {
hosts => ["https://${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
manage_template => false
}
}
Elasticsearch 的配置很單純
output {
google_bigquery {
project_id => ${GCP_PROJECT_ID}
dataset => ${GCP_BIG_QUERY_DATASET_NAME}
csv_schema => "path:STRING,status:INTEGER,score:FLOAT"
json_key_file => ${GCP_JSON_KEY_FILE_PATH}
error_directory => "/tmp/bigquery-errors"
date_pattern => "%Y-%m-%dT%H:00"
flush_interval_secs => 30
}
}
其中的變數,我們全都用環境變數,在 deployment.yaml 配置,啟動 logstash pods 時代入
GCP_JSON_KEY_FILE_PATH
這邊要配置一隻 GCP 的服務帳號金鑰,一個有 Big Query 寫入權限的 service account,把 json 使用 kubernetes secret 放到集群上,然後在 pod 上使用 volume from secret 掛載進來。csv_schema => "path:STRING,status:INTEGER,score:FLOAT"
這邊要配置之後會存入 Big Query 的 csv 結構
我先前有匯入過一個 wireshark 的 json 檔,不知道為何 logstash filter 出來的東西在 kibana 上怪快的。如 :
{
{
ip:192.168.100.100
port:22
}
}
但上述在 kibana 會看到 message 的資訊只有 {
這個符號其它的 tag 都沒看到。
不知道大大是否遇過此問題。
上面這個 input 不是 json
把 space 都去掉變成這樣
{{ip:192.168.100.100 port:22}}
Grok Pattern 最懶的那種
ip:%{WORD:ip1}.%{WORD:ip2}.%{WORD:ip3}.%{WORD:ip4} port:%{WORD:port}
Output
{
"port": "22",
"ip2": "168",
"ip1": "192",
"ip4": "100",
"ip3": "100"
}
都當成字串處理,就一定可以跑。pattern match 的東西跑不出來,就是你想像中的 pattern,其實不是你想像中的 pattern
小建議:你可以先都當字串處理,從最左邊加表達式語法,然後直接在 debugger 裡面試跑,你就會發現,match 結果跟你想像中的落差
給你作業:用 regex 把你想要的東西抓出來
http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/filter/grok.html