iT邦幫忙

2021 iThome 鐵人賽

DAY 27
0
AI & Data

資料三十-那些最基本的資料處理與分析技能系列 第 27

【Day27-並列】大 大 大資料——操作巨量資料的必備觀念MapReduce

在處理資料分析的時候,有的時候我們會需要將非常大量的資料之間進行一些交互的計算(例如矩陣乘法之類的),而過程中會需要進行儲存的記憶體空間需求就會變得很大,因此就會需要一種可以有效將本來需要大量暫存空間的計算方式給改良的計算過程,因此今天就介紹一下在做這塊的部分吧

MapReduce

Mapreduce在巨量資料處理中的概念是利用了將可以把原本都要一起計算的東西,拆分成許多小的計算步驟,而每個可並行的小步驟之間又可以獨立計算,因此可以減少記憶體空間的需求或是可以用多台資源進行計算。

Map——將每個步驟的最小元素進行轉換

我們這邊以計算單字數量為例,那如果要按照傳統的計算方式我們可能會需要先將所有的內容存到同一個地方才能計算,那這邊我們假設這邊的步驟是可以分散進行的,那計算單字數量一共可以拆分成兩步

  • 第一步:將所有單字都計數1
  • 第二步:將相同單字(key)的每兩個元素的計數相加,結合成一個元素
words = ["Apple", "Banana", "Watermelon", "Banana", "Apple", "Apple"]
words_mapped = list(map(lambda x: (x,1), words))
words_mapped

輸出:

[('Apple', 1),
 ('Banana', 1),
 ('Watermelon', 1),
 ('Banana', 1),
 ('Apple', 1),
 ('Apple', 1)]

Reduce——前一步驟的結果整合

這邊先以pandas作為演練

import pandas as pd
df = pd.DataFrame(words_mapped, columns = ["word","count"])
df

輸出:

df.groupby("word").agg({"sum"})

輸出:

以大矩陣乘法為例示範pyspark

那實際上通常在跑巨量資料的一個架構是spark,而它也有在python上的套件

lines = sc.textFile("inputdata.txt")



# %%
def mapper_input(line):
    matrix_list = line.split(",")
    matrix_map = (matrix_list[0], [matrix_list[1],matrix_list[2],matrix_list[3]])
    return matrix_map

linesRDD = lines.map(mapper_input)

# 因為有了`'M'`或是`'N'`的key值,所以可以將資料分成M矩陣和N矩陣  
# 這邊用RDD的`filter`來進行
# %%
M_RDD = linesRDD.filter(lambda x : "M" in x[0]) # [0]is the key value
N_RDD = linesRDD.filter(lambda x : "N" in x[0])

# Multiply


# %%
M_RDD_matrix = M_RDD.map(lambda x : (x[1][1], [x[0],x[1][0],x[1][2]] ))
N_RDD_matrix = N_RDD.map(lambda x : (x[1][0], [x[0],x[1][1],x[1][2]] ))

MN_RDD = M_RDD_matrix.join(N_RDD_matrix)


# %%
P_RDD_matrix = MN_RDD.map(lambda x: ((int(x[1][0][1]),int(x[1][1][1])),int(x[1][0][2])*int(x[1][1][2])) )

P_RDD = P_RDD_matrix.reduceByKey(lambda x,y: int(x)+int(y))

# 為了符合輸出個格式,先用`sortByKey()`來排序

# %%
P_RDD_sort = P_RDD.sortByKey()

# %%
P_RDD_sort.collect()

# %% [markdown]
# 將排序好的RDD重新用`map()`來把$((i,k),P_{ij})$映射到同一層以方便輸出

# %%
P_RDD_reshape = P_RDD_sort.map(lambda x : (x[0][0],x[0][1],x[1]) )

# %%
output = P_RDD_reshape.collect()

產生基本RDD元素

定義mapper_input()function,用來將原本每一行的資料切分和整理
輸入格式: M,0,0,10
輸出格式: ('M', ['0', '0', '10']),

get martix M and N by filter

因為有了'M'或是'N'的key值,所以可以將資料分成M矩陣和N矩陣
這邊用RDD的filter來進行

相乘

因為,兩個矩陣可以相乘的話一定要有共同的j,所以將M和N分別以row vector和column vector來表示以便進行後續的計算

在進行大矩陣運算的時候的重點在於找出可以不會互相影響的步驟而拆開進行

輸入輸出

輸入

每行表示M或N矩陣的第i,j個元素為多少

輸出

每行表示M或N矩陣的第i,j個元素為多少


上一篇
【Day26-報表】我的資料儀錶板動起來了——超方便的互動式報表工具Google Data Studio上手教學
下一篇
【Day28-爬蟲】資料分析有時候還是需要自己生資料的——以python自動抓取gif梗圖為例,十分鐘簡單上手爬蟲(含範例程式)
系列文
資料三十-那些最基本的資料處理與分析技能30

尚未有邦友留言

立即登入留言