iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 21
2
Big Data

Spark 2.0 in Scala系列 第 21

[Spark-Day21](Spark Streaming篇)Stateful Streaming

  • 分享至 

  • xImage
  •  

昨天我們看了SparkingStreamingContext的起手式與一個stateless的use case。基本上,我認為stateless不用處理關注其他的批次的關聯,就很很像一般RDD的寫法,反正就是處理批次內的資料就對了。而stateful的寫法就稍有不同了,但是stateful有許多實用的場景,最常見的就是想持續監控統計某些欄位時,我們延續昨天的股票交易紀錄,假如我們想要:

任務2

持續監控交易量最高的客戶前5名(每N秒更新)

分析一下這樣的需求,肯定不能只處理這個批次的資料了吧,一定要知道過往的資料如何吧,那就是Stateful Streaming的範疇啦。Stateful Streaming可以再細分成兩類:

  • 只往前關注某N個批次的資料,這邊就會有WindowDuration/Sliding duration的概念,但我們肯定不能只關注前N批吧,這種應用之後再講。
  • 持續維護某些值狀態,這些狀態是會一直維護狀態的,不論經過幾個批次(除非你刻意讓他停止或timeout),要實現這種維護狀態的實際API,在1.6版後有兩種API:
    • updateStateByKey
    • mapWithState (1.6版引入)

先看看updateStateByKey怎麼用吧,懂了之後要換到mapWithState也很簡單。


updateSateByKey

從updateSateByKey的名稱,有沒有看出他適用哪類的RDD阿??沒錯~就是pair類RDD啦,才有key阿XDD,所以在使用之前,要先把DStream RDD轉成pairDStream

昨天我們花了很多時間建出了orders,我們今天重複使用他當作資料來源,既然要知道金額最高的前5個客戶,那就在生個有金額跟客戶資訊的KV吧:

val amountPerClient = orders.map(record => (record.clientId, record.amount * record.price))

OK,我們把客戶ID當作key,每筆order的總金額當作value,再來要如何用updateSateByKey勒?,看看原始碼先:

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => 
Option[S]): DStream[(K, S)] = ssc.withScope { 
   updateStateByKey(updateFunc, defaultPartitioner())
  }

透過原始碼可以發現,要使用updateStateByKey函式最重要就是傳給它一個函式,那個函式必須包含:

  1. 傳入值1Seq[V]:此值代表這批資料中,這個key相關的events,因為可能是多個,所以傳入一個Seq類別物件,Seq有點類似Java中的Collection抽象集合類別。
  2. 傳入值2Option[S]:代表先前此key對應到的狀態(值),當然如果此Key是第一次遇到,當然先前的狀態就會是的麻~所以這邊用Option包起來,代表有可能為null
  3. 回傳值Option[S]:通常是回傳更新後的狀態出去,怎麼更新就是函式要做的事啦

[Snippet.41]建立updateSateByKey所需更新狀態函式
來寫這個好玩的函式吧:

def func(
   vals: Seq[(Double)],preValue: Option[Double]): Option[Double] =  ①
preValue match { ②
    case Some(total) => Some(vals.sum + total) ③
    case None => Some(vals.sum) ④
  }

①先確認一下我們V的類型為何?沒錯,依之前的amountPerClient,V為Double,所以我們會吃一個名稱為vals的資料,代表本批次此key值的集合,而此key之前的狀態值已經更新為的回傳值勒?應該也是Double沒錯吧~
②對狀態變數作pattern match(可以當作強化版的switch),這也是Scala處理Option物件常用的作法之一,可以仔細瞧瞧喔
③如果Option有值,會得到Some(total),Some跟Option都是Option的子類別,信不信XD:

final case class Some[+A](x: A) extends Option[A] {
  def isEmpty = false
  def get = x
}

沒騙你吧~extends Option,提供兩個方法,回傳不為空的isEmpty跟取值get,等等就會用到了。回到原題,如果有值(且順便被存於total變數中),當然就疊加並回傳囉:Some(vals.sum + total)vals.sum直接可以取得這批數字類RDD的加總,好用吧~
④如果沒值(None),也就是這是新Key,那就乖乖的回傳這批的加總吧Some(vals.sum)


