延續昨天的案例,昨天的案例中,我們已經可以順利取得某類GitHub Event(例如PushEvent)的所有成員,並且進行一些額外操作(例如GroupBy, Sort等),今天我們延續這個案例,假如:
我想知道ph公司的攻城獅在GitHub上的push次數?(假設有一份提供成員名單)
提供的成員名單就是一份很單純的txt檔案:
ghEmployees.txt:
aclindsa
adamschwartz
ahsojar
AiMadobe
Akkyie
...
...
檔案Google Drive Link(之後會全部整到GitHub上)
這邊的解法可以有許多種:
1 例如將txt透過SparkSQLload成另外一張表,然後用RDB的概念做join操作
2 或是將檔案讀出放入某個集合物件中,然後將先前的ordered表透過某種方式進行比對
3 其他...
在此為了介紹For expression、Set與SparkSQL UDF,我們選擇第2種方式:
觀察過ghEmployees.txt,我們想要
逐行
讀出去除空白(trim)
塞入某個物件集合
該怎麼做?Java工程師應該會開始宣告List或Set
、開File Reader
、readLine
、list.add
之類的吧?
整段處理下來也要宣告幾個變數,再加點程式碼也要十來行吧
在Spark&Scala中會怎麼做?最常見的作法之一就是透過好用的For expression來讀檔:
[Snippet.6]透過For expression讀取檔案Byline
for {
line <- Source.fromFile([filePath]).getLines() ①
} yield line.trim ②
有沒有感覺在Scala中優雅許多呢~
① line <- Source.fromFile([filePath]).getLines 做了一系列的操作:
Source.fromFile([filePath])
取得檔案物件getLines
以行為單位② yield line.trim
代表產生一個集合物件,裡面的每個元素為trim過的line
for Expression是Scala中很方便的表示方式之一,可以用很精練的方式表示許多操作。
ex:遍歷1到5,然後將每個元素都*2,並放入另外一個集合物件中
scala> for (i <- 1 to 5) yield i * 2
res0: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)
ex:不想遍歷1到5,想一次跳2(也就是1,3,5 )怎麼做?
scala> for (i <- 1 to 5 by 2) yield i * 2
res1: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 6, 10)
OK回到原題,yield line.trim
總要有變數接吧?不是要放入集合物件中嗎?
在此我們選擇將資料放入Set物件
中,一小段程式碼會長的像這樣:
val empPath = homeDir + "/spark/day06/ghEmployees.txt"
val employees = Set() ++ ( ①
for {
line <- Source.fromFile(empPath).getLines()
} yield line.trim
)
① 用++將yield line.trim
產生的集合物件放入空的Set中
OK,將檔案讀出放入集合物件了,再來我們要寫一個函式,這個函式的作用是:
輸入一個字串,若有出現在employees中則回傳True,否則就回傳False:
[Snippet.7]函式變數
val isEmp: String => Boolean = {
name => employees.contains(name)
}
我們來好好解釋一下這段,
val isEmp
: 變數名稱: String => Boolean
: 代表的是這個變數的型態!沒錯,isEmp不是一個簡單的String或Boolean變數,他代表輸入一個String,輸出為Boolean的一個函式型態
,也就是說isEmp是一個函式變數
,Scala中,函式也可以當作參數傳來傳去,酷吧,因為在Scala中有“functions are first class values”的概念,函式與一般變數無異。={name => employees.contains(name)}
:是函式本體,這段就比較清楚了,lambda表示式,將一個name字串導入,執行employees.contains(name)
的判斷,這就跟Java的contains方法很像了,輸出為Boolean。OK,寫了一個isEmp函式,但這又不是標準SQL函式該怎麼辦?那就將他註冊成DB概念中的UDF吧:
[Snippet.8]SparkSQL User Define Fuction(UDF)註冊
val isEmployee = spark.udf.register("isEmpUdf", isEmp)
接著就能透過UDF比對order中的資料了:
val filtered = ordered.filter(isEmployee($"login"))
可以看到透過ordered RDD透過filter方式呼叫isEmployee進行過慮,將結果放入filtered。
完整程式碼(本機模式):
package sia
import org.apache.spark.sql.SparkSession
import scala.io.Source
/**
* Created by joechh on 2016/12/20.
*/
object Day06App {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("GitHub push counter")
.master("local[*]")
.getOrCreate()
val homeDir = System.getenv("HOME")
val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json"
val githubLog = spark.read.json(inputPath)
val pushes = githubLog.filter("type ='PushEvent'")
val grouped = pushes.groupBy("actor.login").count()
val ordered = grouped.orderBy(grouped("count").desc)
ordered.show(5) ②
val empPath = homeDir + "/spark/day06/ghEmployees.txt"
val employees = Set() ++ (
for {
line <- Source.fromFile(empPath).getLines
} yield line.trim
)
val isEmp: String => Boolean = {
name => employees.contains(name)
}
val isEmployee = spark.udf.register("isEmpUdf", isEmp)
import spark.implicits._ ①
val filtered = ordered.filter(isEmployee($"login"))
filtered.show(5) ③
}
}
比較特別的是①這邊要做一個匯入implicits函數的動作(預設不會載入),否則isEmployee($"login")這種寫法會報錯,說不認得isEmployee($"login")這個語法,這代表將ordered中的login欄位傳給isEmployee UDF
輸出結果:
②的order原始輸出
+------------------+-----+
| login|count|
+------------------+-----+
| greatfirebot| 192|
|diversify-exp-user| 146|
| KenanSulayman| 72|
| manuelrp07| 45|
| mirror-updates| 42|
+------------------+-----+
only showing top 5 rows
③與employees比對過後的過慮結果
+---------------+-----+
| login|count|
+---------------+-----+
| KenanSulayman| 72|
| manuelrp07| 45|
| Somasis| 26|
|direwolf-github| 24|
|EmanueleMinotto| 22|
+---------------+-----+
only showing top 5 rows
文章寫得相當詳細,超讚的。
我是 scala 的新手,從文章中也學到 scala 的觀念,
既學習 spark 又學習 scala。
過一年了還有人看,太感恩了XD。只要有幫上忙就好了~