iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 26
0
Big Data

Spark 2.0 in Scala系列 第 26

[Spark-Day26](Spark 好友篇) Streaming with Kafka初探

  • 分享至 

  • xImage
  •  

Spark Streaming支援多種資料來源,最基本內建常用的有:

  • 檔案輸入串流 (File input Stream)
  • Socket輸入串流 (Socket input Stream)
    這兩種我們都已經用過了,但除此之外,Spark還能接許多的外部系統作為串流輸入來源,例如:
  • Kafka
  • Flume
  • Amazon
  • Twitter
  • ZeroMQ
  • MQTT
    再來就來看看SMACK架構中的K(Kafka)跟S(Spark)這兩個好朋友怎麼接在一起吧!!
    PS:Spark 2.0測試中的新功能Structured Streaming,Kafka為built-in function。

工具準備

  • 在Kafka建立所需的Topic
  • 準備一個小的script用於將檔案上傳至Kafka
    整個流程大概是這樣:
    1.透過script將orders.txt以行為單位摹擬串流push到Kafka orders Topic
    2.將原本的程式改寫,資料來源改成接Kafka (IN)
    3.將處理完的結果,輸出到Kafka metrics Topic (OUT)

在Kafka建立所需的Topic

我們要建立兩個topic:orders跟metrics,還記得怎麼建立Topics嗎?

joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-topics.sh --create --zookeeper 192.168.56.1 --replication-factor 1 --partitions 1 --topic orders
Created topic "orders"

joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-topics.sh --create --zookeeper 192.168.56.1 --replication-factor 1 --partitions 1 --topic orders
Created topic "metrics"

準備一個小的script用於將檔案上傳至Kafka

Script(檔名為streamOrders.sh)如下:

#!/bin/bash
BROKER=$1
if [ -z "$1" ]; then
        BROKER="192.168.56.1:9092" ①
fi

cat orders.txt | while read lorders.txt | while read line; do
        echo "$line"
        sleep 0.1
done | /opt/kafka_2.11-0.8.2.1/bin/kafka-console-producer.sh --broker-list $BROKER --topic orders ②

或者你可以透過我的Google Drive Link直接下載:
①的部份需要改寫成自己的kafka broker IP位置,若用預設的話可以用本機localhost:9092
/opt/kafka_2.11-0.8.2.1/bin/kafka-console-producer.sh這個執行檔要看你安裝的方式,如果是用bin解開直接用的話就看你的bin目錄在哪(以我為例是在/opt/kafka_2.11-0.8.2.1下),但若是用service (daemon)安裝的方式(例如yum或apt-get)的話,有可能是在/usr/local/kafka/bin/...找一下就有了。

透過指令將script轉成執行檔比較好用

$chmod +x streamOrders.sh

PS:先前的orders.txt要與此腳本放在同一目錄,類似如下

joechh@joechh:~/Scala/sparkIronMan/Spark/src/main/resources/day26$ ls
orders.txt  streamOrders.sh

測試streamOrders.sh是否可用

從腳本的內容可以大致看出來他做了啥時,簡單來說,他會readline然後每一行透過kafka-console-producer.sh傳到Topic orders。此外,每筆間隔0.1秒,避免throughput太高一下傳完了XD。

接著測試一下吧,根據之前安裝後的測試方式,開一個console的consume在orders topic:

joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-console-consumer.sh --zookeeper 192.168.56.1:2181 --topic orders

然後,執行./streamOrders.sh,如果順利。orders.txt資料就會寫入orders topics然後被console-consumer吃到:

joechh@joechh:/opt/kafka_2.11-0.8.2.1$ bin/kafka-console-consumer.sh --zookeeper 192.168.56.1:2181 --topic orders
2016-03-22 20:25:28,1,80,EPE,710,51.00,B
2016-03-22 20:25:28,2,70,NFLX,158,8.00,B
2016-03-22 20:25:28,3,53,VALE,284,5.00,B
2016-03-22 20:25:28,4,14,SRPT,183,34.00,B
2016-03-22 20:25:28,5,62,BP,241,36.00,S
2016-03-22 20:25:28,6,52,MNKD,296,28.00,S
....
....

好,改寫程式Spark程式之前,我們先看看假設不透過SparkRDD,要怎麼透過Scala Client端使用Kafka呢?
簡單來說,Kafka有兩個相當重要的元件:

  1. Kafka Producer:資料產生端,將資料送到Kafka Broker Cluster
  2. Kafka Consumer:資料消耗處理端,將存到Kafka Cluster的Streaming讀取出來處理。

Producer的code可以視為一段小程式,有時候可以會放到產生資料端的機器中與其他程式整合,並將資料送回Kafka Broker,先看看一段Sample的Kafka Producer:


[Snippet.57] Kafka Producer Without Spark

package sparkIronMan

import java.util.{Date, Properties}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import scala.util.Random

object ScalaProducerExample extends App {
  val topic = "testTopic" ①
  val brokers = "192.168.56.1:9092" ②
  val rnd = new Random()
  
  val props = new Properties() ③
  props.put("metadata.broker.list", brokers) ④
  props.put("serializer.class", "kafka.serializer.StringEncoder") ⑤
  props.put("producer.type", "async") ⑥

