上一篇我們已經完成了使用telegram來發送訊息,那麼今日我們決定不使用Kibana Alerting,改用metrics資料來看要如何實作,才能達到類似警報器的功能。
我們將使用索引metricbeat-*
的的原始數據資料,搭配使用Query DSL
的統計工具 Aggregation
語法,來完成警報訊息發送功能。
更多詳細的Aggregation請參考
接下範例是使用的索引是metricbeat-*
+Aggregation
,語法結構如下:
{
"aggs": {
"avg_data": {
"avg": {
"field": "docker.cpu.total.pct"
}
}
},
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"from": "2021-10-06T02:55:50Z",
"include_lower": true,
"include_upper": true,
"to": "2021-10-06T02:56:04Z"
}
}
},
{
"match": {
"event.dataset": {
"query": "docker.cpu"
}
}
},
{
"match": {
"container.name": {
"query": "es01-test"
}
}
}
]
}
},
"from": 0,
"size": 100
}
完整範例程式
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api"
)
var (
es *elasticsearch.Client
bot *tgbotapi.BotAPI
)
func setElkClient() {
var err error
cfg := elasticsearch.Config{
Addresses: []string{"http://127.0.0.1:9200"},
}
es, err = elasticsearch.NewClient(cfg)
if err != nil {
panic(err)
}
}
func setNewBotAPI() {
var err error
bot, err = tgbotapi.NewBotAPI("youToken")
if err != nil {
panic(err)
}
bot.Debug = false
}
func main() {
//初始設定
setElkClient()
setNewBotAPI()
//搜尋並發送訊息
searchRequest()
}
func sendTelegramMsg(msg string) error {
NewMsg := tgbotapi.NewMessage(chatID, msg) //傳送訊息給使用者
NewMsg.ParseMode = tgbotapi.ModeHTML
_, err := bot.Send(NewMsg)
if err != nil {
return err
}
return nil
}
func searchRequest() {
var r map[string]interface{}
t := time.Now().UTC().Add(-1 * time.Minute).Format(time.RFC3339)
nt := time.Now().UTC().Format(time.RFC3339)
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{"range": map[string]interface{}{
"@timestamp": map[string]interface{}{
"from": t,
"to": nt,
"include_lower": true,
"include_upper": true,
},
}},
{"match": map[string]interface{}{
"event.dataset": map[string]interface{}{
"query": "docker.cpu",
},
}},
{"match": map[string]interface{}{
"container.name": map[string]interface{}{
"query": "es01-test",
},
}},
},
},
},
"aggs": map[string]interface{}{
"avg_data": map[string]interface{}{
"avg": map[string]interface{}{
"field": "docker.cpu.total.pct",
},
},
},
"size": 100,
"from": 0,
}
jsonBody, _ := json.Marshal(query)
req := esapi.SearchRequest{
Index: []string{"metricbeat-*"}, // 索引名稱
Body: bytes.NewReader(jsonBody),
}
res, err := req.Do(context.Background(), es)
if err != nil {
panic(err)
}
defer res.Body.Close()
if err = json.NewDecoder(res.Body).Decode(&r); err != nil {
panic(err)
}
aggregations := r["aggregations"].(map[string]interface{})["avg_data"]
avg := aggregations.(map[string]interface{})["value"].(float64)
if avg <= 40 {
msg := fmt.Sprintf("es01-test CPU %v%s異常 for go-elasticsearch", avg, "%")
if err = sendTelegramMsg(msg); err != nil {
fmt.Println("send msg error")
}
}
}
發送的訊息內容如下:
今日我們的go-elasticsearch
實作終於完成,經過Aggregation
語法使用,我們可以發現警報器產生索引資料和metrics原始數據使用上的差別,在警報器上已經把一些較複雜性的語法都包裝好,方便快速使用,而且透過警報器可以快速的修改警報發送內容,不必去修改程式並且重新部署。通過今天的簡單的範例程式,我們了解在使用metrics資料,會需要組合許多的條件語法,不是使用單一的DSL語法就能達到想要的結果,而大幅增加進入的門檻。當然如果你是用付費版本的elk,在訊息通知上就不用這麼麻煩啦。