iT邦幫忙

第 12 屆 iT 邦幫忙鐵人賽

DAY 24
0
Elastic Stack on Cloud

Elastic 戰台股系列 第 24

[Day24] 每日收盤資料更新 using Logstash (3) - Ruby filter plugin

昨天狠狠的卡在了日收盤資料的 Filter 處理上,今天總算搞出來了,一起來看看怎麼做的吧!

資料格式檢視

我們來 Review 一下日收盤資料的格式:
https://ithelp.ithome.com.tw/upload/images/20201008/20129624BsUBGzzC5E.png
圈起來的部分是我們需要的盤後資訊。其中要注意的是, Data 欄位的資料型式為陣列,陣列的每個項目內容為每檔個股當日的收盤資訊,亦為陣列,而非 JSON 物件。這是一個挺特規的格式,所以用常見的 Filter Plugin 無法很好的解析出我們需要的資訊。但還是讓我摸索出了解析的方法。

split filter plugin

首先,JSON array 物件可以利用 split filter plugin 來拆分成「事件」(event)。光看官方文件,我是一頭霧水,實驗看看最實在,conf 檔編輯如下:

input {
   http_poller {
      urls => {
         urlname => "https://www.twse.com.tw/exchangeReport/STOCK_DAY_ALL"
      }
      request_timeout => 20
      schedule => { every =>"60s"}  
      codec => "json"
   }
}

filter {
   split {
         field => "data"
   }
}

output {
  stdout {
     codec => rubydebug
  }
}

我們關注 Split 的部分,要拆分的欄位為 "data"。得到的結果如下圖:
https://ithelp.ithome.com.tw/upload/images/20201008/201296246NrSeJPhF2.png
可以發現,原本 Data 陣例中的每筆資料被拆分到每個 Event 中,而其他的欄位也被保留在每個 Event 中。
第一階段處理完成,我們簡化了 Event 的結構,接下來進一步處理每個 Event。

ruby filter plugin

再來處理日期。資料中的日期格式為 20201008,與我原本 Index 的格式 yyyy-mm-dd 不相同。因此需要做個轉化,把 filter plugin 修改如下:

filter {![https://ithelp.ithome.com.tw/upload/images/20201008/20129624KhNHrBA1di.png](https://ithelp.ithome.com.tw/upload/images/20201008/20129624KhNHrBA1di.png)
   split {
         field => "data"
   }
   ruby {
         code => 
         'event.set("date", Time.parse(event.get("date")).strftime("%Y-%m-%d"))'
   }
   mutate{
         remove_field => ["title", "stat", "notes", "fields", "@version", "@timestamp"]
   }
}

在上面 Config 中,我初次使用了 ruby filter plugin,inline 執行了一段代碼,目的是在 split 拆分後的每個 event 中,把 data 欄位的值,轉化成我需要的格式。並且利用 mutate filter plugin 把不需要的欄位去除。再來看看第二階段的結果:
https://ithelp.ithome.com.tw/upload/images/20201008/20129624CJaHwSZJUr.png
看起來清爽多啦!
最後一階段,就是把 Array 轉化成 Key-value pairs。

filter {
   split {
         field => "data"
   }
   ruby {
         code => 
         'event.set("date", Time.parse(event.get("date")).strftime("%Y-%m-%d"))'
   }
   ruby {
         code => 
               'event.get("data").each_with_index { |value, i| 
                  if i == 0
                     event.set("stock_id", value)
                  elsif i == 2
                     event.set("volume", value)
                  elsif i == 4
                     event.set("open", value)  
                  elsif i == 5
                     event.set("high", value)  
                  elsif i == 6
                     event.set("low", value)
                  elsif i == 7
                     event.set("close", value)
                  else
                     end                          
               }'
   }
   mutate{
         remove_field => ["title", "stat", "notes", "fields", "@version", "@timestamp", "data"]
   }
}

如上,再接一階 filter,得到如下圖的結果:
https://ithelp.ithome.com.tw/upload/images/20201008/20129624Vn5Jp1gL2X.png
至此大功告成!

鬆了一口氣,剩最後幾天了,加油! 明天繼續


上一篇
[Day23] 每日收盤資料更新 using Logstash (2) - http_poller plug-in
下一篇
[Day25] 資料統計 & 運算 - Aggregation
系列文
Elastic 戰台股30

尚未有邦友留言

立即登入留言