iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 5
Big Data

Spark 2.0 in Scala series, Part 5

[Spark-Day5] (Basics) Writing Your First Spark App

We've played with spark-shell long enough; time to write a proper standalone app.
This simple application loads a JSON file and runs a few queries on it through Spark SQL.
One of Spark SQL's strengths is that it automatically infers a schema from structured data; the example below will make this more concrete.

Development environment: Ubuntu + IntelliJ + SBT

This is a common Scala development setup; I won't go into how to configure it since that's easy to look up (IntelliJ camp, stand up XD).
A minimal build.sbt looks like this:

name := "Spark"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "log4j" % "log4j" % "1.2.16"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8-assembly_2.11" % "2.0.0"

This file plays a role similar to Maven's pom.xml or Gradle's build.gradle: it pulls packages from public repositories. After you save it, IntelliJ will churn for a while downloading the dependencies. The list above already covers core, SQL, streaming, and Kafka, the modules we'll play with later in the series; if other packages are needed down the road, I'll point them out then.


[Snippet.5] Writing your first Spark App
Data preparation: we'll use a publicly available JSON file of mock GitHub events and run a few analyses on it.
Data source: http://data.githubarchive.org/2015-03-01-0.json.gz

$ mkdir -p /home/joechh/sia/github-archive
$ cd /home/joechh/sia/github-archive
$ wget http://data.githubarchive.org/2015-03-01-0.json.gz
$ gunzip 2015-03-01-0.json.gz

On Linux you can simply download the file with wget and decompress it with gunzip.
In this example the file ends up at /home/joechh/sia/github-archive/2015-03-01-0.json

Let's take a look at the JSON file:

$ head -n 1 2015-03-01-0.json
{"id":"2614896652","type":"CreateEvent","actor":{"id":739622,"login":"treydock","gravatar_id":"","url":"https://api.github.com/users/treydock","avatar_url":"https://avatars.githubusercontent.com/u/739622?"},"repo":{"id":23934080,"name":"Early-Modern-OCR/emop-dashboard","url":"https://api.github.com/repos/Early-Modern-OCR/emop-dashboard"},"payload":{"ref":"development","ref_type":"branch","master_branch":"master","description":"","pusher_type":"user"},"public":true,"created_at":"2015-03-01T00:00:00Z","org":{"id":10965476,"login":"Early-Modern-OCR","gravatar_id":"","url":"https://api.github.com/orgs/Early-Modern-OCR","avatar_url":"https://avatars.githubusercontent.com/u/10965476?"}}

One important caveat: the JSON files Spark can process are not the usual multi-line JSON documents you may be used to. Spark expects one complete JSON object per line, so a single input file may hold tens of thousands of JSON objects, each line being one full record. This layout is also handy for streaming: just serialize each message event as one JSON line XD
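To make this layout concrete, here is a tiny hypothetical two-record sample in plain Scala (no Spark involved), illustrating why the one-object-per-line format splits cleanly into whole records:

```scala
// Hypothetical two-record sample in the one-JSON-per-line layout Spark expects
val jsonLines = Seq(
  """{"id":"1","type":"PushEvent"}""",
  """{"id":"2","type":"CreateEvent"}"""
)

// Every line is one complete JSON object, so a plain line-by-line split
// recovers each record intact; no multi-line parsing is needed
val records = jsonLines.map(line => line.trim)
```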

....If that single line is too ugly to read, install the small jq utility (available via apt as jq) to inspect it. It only prettifies the output; underneath, the file is still one JSON object per line.

After installing it, look again:

$ head -n 1 2015-03-01-0.json | jq '.'
{
  "id": "2614896652",
  "type": "CreateEvent",
  "actor": {
    "id": 739622,
    "login": "treydock",
    "gravatar_id": "",
    "url": "https://api.github.com/users/treydock",
    "avatar_url": "https://avatars.githubusercontent.com/u/739622?"
  },
  "repo": {
    "id": 23934080,
    "name": "Early-Modern-OCR/emop-dashboard",
    "url": "https://api.github.com/repos/Early-Modern-OCR/emop-dashboard"
  },
  "payload": {
    "ref": "development",
    "ref_type": "branch",
    "master_branch": "master",
    "description": "",
    "pusher_type": "user"
  },
  "public": true,
  "created_at": "2015-03-01T00:00:00Z",
  "org": {
    "id": 10965476,
    "login": "Early-Modern-OCR",
    "gravatar_id": "",
    "url": "https://api.github.com/orgs/Early-Modern-OCR",
    "avatar_url": "https://avatars.githubusercontent.com/u/10965476?"
  }
}

Much better. Suppose we want to answer the following:

How many events does the file contain in total, and how many of them are push events (i.e. records whose "type" is "PushEvent")?

Code:

import org.apache.spark.sql.SparkSession


/**
  * Created by joechh on 2016/12/20.
  */
object FirstApp {
  def main(args: Array[String]): Unit = {


    val spark = SparkSession.builder()  ①
      .appName("GitHub push counter") 
      .master("local[*]")
      .getOrCreate()

    val homeDir = System.getenv("HOME") ②
    val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json"
    val githubLog = spark.read.json(inputPath) ③
    val pushes = githubLog.filter("type ='PushEvent'") ④

    githubLog.printSchema ⑤
    println("all events:" + githubLog.count) ⑥
    println("push events:" + pushes.count)
    pushes.show(5) ⑦

    val grouped = pushes.groupBy("actor.login").count() ⑧
    grouped.show(5)

    val ordered = grouped.orderBy(grouped("count").desc) ⑨
    ordered.show(5)
  }

}

① In the shell we never had to declare Spark's entry point ourselves; in a standalone app we do XDD. This example only uses Spark SQL, whose entry point since 2.0 has been unified as SparkSession, built through the factory methods shown here (the data objects it reads come back as Datasets*). appName is the application name; monitoring tools will later display whatever you set here. Writing "local[*]" for master marks this as a locally-run Spark application that doesn't need to be submitted to a cluster, which is convenient for development.

② homeDir and inputPath just assemble the input file path.

③ SparkSession offers several read methods (here we use read.json(inputPath)) for loading files.
With that single call, githubLog is born: a DataFrame that carries a schema and supports SQL-like operations.

④ The functions ordinary RDDs support, such as map and filter, of course work here too (and they can take advantage of the schema, which is neat). Here we filter for records whose type field equals "PushEvent" and store the result in pushes.

⑤ Prints the schema of the githubLog Dataset.

⑥ Uses the count function we've seen before to print the record counts of githubLog and pushes.
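If it helps, ④ and ⑥ behave just like filter and size on an ordinary Scala collection; here is a plain-Scala analogy on a tiny made-up sample (no Spark required):

```scala
// Hypothetical (id, type) pairs standing in for GitHub event records
val events = Seq(
  ("2614896652", "CreateEvent"),
  ("2614896653", "PushEvent"),
  ("2614896654", "PushEvent")
)

// Same idea as ④: keep only the push events
val pushes = events.filter { case (_, eventType) => eventType == "PushEvent" }

// Same idea as ⑥: count both collections
val allCount  = events.size   // 3 events in total
val pushCount = pushes.size   // 2 of them are pushes
```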

⑦ show is similar to a DB SELECT; here we print 5 rows (the default is 20 if unspecified).

⑧ SQL's groupBy is of course supported as well; in the example we groupBy the actor.login field of the JSON. The aggregate function paired with groupBy here is count, but max, min, mean, sum, and so on also work, just like in an RDB~

⑨ And you can orderBy a column as well.
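⑧ and ⑨ also have a direct plain-Scala counterpart; as a hypothetical sketch with made-up logins, grouping and a descending sort look like this on an ordinary collection:

```scala
// Made-up pusher logins; in the real app these come from actor.login
val logins = Seq("digitized", "john-griffin", "digitized", "digitized", "WillHK")

// Same idea as ⑧: group by login and count each group
val grouped = logins.groupBy(identity).map { case (login, hits) => (login, hits.size) }

// Same idea as ⑨: order by the count, descending
val ordered = grouped.toSeq.sortBy { case (_, count) => -count }
// ordered.head is ("digitized", 3)
```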

*Note: if you've used Spark SQL before you've probably heard of DataFrame; since 2.0, DataFrame is simply a special case of Dataset: Dataset[Row].

Output:

Schema from ⑤:
root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 ...
 ... (omitted)
 
Counts from ⑥:
all events:17786
push events:8793

/* ⑦ has too many columns to include here XD; it prints the whole JSON hierarchy */

groupBy result from ⑧:
+------------+-----+
|       login|count|
+------------+-----+
|john-griffin|    1|
|   digitized|    3|
| theCodeBear|    1|
|      WillHK|    1|
|  sakuya3834|    1|
+------------+-----+
only showing top 5 rows

orderBy result from ⑨:
+------------------+-----+
|             login|count|
+------------------+-----+
|      greatfirebot|  192|
|diversify-exp-user|  146|
|     KenanSulayman|   72|
|        manuelrp07|   45|
|    mirror-updates|   42|
+------------------+-----+
only showing top 5 rows

Previous: [Spark-Day4] (Basics) Scala & Implicit Conversion in RDD
Next: [Spark-Day6] (Basics) For expression, Set, SparkSQL UDF by Use Case