iT邦幫忙

2018 iT 邦幫忙鐵人賽
DAY 22
1
Data Technology

Hadoop ecosystem 工具簡介, 安裝教學與各種情境使用系列 第 22

Day 22 - Spark Streaming 簡介

Spark streaming是以Spark核心API擴充出來的一個模組,他在處理資料串流(streaming)上具有可擴充性、高吞吐量、高容錯性特點。可以從Kafka,Flume,Kinesis或TCP等許多來源介接資料,也可以透過Spark API的map,reduce,join和window等函數進行複雜的運算來處理資料。最後再將運算結果送到檔案系統(如HDFS)、資料庫或是即時的監控系統,也可以將資料餵給機器學系的系統,進行即時的運算。

Spark streaming提供了一個高層級的抽象層,稱之為discretized stream(DStream),意指連續的資料串流。DStream可以通過Kafka,Flume和Kinesis等資料來源來建立,也可以通過在其他DStream的API來新增。在Spark streaming中,一個DStream即為一種有順序的RDD。

目前Spark streaming可以使用Scala,Java或Python(Spark 1.2開始支援)撰寫應用程式。

現在就來看個範例程式吧!下列的程式碼是在Spark 2.1.2執行。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

//建立一個DStream來表示來自TCP的data source,指定為主機名(例如localhost)和port(例如9999)。
val lines = ssc.socketTextStream("localhost", 9999)

// 以空白切開每行的字串
val words = lines.flatMap(_.split(" "))

// 計算每批資料的文字數量
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// 印出DStreamPrint到console視窗
wordCounts.print()

ssc.start()             // 開始計算
ssc.awaitTermination()  // 等待資料傳送到command視窗

如果已經從官方網站下載Spark,上列的程式碼已經被編譯並放入裡面,可以透過下面的指令以local模式來啟動這個Saprk streaming的程式:

./bin/run-example streaming.NetworkWordCount localhost 9999

啟動之後我們需要一個tcp的server來傳入資料,可以使用nc這個工具來偽裝一個tcp伺服器輸入資料,開啟另一個command視窗來使用下來指令:

nc -lk 9999

啟動後,可以在裡面輸入資料,這樣另一個Spark streaming的視窗就會顯示由tpc server傳入的文字。

最後

看完的入門的Spark streaming,Spark的簡介也告一段落了,如果想看更進階的使用方法,可以到Apache Spark官方網站查看文件,官方文件蠻齊全的,如有問題也可以到Spark 社群討論,或者在者邊留言。

接下來,我們要邁入最後的篇章Apache Hive。


上一篇
Day 21 - Spark SQL 簡介
下一篇
Day 23 - Apache Hive 簡介
系列文
Hadoop ecosystem 工具簡介, 安裝教學與各種情境使用30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言