iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 30
0
Elastic Stack on Cloud

Python&Elasticsearch 入門系列 第 30

IT鐵人第30天 Elasticsearch 使用python查詢資料 Aggregations:Scripted Metric

  • 分享至 

  • xImage
  •  

今天要介紹的是我另外一個也經常用的聚合方式,是Metrics底下的Scripted Metric

今天的測試資料
https://ithelp.ithome.com.tw/upload/images/20201013/20129976VuMbXNt2Wo.png

Scripted Metric

這種聚合方式是當內建的聚合功能沒辦法滿足使用需求時,可以自訂聚合,但如果需求是內建聚合就能夠達到,還是推薦使用內建的方式,因為有效能保證,這種聚合方式雖然可以很迎合使用者,但效能是根據使用的寫出來的東西而定的

在開始之前必須先讓大家了解這種聚合方式是如何運作的。這種聚合方式分成4個階段init_script、map_script、combine_script、reduce_script
流程是:
init_script→map_script→combine_script→reduce_script

接下來就來講解每步流程實際上在幹嘛

init_script

聚合第一步,在收集文檔之前執行,初始化設置,一般都會設置一個transactions.state變數,作用我會把它想像成電腦的記憶體,就是一個暫存的地方

map_script

聚合第二步,對所有符合條件的文檔執行map_script的內容,但要記住是在分片中執行

combine_script

聚合第三步,對所有分片執行combine_script的內容

reduce_script

聚合第四步,所有分片均返回結果後,在協調節點上執行reduce_script的內容,該腳本可以訪問變數state(就是一開始初始化的變數)

params

可以從請求傳入參數讓腳本使用,但只有combine_script、map_script、init_script可以讀取

接下來是實際應用,先定個目標,假設我想要得到資工一1跟資工一2的成績平均,但如果單科成績低於60分則加10分最高加到60分,定好了就開始吧

1.撰寫init_script和params
先從init_script開始

"init_script": "state.transactions = new HashMap();"

這邊會用HashMap是因為我希望經過map_script後state.transactions內部會長這樣

#這邊用json大概示意一下
{
  "資工一1": [77, 80], #第1,2位同學的平均
  "資工一2": [86, 60]
}

再來是params,我把不及格要加幾分放在params裡,一是之後要調整好調整,二是可以示範要如何使用

"params": {"add_point": 10},#上面提到加10分

2.撰寫map_script
這裡要做的事情就比較多了,要算出每位同學的平均,還要判斷是否低於60要加分,之後再存state.transactions內

"map_script": """
    def class = doc['class'].value;        #拿值
    def math = doc['grades.math'].value;
    def mand = doc['grades.mand'].value;
    def eng = doc['grades.eng'].value;
    def soc = doc['grades.soc'].value;
    ArrayList tmp = null;
    #判斷state.transactions內有無班級,有則直接存取,沒有則新建
    if(!state.transactions.containsKey(class)){  
        tmp = new ArrayList();
        state.transactions.put(class, tmp);
    } else {
        tmp = state.transactions.get(class);
    }
    
    #判斷小於60加分
    if (math < 60){
        if (math > 50){
            math = 60;
        }else{
            math += params.add_point;
        }
    }
    if (mand < 60){
        if (mand > 50){
            mand = 60;
        }else{
            mand += params.add_point;
        }
    }
    if (eng < 60){
        if (eng > 50){
            eng = 60;
        }else{
            eng += params.add_point;
        }
    }
    if (soc < 60){
        if (soc > 50){
            soc = 60;
        }else{
            soc += params.add_point;
        }
    }
    #計算平均且加入state.transactions
    long avg = 0;
    avg = (math + mand + eng + soc) / 4;
    tmp.add(avg);                            
"""

判斷是否小於60分的部分,因為我想不到其他比較優雅的方法,所以就土法煉鋼了,如果大家有比較好的寫法麻煩一定要告訴我!

3.撰寫combine_script
這邊要做的事情就比較少了,因為我們的目的只剩下把所有同班級的同學平均加起來再除以班級人數,但combine_script是對分片執行的,所以在這邊merge班級的話等等reduce_script也是要merge一次,所以這邊就直接return了

"combine_script": "return state.transactions"

4.撰寫reduce_script
所有分片的結果都回傳了,最後要做的事把所有班級成績merge。

Debug.explain
這邊告訴大家一個方法,大家可以善用這個方法檢視變數的情況,我們先用這個方法看一下state的內部情況(寫在reduce_script內)