  val producer = new Producer[String, String]new ProducerConfig(props) ⑦
  for (nEvents <- Range(0, 10000b)) {
    val runtime = new Date().getTime();
    val ip = "192.168.2." + rnd.nextInt(255);
    val msg = runtime + "," + nEvents + ",www.example.com," + ip;
    val data = new KeyedMessage[String, String](topic, ip, msg); ⑧
    producer.send(data);  ⑨
  }
  producer.close();
}

①②要把資料寫到Broker,當然要知道寫到哪個topic跟brokers的資訊啦
③透過java Prop建立Prop資訊
④把②的資料寫入metadata.broker.list中,看到list就知道broker可以是很多個IP的組合啦
⑤這邊的Producer是產生String型別的資料,所以需要一個String的Encoder
⑥用非同步的方式來傳輸,基本上非同步的方式會是比較高效
⑦初始化一個producer
接著用亂數隨機模擬產生IP字串當作Message
⑧產生一筆Kafka用的傳輸物件KeyedMessage,可以看到有3個參數(topic, ip, msg)。這啥意思勒?看看原始碼吧~

case class KeyedMessage[K, V](val topic : scala.Predef.String, val key : K, val partKey : scala.Any, val message : V) extends scala.AnyRef with scala.Product with scala.Serializable {
  def this(topic : scala.Predef.String, message : V) = { /* compiled code */ } ①
  def this(topic : scala.Predef.String, key : K, message : V) = { /* compiled code */ } 
  def partitionKey : scala.Any = { /* compiled code */ }
  def hasKey : scala.Boolean = { /* compiled code */ }
}

喔喔,原來KeyedMessage是個Case Message!然後3個參數分別是topickeyval。但是也能忽略key這個參數一樣可以建立KeyedMessage。
⑨建立好後,透過producer送出。

基本上Producer是Thread Safe的,所以對很適合寫成所謂的singleton producer。這件事我們留到Kafka with Spark時候再來說


再來就是吃資料的consumer啦:
[Snippet.58] Kafka Consumer Without Spark

而consumer既然是抽取資料的,那肯定可以開Thread來提高throughput的阿,提到開Thread就會有run阿,threadpool這些東東,看看吧:

package sparkIronMan

import java.util.Properties
import java.util.concurrent._

import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream}
import kafka.utils.Logging

object ScalaConsumerExample extends App {      
  val example = new ScalaConsumer(args(0), args(1), args(2)) ①
  example.run(args(3).toInt)
}

class ScalaConsumer(val zookeeper: String,val groupId: String,
                           val topic: String) extends Logging {

  val config = createConsumerConfig(zookeeper, groupId)  ②
  val consumer = Consumer.create(config)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.shutdown();
    if (executor != null)
      executor.shutdown();
  }

  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = { ③
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId); ④
    props.put("auto.offset.reset", "largest"); ⑤ 
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    val config = new ConsumerConfig(props)
    config
  }

  def run(numThreads: Int) = {
    val topicCountMap = Map(topic -> numThreads) ⑥
    val consumerMap = consumer.createMessageStreams(topicCountMap);
    val streams = consumerMap.get(topic).get; ⑦

    executor = Executors.newFixedThreadPool(numThreads);
    var threadNumber = 0;
    for (stream <- streams) { ⑧
      executor.submit(new ScalaConsumerTest(stream, threadNumber))
      threadNumber += 1
    }
  }
}
class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int) extends Logging with Runnable {
  def run {
    val it = stream.iterator() ⑨
    while (it.hasNext()) {
      val msg = new String(it.next().message());  ⑩
      System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg); ⑪
    }
    System.out.println("Shutting down Thread: " + threadNumber);
  }

①將建立所需參數全都傳入ScalaConsumer以產生物件
②將參數轉拋到createConsumerConfig建立ConsumerConfig,裏面比較特別的就是GroupID,因為一個topic的Streaming是可以被任何人消費的,所以可以想像想要使用的人必須提供自己的GroupID,而Kafka會紀錄此GroupID的消費進度(offset),而既然被稱為GroupID,同一個程式開多個Thread當然會在同一個Group囉
③createConsumerConfig函式
④將Group寫入Prop中
⑤這邊還有另外一個比較特別的參數是auto.offset.reset,簡單來說,如果第一次連上Kafka,它不知道你之前的消費進度(offset),那他會幫你設定一個,有兩種選項(1):Largest,也就是以前此topic的資料我不吃了;或是(2):Smallest,最小的Offset,也就是從頭開始消費topic內的Stream
⑥指定每個一個Topic要用幾個Thread來處理
⑦取得Stream
⑧將Stream丟到不同的Thread中處理。
⑨每個Thread中的處理邏輯。而Stream有提供一個iterator讓你方便抓取Streaming Message
⑩取出Message,並轉成String
⑪印出

呼~~在一般的Scala中取值,如果要用Multi-Thread來增加Throughput要做好多事阿。明天我們會看在Spark中用KafkaStream RDD取Kafka的資料,那就簡單多了。


上一篇
[Spark-Day25](Scala番外篇) Extrator、Case Class、Sealed Class大亂鬥
下一篇
[Spark-Day27](Spark 好友篇)SparkStreaming With Kafka
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言