Spark Streaming supports many data sources. Beyond the basic built-in ones, external systems can also serve as streaming input sources — for example, let's see how K (Kafka) and S (Spark), those two good friends in the SMACK stack, get wired together!!
We need to create two topics: orders (the input) and metrics (the output). Remember how to create topics?
joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-topics.sh --create --zookeeper 192.168.56.1 --replication-factor 1 --partitions 1 --topic orders
Created topic "orders"
joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-topics.sh --create --zookeeper 192.168.56.1 --replication-factor 1 --partitions 1 --topic metrics
Created topic "metrics"
The script (named streamOrders.sh) looks like this:
#!/bin/bash
BROKER=$1
if [ -z "$1" ]; then
  BROKER="192.168.56.1:9092" ①
fi

cat orders.txt | while read line; do
  echo "$line"
  sleep 0.1
done | /opt/kafka_2.11-0.8.2.1/bin/kafka-console-producer.sh --broker-list $BROKER --topic orders ②
Or you can download it directly from my Google Drive link:
① This part needs to be changed to your own Kafka broker's IP address; if you are running with the defaults, the local localhost:9092 works.
② /opt/kafka_2.11-0.8.2.1/bin/kafka-console-producer.sh — where this executable lives depends on how you installed Kafka. If you simply unpacked the binary tarball and use it directly, look at where your bin directory is (for me it's under /opt/kafka_2.11-0.8.2.1); but if Kafka was installed as a service (daemon), e.g. via yum or apt-get, it may be under /usr/local/kafka/bin/...
Poke around and you'll find it.
Make the script executable with the following command; it's handier that way:
$chmod +x streamOrders.sh
PS: the orders.txt from earlier has to sit in the same directory as this script, something like this:
joechh@joechh:~/Scala/sparkIronMan/Spark/src/main/resources/day26$ ls
orders.txt streamOrders.sh
From the script you can roughly tell what it does: in short, it reads orders.txt line by line and pushes each line to the orders topic through kafka-console-producer.sh. It also sleeps 0.1 seconds between records, so the throughput isn't so high that everything gets sent in one burst XD.
Now let's test it. Just like when we verified the installation earlier, open a console consumer on the orders topic:
joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-console-consumer.sh --zookeeper 192.168.56.1:2181 --topic orders
Then run ./streamOrders.sh. If all goes well, the data in orders.txt gets written into the orders topic and picked up by the console consumer:
joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-console-consumer.sh --zookeeper 192.168.56.1:2181 --topic orders
2016-03-22 20:25:28,1,80,EPE,710,51.00,B
2016-03-22 20:25:28,2,70,NFLX,158,8.00,B
2016-03-22 20:25:28,3,53,VALE,284,5.00,B
2016-03-22 20:25:28,4,14,SRPT,183,34.00,B
2016-03-22 20:25:28,5,62,BP,241,36.00,S
2016-03-22 20:25:28,6,52,MNKD,296,28.00,S
....
....
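As a side note, each line in the output above seems to carry a timestamp, an order id, a client id, a stock symbol, a quantity, a price and a buy/sell flag. Here is a minimal Scala sketch of parsing such a line (the field names are my own guess, not something defined in this post):

// Rough sketch only: field names and meanings are inferred from the sample output above.
object OrderParser {
  case class Order(time: String, orderId: Long, clientId: Long,
                   symbol: String, amount: Int, price: Double, buy: Boolean)

  def parse(line: String): Order = {
    val f = line.split(",")
    Order(f(0), f(1).toLong, f(2).toLong, f(3), f(4).toInt, f(5).toDouble, f(6) == "B")
  }
}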
OK, before we rewrite this into a Spark program, let's first look at how to use Kafka from a plain Scala client, without going through Spark RDDs at all. Simply put, Kafka has two very important components: the Producer and the Consumer.
The Producer code can be seen as a small program; sometimes it gets deployed on the machines that generate the data, integrated with the other programs there, and ships the data back to the Kafka brokers. Let's look at a sample Kafka Producer first:
[Snippet.57] Kafka Producer Without Spark
package sparkIronMan

import java.util.{Date, Properties}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import scala.util.Random

object ScalaProducerExample extends App {
  val topic = "testTopic" ①
  val brokers = "192.168.56.1:9092" ②
  val rnd = new Random()
  val props = new Properties() ③
  props.put("metadata.broker.list", brokers) ④
  props.put("serializer.class", "kafka.serializer.StringEncoder") ⑤
  props.put("producer.type", "async") ⑥
  val producer = new Producer[String, String](new ProducerConfig(props)) ⑦

  for (nEvents <- Range(0, 10000)) {
    val runtime = new Date().getTime();
    val ip = "192.168.2." + rnd.nextInt(255);
    val msg = runtime + "," + nEvents + ",www.example.com," + ip;
    val data = new KeyedMessage[String, String](topic, ip, msg); ⑧
    producer.send(data); ⑨
  }
  producer.close();
}
①② To write data to the brokers, you of course need to know which topic and which brokers to write to.
③ Build the configuration with a Java Properties object.
④ Put the broker info from ② into metadata.broker.list; the word "list" tells you it can be a combination of several broker IPs.
⑤ This producer emits String data, so it needs a String encoder.
⑥ Send asynchronously; async mode is generally more efficient.
⑦ Initialize a producer. After that we use random numbers to fake IP strings as messages.
⑧ Build a KeyedMessage, Kafka's transport object; you can see it takes 3 arguments (topic, ip, msg). What does that mean? Let's peek at the source code~
case class KeyedMessage[K, V](val topic : scala.Predef.String, val key : K, val partKey : scala.Any, val message : V) extends scala.AnyRef with scala.Product with scala.Serializable {
def this(topic : scala.Predef.String, message : V) = { /* compiled code */ } ①
def this(topic : scala.Predef.String, key : K, message : V) = { /* compiled code */ }
def partitionKey : scala.Any = { /* compiled code */ }
def hasKey : scala.Boolean = { /* compiled code */ }
}
Oh, so KeyedMessage is a case class! Its 3 parameters are topic, key and message. But you can also leave out the key and still build a KeyedMessage.
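For example, both constructors can be used roughly like this, e.g. in the Scala REPL (the values are just placeholders of mine):

import kafka.producer.KeyedMessage

// With a key: the key (the ip in the producer above) also drives which partition the message lands in.
val withKey = new KeyedMessage[String, String]("testTopic", "192.168.2.7", "some payload")
// Without a key: just topic + message.
val noKey = new KeyedMessage[String, String]("testTopic", "some payload")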
⑨ Once the message is built, send it out through the producer.
The producer is basically thread-safe, so it's well suited to being written as a so-called singleton producer. We'll come back to that when we get to Kafka with Spark.
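Just to give a rough idea of what a singleton producer could look like, here is a minimal sketch (my own illustration, not the version we'll end up using with Spark; the broker address is an assumption):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// One lazily-created Producer instance shared by the whole JVM.
object SingletonProducer {
  private lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "192.168.56.1:9092") // assumed broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }

  def send(topic: String, key: String, msg: String): Unit =
    producer.send(new KeyedMessage[String, String](topic, key, msg))
}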
Next up is the consumer, the one that eats the data:
[Snippet.58] Kafka Consumer Without Spark
Since the consumer is the side that pulls data, it can of course spin up threads to boost throughput; and once threads enter the picture you get run, thread pools and all that stuff. Take a look:
package sparkIronMan

import java.util.Properties
import java.util.concurrent._
import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream}
import kafka.utils.Logging

object ScalaConsumerExample extends App {
  val example = new ScalaConsumer(args(0), args(1), args(2)) ①
  example.run(args(3).toInt)
}

class ScalaConsumer(val zookeeper: String, val groupId: String,
                    val topic: String) extends Logging {
  val config = createConsumerConfig(zookeeper, groupId) ②
  val consumer = Consumer.create(config)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.shutdown();
    if (executor != null)
      executor.shutdown();
  }

  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = { ③
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId); ④
    props.put("auto.offset.reset", "largest"); ⑤
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    val config = new ConsumerConfig(props)
    config
  }

  def run(numThreads: Int) = {
    val topicCountMap = Map(topic -> numThreads) ⑥
    val consumerMap = consumer.createMessageStreams(topicCountMap);
    val streams = consumerMap.get(topic).get; ⑦
    executor = Executors.newFixedThreadPool(numThreads);
    var threadNumber = 0;
    for (stream <- streams) { ⑧
      executor.submit(new ScalaConsumerTest(stream, threadNumber))
      threadNumber += 1
    }
  }
}

class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int) extends Logging with Runnable {
  def run {
    val it = stream.iterator() ⑨
    while (it.hasNext()) {
      val msg = new String(it.next().message()); ⑩
      System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg); ⑪
    }
    System.out.println("Shutting down Thread: " + threadNumber);
  }
}
① Pass all the parameters needed to construct a ScalaConsumer and build the object (see the usage sketch after these notes).
② Forward the parameters to createConsumerConfig to build a ConsumerConfig. The interesting part is the group ID: a topic's stream can be consumed by anyone, so every consumer has to bring its own group ID, and Kafka keeps track of that group's consumption progress (the offset). And since it's called a group ID, multiple threads started by the same program naturally belong to the same group.
③ The createConsumerConfig function.
④ Write the group into the Properties.
⑤ Another notable parameter here is auto.offset.reset. In short, the first time you connect, Kafka doesn't know your previous consumption progress (offset), so it sets one for you. There are two options: (1) largest, i.e. "skip whatever is already in this topic"; or (2) smallest, the smallest offset, i.e. consume the topic's stream from the very beginning.
⑥ Specify how many threads each topic should be handled with.
⑦ Get the streams.
⑧ Hand the streams to different threads for processing.
⑨ The processing logic inside each thread. Each stream provides an iterator that makes it easy to grab the streaming messages.
⑩ Take out the message and convert it to a String.
⑪ Print it out.
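To make ① a bit more concrete, here is a minimal sketch of driving the consumer above with hard-coded arguments instead of command-line args (the ZooKeeper address and group name are just my assumptions):

// Equivalent to running: ScalaConsumerExample 192.168.56.1:2181 myGroup orders 2
object RunScalaConsumer extends App {
  val consumer = new ScalaConsumer("192.168.56.1:2181", "myGroup", "orders")
  consumer.run(2)                            // consume the orders topic with 2 threads
  sys.addShutdownHook(consumer.shutdown())   // clean up the consumer and thread pool on exit
}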
Phew~~ pulling data in plain Scala takes quite a bit of work once you want multiple threads for higher throughput. Tomorrow we'll see how to read Kafka data in Spark with a Kafka stream RDD; that's a lot simpler.
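As a tiny teaser of what that looks like, here is a minimal sketch (assuming Spark 1.x with the spark-streaming-kafka 0.8 connector on the classpath; the addresses and group name are assumptions) — we'll walk through it properly tomorrow:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaSparkTeaser extends App {
  val conf = new SparkConf().setAppName("KafkaTeaser").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(5))
  // one receiver thread on the orders topic, same group-id idea as the plain consumer above
  val stream = KafkaUtils.createStream(ssc, "192.168.56.1:2181", "sparkGroup", Map("orders" -> 1))
  stream.map(_._2).print()   // print the values (the CSV order lines)
  ssc.start()
  ssc.awaitTermination()
}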