"reduce_script": """
    Debug.explain(states);
"""
"to_string" : "[{資工一1=[77, 77]}, {資工一2=[64, 94], 資工一1=[94]}, {資工一2=[56]}]"

可以看到成績是分散的,因為是從不同分片回傳的,接下來就把它合併

"reduce_script": """
    Map class_grades = new HashMap();
    #先合併成績,整理成新的資料結構
    while (states.size() > 0){
        def data_buffer = states.remove(0);
        for(class in data_buffer.keySet()){
            Map class_tmp = null;
            if(!class_grades.containsKey(class)){
                class_tmp = new HashMap();
                class_grades.put(class, class_tmp);
                class_tmp.put("s", 0);
                class_tmp.put("n", 0);
                for(s in data_buffer.get(class)){
                    class_tmp.s += s;
                    class_tmp.n +=1;
                }
            }else{
                class_tmp = class_grades.get(class);
                for(s in data_buffer.get(class)){
                    class_tmp.s += s;
                    class_tmp.n +=1;
                }
            }
        }
    }
    #根據新的資料結構算出平均
    Map result = new HashMap();
    for(class in class_grades.keySet()){
        Map data = class_grades.get(class);
        long avg = 0;
        avg = data.s / data.n;
        result.put(class, avg);
    }
    return result;
"""

整體aggs query:

{
  "aggs": {
    "class_avg": {
      "scripted_metric": {
        "params": {
          "add_point": 10
        },
        "init_script": "state.transactions = new HashMap();",
        "map_script": """
            def class = doc['class'].value;
            def math = doc['grades.math'].value;
            def mand = doc['grades.mand'].value;
            def eng = doc['grades.eng'].value;
            def soc = doc['grades.soc'].value;
            ArrayList tmp = null;

            if(!state.transactions.containsKey(class)){
                tmp = new ArrayList();
                state.transactions.put(class, tmp);
            } else {
                tmp = state.transactions.get(class);
            }
            if (math < 60){
                if (math > 50){
                    math = 60;
                }else{
                    math += params.add_point;
                }
            }
            if (mand < 60){
                if (mand > 50){
                    mand = 60;
                }else{
                    mand += 10;
                }
            }
            if (eng < 60){
                if (eng > 50){
                    eng = 60;
                }else{
                    eng += 10;
                }
            }
            if (soc < 60){
                if (soc > 50){
                    soc = 60;
                }else{
                    soc += 10;
                }
            }
            long avg = 0;
            avg = (math + mand + eng + soc) / 4;
            tmp.add(avg);                            
        """,
        "combine_script": "return state.transactions",
        "reduce_script": """
          Map class_grades = new HashMap();
          while (states.size() > 0){
              def data_buffer = states.remove(0);
              for(class in data_buffer.keySet()){
                  Map class_tmp = null;
                  if(!class_grades.containsKey(class)){
                      class_tmp = new HashMap();
                      class_grades.put(class, class_tmp);
                      class_tmp.put("s", 0);
                      class_tmp.put("n", 0);
                      for(s in data_buffer.get(class)){
                          class_tmp.s += s;
                          class_tmp.n +=1;
                      }
                  }else{
                      class_tmp = class_grades.get(class);
                      for(s in data_buffer.get(class)){
                          class_tmp.s += s;
                          class_tmp.n +=1;
                      }
                  }
              }
          }
          Map result = new HashMap();
          for(class in class_grades.keySet()){
              Map data = class_grades.get(class);
              long avg = 0;
              avg = data.s / data.n;
              result.put(class, avg);
          }
          return result;
      """
      }
    }
  }
}

結果:

"aggregations" : {
  "class_avg" : {
    "value" : {
      "資工一1" : 82,
      "資工一2" : 72
    }
  }
}

驗算一下
資工一1
王小明:60,87,90,74 平均77.75
小新:73,78,75,83 平均77.25
風間:91,92,95,98 平均94
平均83

資工一2
正男:76,70,69,58(60) 平均68.75
阿呆:91,99,100,87 平均94.25
許小美:34(44),65,43(53),56(60) 平均55.5
平均72.8

會有點誤差應該是因為小數點後沒有算到

今天的教學就到這邊結束,漫長的30天終於結束了,也謝謝大家觀看我的文章!


上一篇
IT鐵人第29天 Elasticsearch 使用python查詢資料 Aggregations:Terms
系列文
Python&Elasticsearch 入門30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言