Enough playing with spark-shell; time to write a proper standalone app.
This simple application loads a JSON file and runs a few queries through Spark SQL.
One of Spark SQL's strengths is that it automatically infers a schema for structured data, which the example below will make more concrete.
The project uses a typical Scala development setup; I won't go over how to configure it since a quick search will get you there (IntelliJ users, stand up XD).
A minimal build.sbt looks like this:
name := "Spark"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "log4j" % "log4j" % "1.2.16"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8-assembly_2.11" % "2.0.0"
This file plays roughly the same role as Maven's pom.xml or Gradle's build.gradle: it pulls packages from public repositories. After you save it, IntelliJ will take a while to download the dependencies. The list above already includes core, sql, streaming, and kafka, which is everything we will play with later; if we ever need other packages I will point it out then.
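Since build.sbt files are ordinary Scala, you can also group the same dependencies into one declaration and factor out the version if you prefer; a minimal, equivalent sketch:

// Equivalent build.sbt, grouping the dependencies into a single Seq.
name := "Spark"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % sparkVersion,
  "org.apache.spark" %% "spark-sql"       % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" % "spark-streaming-kafka-0-8-assembly_2.11" % sparkVersion,
  "log4j" % "log4j" % "1.2.16"
)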
[Snippet.5] Writing the first Spark app
Data preparation: we will use an openly available file of archived GitHub events in JSON form and run a few analyses on it.
Data source: http://data.githubarchive.org/2015-03-01-0.json.gz
$ mkdir -p /home/joechh/sia/github-archive
$ cd /home/joechh/sia/github-archive
$ wget http://data.githubarchive.org/2015-03-01-0.json.gz
$ gunzip 2015-03-01-0.json.gz
On Linux you can simply grab the file with wget (and unpack it with gunzip). After this, the file used in the example sits at /home/joechh/sia/github-archive/2015-03-01-0.json.
Let's take a look at the JSON file:
$ head -n 1 2015-03-01-0.json
{"id":"2614896652","type":"CreateEvent","actor":{"id":739622,"login":"treydock","gravatar_id":"","url":"https://api.github.com/users/treydock","avatar_url":"https://avatars.githubusercontent.com/u/739622?"},"repo":{"id":23934080,"name":"Early-Modern-OCR/emop-dashboard","url":"https://api.github.com/repos/Early-Modern-OCR/emop-dashboard"},"payload":{"ref":"development","ref_type":"branch","master_branch":"master","description":"","pusher_type":"user"},"public":true,"created_at":"2015-03-01T00:00:00Z","org":{"id":10965476,"login":"Early-Modern-OCR","gravatar_id":"","url":"https://api.github.com/orgs/Early-Modern-OCR","avatar_url":"https://avatars.githubusercontent.com/u/10965476?"}}
One thing to watch out for here: the JSON files Spark can handle are not the usual multi-line, pretty-printed JSON. In a Spark-readable JSON file, each line must be one complete JSON record!
So a single file that Spark processes may hold tens of thousands of JSON objects, one whole record per line. This format is also convenient for streaming: just serialize each message event as one JSON record on its own line XD.
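To make the point concrete, here is a minimal sketch (the file paths are made up) showing that Spark's JSON reader expects exactly this one-record-per-line form, and that writing a DataFrame back out as JSON produces the same shape:

import org.apache.spark.sql.SparkSession

object JsonLinesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("json-lines sketch")
      .master("local[*]")
      .getOrCreate()

    // Each input line must be a complete JSON object, e.g. {"id":"1","type":"PushEvent"}
    val df = spark.read.json("/tmp/events.json")

    // The output directory contains part files, again with one JSON record per line.
    df.write.json("/tmp/events-out")

    spark.stop()
  }
}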
...if the raw output is too ugly to read, you can install the little jq utility to inspect it (just apt install jq). It only changes how things are displayed; underneath it is still one JSON record per line.
Once it is installed, look again:
$ head -n 1 2015-03-01-0.json | jq '.'
{
"id": "2614896652",
"type": "CreateEvent",
"actor": {
"id": 739622,
"login": "treydock",
"gravatar_id": "",
"url": "https://api.github.com/users/treydock",
"avatar_url": "https://avatars.githubusercontent.com/u/739622?"
},
"repo": {
"id": 23934080,
"name": "Early-Modern-OCR/emop-dashboard",
"url": "https://api.github.com/repos/Early-Modern-OCR/emop-dashboard"
},
"payload": {
"ref": "development",
"ref_type": "branch",
"master_branch": "master",
"description": "",
"pusher_type": "user"
},
"public": true,
"created_at": "2015-03-01T00:00:00Z",
"org": {
"id": 10965476,
"login": "Early-Modern-OCR",
"gravatar_id": "",
"url": "https://api.github.com/orgs/Early-Modern-OCR",
"avatar_url": "https://avatars.githubusercontent.com/u/10965476?"
}
}
Much better. Now suppose we want to answer the following:
How many events are in the file in total? And how many of them are pushes (i.e. "type" is "PushEvent")?
Code:
import org.apache.spark.sql.SparkSession

/**
  * Created by joechh on 2016/12/20.
  */
object FirstApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()  ①
      .appName("GitHub push counter")
      .master("local[*]")
      .getOrCreate()

    val homeDir = System.getenv("HOME")  ②
    val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json"
    val githubLog = spark.read.json(inputPath)  ③
    val pushes = githubLog.filter("type = 'PushEvent'")  ④

    githubLog.printSchema  ⑤
    println("all events:" + githubLog.count)  ⑥
    println("push events:" + pushes.count)
    pushes.show(5)  ⑦

    val grouped = pushes.groupBy("actor.login").count()  ⑧
    grouped.show(5)
    val ordered = grouped.orderBy(grouped("count").desc)  ⑨
    ordered.show(5)
  }
}
① In the shell we never had to declare a Spark entry point ourselves, but now we do XDD. This example only uses Spark SQL, and since 2.0 its entry point is unified as SparkSession, obtained through a builder (factory) method; the data objects you work with are typed as Dataset*. appName is the application name; monitoring tools will later show the app under this name. Passing "local[*]" to master means the Spark application runs locally instead of being submitted to a cluster, which is convenient during development.
② homeDir and inputPath just assemble the input path.
③ SparkSession offers several read methods (here read.json(filePath)) for loading files. From this point we have githubLog, a DataFrame that carries a schema and supports SQL-like operations.
④ The functions supported on plain RDDs, such as map and filter, of course still work (and they can now refer to schema columns, cool huh). Here we filter the rows whose type column is PushEvent and keep them in the pushes DataFrame. (An equivalent Column-based filter is sketched after these notes.)
⑤ Prints the schema of the githubLog Dataset.
⑥ Uses the count function we have seen before to print the number of rows in githubLog and in pushes.
⑦ show is similar to a DB select; here it prints 5 rows (the default is 20 if you do not specify).
⑧ SQL's groupBy is of course supported too; the example groups by the actor.login field of the JSON. The aggregate function paired with groupBy here is count, but max, min, mean, sum and so on work as well, just like in an RDB~
⑨ And you can orderBy a column.
Note: if you have used Spark SQL before you have probably heard of DataFrame; since 2.0 a DataFrame is just a special kind of Dataset, namely Dataset[Row]. A few alternative ways to write the same queries are sketched right below.
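As a supplement to ④, ⑧, ⑨ and the note above, here is a sketch of a few equivalent or slightly extended ways to express the same queries: a Column-based filter, an explicit aggregate function combined with desc(), and a typed Dataset built from a small case class. The case class and the two fields it picks are purely illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object FirstAppVariants {

  // Illustrative case class covering only two string fields of the event records.
  case class Event(id: String, created_at: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GitHub push counter variants")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val inputPath = System.getenv("HOME") + "/sia/github-archive/2015-03-01-0.json"
    val githubLog = spark.read.json(inputPath)

    // ④ Column-based filter, equivalent to filter("type = 'PushEvent'").
    val pushes = githubLog.filter($"type" === "PushEvent")

    // ⑧/⑨ groupBy with an explicit aggregate and ordering via the desc() function.
    pushes.groupBy("actor.login")
      .agg(count("*").as("count"))
      .orderBy(desc("count"))
      .show(5)

    // Note: a DataFrame is Dataset[Row]; selecting columns and calling .as[...] yields a typed Dataset.
    val typed = githubLog.select("id", "created_at").as[Event]
    println("typed events:" + typed.count)

    spark.stop()
  }
}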
Output:
Schema from ⑤
root
|-- actor: struct (nullable = true)
| |-- avatar_url: string (nullable = true)
| |-- gravatar_id: string (nullable = true)
| |-- id: long (nullable = true)
| |-- login: string (nullable = true)
| |-- url: string (nullable = true)
|-- created_at: string (nullable = true)
|-- id: string (nullable = true)
|-- org: struct (nullable = true)
| |-- avatar_url: string (nullable = true)
| |-- gravatar_id: string (nullable = true)
| |-- id: long (nullable = true)
...
...
... (truncated)
Counts from ⑥
all events:17786
push events:8793
/* ⑦ has too many columns to paste here XD; the whole JSON is printed level by level */
GroupBy result from ⑧
+------------+-----+
| login|count|
+------------+-----+
|john-griffin| 1|
| digitized| 3|
| theCodeBear| 1|
| WillHK| 1|
| sakuya3834| 1|
+------------+-----+
only showing top 5 rows
OrderBy result from ⑨
+------------------+-----+
| login|count|
+------------------+-----+
| greatfirebot| 192|
|diversify-exp-user| 146|
| KenanSulayman| 72|
| manuelrp07| 45|
| mirror-updates| 42|
+------------------+-----+
only showing top 5 rows
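One more thing: since this is Spark SQL, you can also register the DataFrame as a temporary view and write the query as literal SQL. A minimal sketch, reusing the same input path as the example (the view name events is my own choice):

import org.apache.spark.sql.SparkSession

object FirstAppSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GitHub push counter (SQL)")
      .master("local[*]")
      .getOrCreate()

    val inputPath = System.getenv("HOME") + "/sia/github-archive/2015-03-01-0.json"
    spark.read.json(inputPath).createOrReplaceTempView("events")

    // Same push-count-per-user query as ⑧/⑨, expressed as SQL against the temp view.
    spark.sql(
      """SELECT actor.login, count(*) AS cnt
        |FROM events
        |WHERE type = 'PushEvent'
        |GROUP BY actor.login
        |ORDER BY cnt DESC""".stripMargin)
      .show(5)

    spark.stop()
  }
}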
As a bonus, here is a jq-related article XD
https://newtoypia.blogspot.tw/2015/03/json-jq.html