iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 6
0
Big Data

Spark 2.0 in Scala系列 第 6

[Spark-Day6](基礎篇) For expression、 Set 、 SparkSQL UDF by Use Case

延續昨天的案例,昨天的案例中,我們已經可以順利取得某類GitHub Event(例如PushEvent)的所有成員,並且進行一些額外操作(例如GroupBy, Sort等),今天我們延續這個案例,假如:

我想知道ph公司的攻城獅在GitHub上的push次數?(假設有一份提供成員名單)

提供的成員名單就是一份很單純的txt檔案:
ghEmployees.txt:

aclindsa
adamschwartz
ahsojar
AiMadobe
Akkyie
...
...

檔案Google Drive Link(之後會全部整到GitHub上)

這邊的解法可以有許多種:
1 例如將txt透過SparkSQLload成另外一張表,然後用RDB的概念做join操作
2 或是將檔案讀出放入某個集合物件中,然後將先前的ordered表透過某種方式進行比對
3 其他...

在此為了介紹For expression、Set與SparkSQL UDF,我們選擇第2種方式:
觀察過ghEmployees.txt,我們想要

  • 將檔案逐行讀出
  • 去除空白(trim)
  • 塞入某個物件集合該怎麼做?

Java工程師應該會開始宣告List或Set開File ReaderreadLinelist.add之類的吧?
整段處理下來也要宣告幾個變數,再加點程式碼也要十來行吧
在Spark&Scala中會怎麼做?最常見的作法之一就是透過好用的For expression來讀檔:


[Snippet.6]透過For expression讀取檔案Byline

for {
    line <- Source.fromFile([filePath]).getLines() ①
   } yield line.trim ②

有沒有感覺在Scala中優雅許多呢~
① line <- Source.fromFile([filePath]).getLines 做了一系列的操作:

  • Source.fromFile([filePath])取得檔案物件
  • 使用getLines以行為單位
  • 傳給line

yield line.trim 代表產生一個集合物件,裡面的每個元素為trim過的line

for Expression是Scala中很方便的表示方式之一,可以用很精練的方式表示許多操作。
ex:遍歷1到5,然後將每個元素都*2,並放入另外一個集合物件中

scala>  for (i <- 1 to 5) yield i * 2
res0: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 4, 6, 8, 10)

ex:不想遍歷1到5,想一次跳2(也就是1,3,5 )怎麼做?

scala>  for (i <- 1 to 5 by 2) yield i * 2
res1: scala.collection.immutable.IndexedSeq[Int] = Vector(2, 6, 10)

OK回到原題,yield line.trim總要有變數接吧?不是要放入集合物件中嗎?
在此我們選擇將資料放入Set物件中,一小段程式碼會長的像這樣:

val empPath = homeDir + "/spark/day06/ghEmployees.txt"
val employees = Set() ++ ( ①
  for {
      line <- Source.fromFile(empPath).getLines() 
     } yield line.trim 
  )   

① 用++將yield line.trim產生的集合物件放入空的Set中


OK,將檔案讀出放入集合物件了,再來我們要寫一個函式,這個函式的作用是:
輸入一個字串,若有出現在employees中則回傳True,否則就回傳False:
[Snippet.7]函式變數

val isEmp: String => Boolean = {
   name => employees.contains(name)
  }

我們來好好解釋一下這段,

  • val isEmp: 變數名稱
  • : String => Boolean: 代表的是這個變數的型態!沒錯,isEmp不是一個簡單的String或Boolean變數,他代表輸入一個String,輸出為Boolean的一個函式型態,也就是說isEmp是一個函式變數,Scala中,函式也可以當作參數傳來傳去,酷吧,因為在Scala中有“functions are first class values”的概念,函式與一般變數無異。
  • ={name => employees.contains(name)}:是函式本體,這段就比較清楚了,lambda表示式,將一個name字串導入,執行employees.contains(name)的判斷,這就跟Java的contains方法很像了,輸出為Boolean。

OK,寫了一個isEmp函式,但這又不是標準SQL函式該怎麼辦?那就將他註冊成DB概念中的UDF吧:


[Snippet.8]SparkSQL User Define Fuction(UDF)註冊

  val isEmployee = spark.udf.register("isEmpUdf", isEmp)

接著就能透過UDF比對order中的資料了:

val filtered = ordered.filter(isEmployee($"login"))

可以看到透過ordered RDD透過filter方式呼叫isEmployee進行過慮,將結果放入filtered。


完整程式碼(本機模式):

package sia

import org.apache.spark.sql.SparkSession

import scala.io.Source

/**
  * Created by joechh on 2016/12/20.
  */
object Day06App {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("GitHub push counter")
      .master("local[*]")
      .getOrCreate()

    val homeDir = System.getenv("HOME")
    val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json"
    val githubLog = spark.read.json(inputPath)
    val pushes = githubLog.filter("type ='PushEvent'")

    val grouped = pushes.groupBy("actor.login").count()
    val ordered = grouped.orderBy(grouped("count").desc)

    ordered.show(5) ②

    val empPath = homeDir + "/spark/day06/ghEmployees.txt"
    val employees = Set() ++ (
      for {
        line <- Source.fromFile(empPath).getLines
      } yield line.trim
      )

    val isEmp: String => Boolean = {
      name => employees.contains(name)
    }

    val isEmployee = spark.udf.register("isEmpUdf", isEmp)
    import spark.implicits._ ①
    val filtered = ordered.filter(isEmployee($"login"))
    filtered.show(5) ③
  }

}

比較特別的是①這邊要做一個匯入implicits函數的動作(預設不會載入),否則isEmployee($"login")這種寫法會報錯,說不認得isEmployee($"login")這個語法,這代表將ordered中的login欄位傳給isEmployee UDF

輸出結果:

②的order原始輸出
+------------------+-----+
|             login|count|
+------------------+-----+
|      greatfirebot|  192|
|diversify-exp-user|  146|
|     KenanSulayman|   72|
|        manuelrp07|   45|
|    mirror-updates|   42|
+------------------+-----+
only showing top 5 rows

③與employees比對過後的過慮結果
+---------------+-----+
|          login|count|
+---------------+-----+
|  KenanSulayman|   72|
|     manuelrp07|   45|
|        Somasis|   26|
|direwolf-github|   24|
|EmanueleMinotto|   22|
+---------------+-----+
only showing top 5 rows

上一篇
[Spark-Day5](基礎篇) 撰寫第1支Spark App
下一篇
[Spark-Day7](基礎篇) Broadcast與透過Spark-submit遞交工作
系列文
Spark 2.0 in Scala30

尚未有邦友留言

立即登入留言