OK,寫好函式,整入updateStateByKey吧:

val amountState = amountPerClient.updateStateByKey(func)

updateStateByKey的checkpoint

Spark要求用這類維護State的RDD都必須寫入checkpoint,否則中斷要從哪裡重建阿!!RDD值都是一百年前的累積下來的,DAG血統圖爆炸長,不用checkpoint截斷不行(還記得嗎checkpoint嗎):

ssc.sparkContext.setCheckpointDir("/home/joechh/chpoint")

示範是放在本機上,但一般來說都是放在HDFSS3這類分散式+redundant儲存系統中,否則本機直接掛掉,checkpoint一起不見不就搞笑...

如果要把updateStateByKey所需函式寫成匿名函式怎麼寫?也差不多:

amountPerClient.updateStateByKey(
      (vals: Seq[(Double)], preValue: Option[Double]) => 
      {
        Some(vals.sum + preValue.getOrElse[Double](0))
      }
    )

有帥一點嗎,這邊用了個偷吃步的方式簡化patten match:getOrElse,自己體會看看吧~

好,我們現在有個amountStatepairDStream,該如何一直印出前5名勒?(K是ClientID,V是總金額),如果有sortBy可以用就好了,但是pairDStream查了一下沒得用,那怎麼辦勒?如果能把他轉成一般RDD就好了,可以嗎,當然可以囉,這就是公認的Streaming後門萬用函式XD:transform,可以做RDDtoRDD轉換:

amountState.transform(_.sortBy(_._2, false))

好我可以排序了,但是怎麼抓前5名勒?沒有欄位是1到5阿...那就加個欄位過濾後清掉囉:

  val top5clients = amountState.
    transform(_.sortBy(_._2, false).
    zipWithIndex. ①
    filter(_._2 < 5)). ②
    map(x=>x._1) ③

加個欄位
過濾後
清掉
用fluent style寫functional programming真的超爽的阿~~一直串串串。但Debug時候會不太爽QQ

完整Code(10秒版本):

package SparkIronMan

/**
  * Created by joechh on 2016/12/6.
  */


import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark._
import org.apache.spark.streaming._

object Day21_TopFiveCustomer extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  val fileStream = ssc.textFileStream("hdfs://localhost:9000/user/joechh/sparkDir")
  val orders = fileStream.flatMap(line => {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
    val words = line.split(",")
    try {
      assert(words(6) == "B" || words(6) == "S")
      List(Order(new Timestamp(dateFormat.parse(words(0)).getTime),
        words(1).toLong,
        words(2).toLong,
        words(3),
        words(4).toInt,
        words(5).toDouble,
        words(6) == "B"
      ))
    }
    catch {
      case e: Throwable => println("wrong line format(" + e + "):" + line)
        List()
    }
  })

  val amountPerClient = orders.map(record => (record.clientId, record.amount * record.price))

  def func(vals: Seq[(Double)], preValue: Option[Double]): Option[Double] = preValue match {
    case Some(total) => Some(vals.sum + total)
    case None => Some(vals.sum)
  }
  

  val amountState = amountPerClient.updateStateByKey(func)
  ssc.sparkContext.setCheckpointDir("/home/joechh/chpoint")

  val top5clients = amountState.
    transform(_.sortBy(_._2, false).zipWithIndex.filter(_._2 < 5)).map(x=>x._1)

  top5clients.print()

  ssc.start()
  ssc.awaitTermination()
}

套用前一天的方式傳資料,看看輸出結果:

http://ithelp.ithome.com.tw/upload/images/20170106/20103839WKke94Riq3.png

左邊ClientID,右邊排序的金額,達成。


上一篇
[Spark-Day20](Spark Streaming篇)Stateless Streaming by Use Case
下一篇
[Spark-Day22](Spark Streaming篇) mapWithState by Use Case
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言