iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 28
0
AI & Data

索引結構與機器學習的相遇系列 第 28

Day 28 - Learned Index實作(2)

  • 分享至 

  • xImage
  •  

延續昨天的實作,首先我們先來修改一下昨天建置的 Learned Index 類別,還有一些參數需要儲存(昨天忘記嘞QQ),當我們的 Learned Index 被實體化後進行 build,建置模型以便後續訓練資料:

import numpy as np 
from scipy.stats import norm
import keras
from keras.models import Sequential
from keras.layers import Dense
from sklearn.linear_model import LinearRegression
from sklearn import preprocessing
import matplotlib.pyplot as plt
import math

class Learned_Index():
    
    def __init__(self, model_num):
        self.RMI = [1, model_num]    # Learned Index RMI架構
        self.index = []              # 儲存模型的索引
        self.N = None                # key值總數
        self.data = None             # 所有key值
        self.error_bound = []        # 儲存最後一層每個Model的 min_err and max_err 
        self.mean = None             # 儲存均值,資料標準化用
        self.std = None              # 儲存標準差,資料標準化用
        self.build()


    def build(self):
        for m in self.RMI:
            if m==1 :
                
                # 第一層 => 建置 NN Model 8x8
                model = Sequential()
                model.add(Dense(8, input_dim=1, activation="relu"))
                model.add(Dense(8, activation="relu"))
                model.add(Dense(1))
                
                sgd=keras.optimizers.SGD(lr=0.000001)    # lr:學習率,可調參數
                model.compile(loss="mse", optimizer=sgd, metrics=["mse"])
                self.index.append(model)
            else:
                # 第二層 => 建置多個簡單線性回歸
                self.index.append([])
                for j in range(m):
                    model = LinearRegression()
                    self.index[1].append(model)

CDF

建置CDF資料(pos=CDF(Key)),當作Model學習的Label:

def crtCDF(self,x):
        if(type(x) == np.ndarray):
            loc = x.mean()
            scale = x.std()
            N = x.size
            pos = norm.cdf(x, loc, scale)*N
            return pos
        else:
            print("Wrong Type! x must be np.ndarray ~")    
            return

Train

從第一層疊帶訓練至第二層模型,主要可以分4個步驟: 第一層的模型訓練、分配資料至第二層的模型、第二層的模型訓練、計算最後一層的模型誤差

def train(self, data):

    self.data = data
    self.N = data.size
    y = self.crtCDF(data)
    norm_data = preprocessing.scale(data)    # 標準化: 零均值化
    self.mean = data.mean()
    self.std = data.std()

    for m in self.RMI:
        if m==1 :
            # 訓練第一層 NN 8x8 Model
            sgd=keras.optimizers.SGD(lr=0.000001)    # lr:學習率,可調參數
            self.index[0].compile(loss="mse", optimizer=sgd, metrics=["mse"])
            self.index[0].fit(norm_data, y, epochs=100, batch_size=32, verbose=0)

        else:
            # 依據第一層 Model 訓練結果將資料分配至第二層
            sub_data = [ [] for i in range(m)]        # 儲存第二層模型的各個keys
            sub_y = [ [] for i in range(m)]           # 儲存第二層模型的各個labels

            for i in range(self.N):
                print(data[i])
                mm = int(self.index[0].predict([[norm_data[i]]])*m/self.N)

                if mm < 0:
                    mm=0
                elif mm > m-1:
                    mm = m-1

                sub_data[mm].append(data[i])
                sub_y[mm].append(y[i])

            # 訓練第二層所有的 SLR Model
            for j in range(m):
                xx = np.array(sub_data[j])
                yy = np.array(sub_y[j])

                min_err = max_err = 0
                    if xx.size > 0:
                        xx = np.reshape(xx,(-1,1))
                        self.index[1][j].fit(xx, yy)
                        
                        # 計算最後一層 Model 的 min_err/max_err
                        for i in range(data.size):
                            pred_pos, _ = self.predict(data[i])
                            err = pred_pos - i
                            if err < min_err:
                                min_err = math.floor(err)
                            elif err > max_err:
                                max_err = math.ceil(err)
                    self.error_bound.append([min_err, max_err])

Search

查詢資料是否存在,首先執行predict,預測資料所在位置,如果預測不準,在 [pos+min_err, pos+max_err] 中進行Binary Search,也就是所謂的Model Biased Search。如果查詢到此key值回傳True,查詢不到則回傳False。

def predict(self, key):
    mm = int(self.index[0].predict([[(key-self.mean)/self.std]])*self.RMI[1]/self.N)
    if mm < 0:
        mm=0
    elif mm > self.RMI[1]-1:
        mm = self.RMI[1]-1
    pred_pos = int(self.index[1][mm].predict([[key]]))
    return pred_pos, mm
        

def search(self, key):   # Model Biased Search
    pos, model = self.predict(key)

    l = pos + self.error_bound[model][0]
    r = pos + self.error_bound[model][1]



    # 檢查預測出的位置是否超出資料範圍
    if pos < 0:
        l = pos = 0 
    if pos > self.N-1:
        r = pos = self.N-1

    if l < 0:
        l=0
    if r > self.N-1:
        r = self.N-1

    print(l,pos,r)

    # binary search
    while l<=r:

        if self.data[pos] == key:
            return True
        elif self.data[pos] > key:
            r = pos - 1
        elif self.data[pos] < key:
            l = pos + 1
        pos = int((l+r)/2)

    return False

Learned Index的模擬實作大致告一個段落,寫的很爛請多包涵XD
應該是沒有bugㄅ..有的話再改XD
提醒一下,訓練的資料必須是排序好的一維array且型態必須是numpy array!
明天我們會來比較Learned Index與單一個Model(NN、SLR)的預測分布 ~ 掰噗

https://ithelp.ithome.com.tw/upload/images/20201013/20129198RdFaOxcTGg.png


上一篇
Day 27 - Learned Index實作(1)
下一篇
Day 29 - Learned Index測試&比較
系列文
索引結構與機器學習的相遇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言