iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 29
0
Big Data

Spark 2.0 in Scala系列 第 29

[Spark-Day29](Spark好友篇) Cassandra with Spark長篇

  • 分享至 

  • xImage
  •  

昨天完成Cassandra的簡易安裝,並在cqlsh中走過基本的CRUDUDT以及一些稍微特別的操作馬拉松後,今天當然要進一步用Spark與Cassandra互動阿!!還記得Cassandra沒有join操作嗎?疑..剛好Spark有耶...怎麼那麼巧XDDD。咳..回到正題,我們還是趕快看看怎麼透過Spark連接Cassandra吧!

先在sbt的build.sbt加入相依性敘述

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

因為我們的cassandra與spark版本都相當新,可以發現我們的connector用的是2.0.0-M3版本,也就是Spark2.0與Cassandra3.X版的連接器。

而整個練習環境的簡易build.sbt檔案如下:

name := "Spark"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "log4j" % "log4j" % "1.2.16"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8-assembly_2.11" % "2.0.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

之後若要將Code佈署到production,最好還是搭配assembly等功能。

Spark-Cassandra五分鐘讀寫速成班

先建個測試用的test keyspacekv table,然後透過Spark接Cassandra讀寫:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

寫個兩筆資料進去kv表,然後確認一下:

cqlsh:test> INSERT INTO test.kv(key, value) VALUES ('key1', 1);
cqlsh:test> INSERT INTO test.kv(key, value) VALUES ('key2', 2);
cqlsh:test> SELECT * FROM kv;

 key  | value
------+-------
 key1 |     1
 key2 |     2

OK,回到程式段,import Spark中連接Cassandra所需的函式庫

import com.datastax.spark.connector._

都在com.datastax.spark.connector之下,這邊就先簡單的用_全包了


[Snippet.65] spark-cassandra Entry Point
而Spark使用Cassandra的進入點就是sc.cassandraTable啦,簡單一行就可以接進table了。

val rdd = sc.cassandraTable("test", "kv")
println(rdd.count) ①
println(rdd.first) ②
println(rdd.map(_.getInt("value")).sum)③
//Output:
//2
//CassandraRow{key: key1, value: 1}
//3.0

①rdd的count直接就可以算出筆數啦,類似RDB中count的效果
②也可以取出第1筆
③先將rdd map到value欄位,然後加總,類似RDB中sum的效果


[Snippet.66] spark-cassandra saveToCassandra
那資料要怎麼寫入Cassandra的一張表內勒?

val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4))) ①
collection.saveToCassandra("test", "kv", SomeColumns("key", "value")) ②

①建立一個簡單的pairRDD,模擬kv中的兩個欄位
②透過saveToCassandra寫入test keyspace中的kv表,且透過函式SomeColumns指定寫入順序分別對應欄位keyvalue

那要怎麼將rdd寫成新的另外一張cassandra表呢?可以用saveAsCassandraTablesaveAsCassandraTableEx。saveAsCassandraTable比較簡單,而saveAsCassandraTableEx可以定義控制新表的細節。

用cqlsh check(或用讀取程式重跑一次檢視是否寫入成功):

cqlsh:test> SELECT * FROM kv;
 key  | value
------+-------
 key1 |     1
 key4 |     4
 key3 |     3
 key2 |     2

完整的讀寫Snippet:

package SparkIronMan

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by joechh on 2017/1/13.
  */
object Day29_Cassandra extends App {
  val conf = new SparkConf()
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setMaster("local[2]")
    .setAppName("cassandra")
  val sc = new SparkContext(conf)
  val rdd = sc.cassandraTable("test", "kv")
  println(rdd.count)
  println(rdd.first)
  println(rdd.map(_.getInt("value")).sum)
  rdd.foreach(println)
  
  val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
  collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
}

五分鐘速成班結束,進行下一段吧:


[Snippet.67] conf與sc建立方式 for cassandra
如果cassandra有做權限管理(需要帳號密碼)或是想要連指定位置的節點,以下設定會比較泛用:

val conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", "192.168.123.10")
        .set("spark.cassandra.auth.username", "cassandra")            
        .set("spark.cassandra.auth.password", "cassandra")

val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf)

要特別注意的是這邊的連線節點也可用List(Address1:Port1,Address2:Port2,..)串起來,但這邊只是描述初次連線節點,當Client連上叢集之後,會擷取整個叢集的可用節點資訊,這意味著程式運行中即便初次連線節點全都失效了若叢集中尚有可用節點,那服務還是可以繼續運行不會中斷。而這種初始連線節點的概念在許多分散式環境中隨處可見,例如zookeeperElasticSearchKafka等都有類似的設定

sc.cassandraTable("keyspace name", "table name")回傳值若沒有轉型,則會是CassandraRDD[CassandraRow]型別,這個物件之後可以很方便的轉換成其他的物件(Object Mapping)

在test keyspace下開另外一張測試小表並塞入測試值:

cqlsh:test> CREATE TABLE test.words (word text PRIMARY KEY, count int);
cqlsh:test> INSERT INTO test.words (word, count) VALUES ('foo', 20);
cqlsh:test> INSERT INTO test.words (word, count) VALUES ('bar', 20);

若要讀值知道該怎麼做了吧?

val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)
//Output
//CassandraRow{word: bar, count: 20}
//CassandraRow{word: foo, count: 20}

這時候的型別都是CassandraRow,因為我們沒有特別指定,若我們有預期的型別了,該怎麼做?


[Snippet.67] 存取特定型別欄位值
如果知道某個欄位的型別,那要怎麼用對應型別取出呢?就用get系列函數吧!

val firstRow = rdd.first
firstRow.getInt("count")       // 20       
firstRow.getLong("count")      // 20L  

除了透過getInt、getLong以外,也可以自己用萬用get搭配cast轉型:

firstRow.get[Int]("count")                   // 20       
firstRow.get[Long]("count")                  // 20L
firstRow.get[BigInt]("count")                // BigInt(20)

若擔心值有可能是null造成nullPointException的話,就再wrapper一層Option吧:

firstRow.getIntOption("count")        // Some(20)
firstRow.get[Option[Int]]("count").getOrElse(0)    // Some(20) 

[Snippet.68] 存取集合型別欄位
看過一般欄位後,存取集合物件欄位勒?
再開另外一張表來玩玩~

cqlsh:test> CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
cqlsh:test> INSERT INTO test.users (username, emails)       
            VALUES ('joe', {'joechh@gmail.com', 'joeAnothermail@gmail.com'});
cqlsh:test> SELECT * from users ;
 username | emails
----------+--------------------------------------------------
      joe | {'joeAnothermail@gmail.com', 'joechh@gmail.com'}

開好了,開始拿set中的值囉~

val firstSetrow = sc.cassandraTable("test", "users").first ①
println(firstSetrow) ②
println(firstSetrow.getList[String]("emails")) ③
println(firstSetrow.get[List[String]]("emails")) ④
println(firstSetrow.get[String]("emails")) ⑤
//Output:
//CassandraRow{username: joe, emails: {joeAnothermail@gmail.com,joechh@gmail.com}}
//Vector(joeAnothermail@gmail.com, joechh@gmail.com)
//List(joeAnothermail@gmail.com, joechh@gmail.com)
//{joeAnothermail@gmail.com,joechh@gmail.com}

①取出一筆紀錄
②沒有轉型,所以輸出為CassandraRow型別
③CassandraRow的getList是給你集合的上層類別Vector型別唷!
④若想拿到List的話,要自己用get然後轉型成String的List
⑤也可以把整筆資料轉成String

如果有UDT勒?假設有個UDT如下格式:

CREATE TYPE test.address (city text, street text, number int);
CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>);

則可以透過getUDTValue取出UDT物件,然後再用一般get方式即可啦,例如:

val address: UDTValue = row.getUDTValue("address")
val city = address.getString("city")
val street = address.getString("street")
val number = address.getInt("number")

這邊就不用實例示範了,有興趣的可以用昨天的UDT來玩玩

Cassandra Join with Spark

終於到了一直念茲在茲的Join with Spark終於到了!(其實只是我愛murmur..XD)

先建兩個有共同欄位的小表來玩Join!規劃如下:

  • customer_info:(cust_id text PRIMARY KEY, name text)
  • shopping_info:(cust_id text PRIMARY KEY, date date, item text, price double)
cqlsh:test> create table customer_info(cust_id text PRIMARY KEY, name text);
cqlsh:test> INSERT INTO customer_info (cust_id , name ) VALUES ( '1','joe');
cqlsh:test> INSERT INTO customer_info (cust_id , name ) VALUES ( '2','doris');
cqlsh:test> SELECT * from customer_info ;

 cust_id | name
---------+-------
       2 | doris
       1 |   joe

(2 rows)

OK,第一張表沒有問題,建下一張然後放個3筆資料進去:

cqlsh:test> create table shopping_info(cust_id text PRIMARY KEY, date date, item text, price double) ;

cqlsh:test> INSERT INTO shopping_info (cust_id , date , item , price ) VALUES ( '1','2017-01-02','book',30.5);
cqlsh:test> INSERT INTO shopping_info (cust_id , date , item , price ) VALUES ( '2','2017-01-02','toys',40.87);
cqlsh:test> INSERT INTO shopping_info (cust_id , date , item , price ) VALUES ( '2','2017-01-05','cake',10);

cqlsh:test> SELECT * from shopping_info ;
 cust_id | date       | item | price
---------+------------+------+-------
       2 | 2017-01-05 | cake |    10
       1 | 2017-01-02 | book |  30.5

疑?...怎麼剩下兩筆?

原來是Data Modeling的方式錯了,只用cust_id當key的話,後面兩筆(cust_id=2)的會疊在一起只剩一筆,那該如何在Cassanra中建消費紀錄表勒?

CREATE TABLE test.shopping_info (
    cust_id text,
    date date,
    shopping_id uuid,
    item text,
    price double,
    PRIMARY KEY ((cust_id), date, shopping_id) ①
    );  

①主鍵的成份變成((cust_id), date, shopping_id),在小括號內的(cust_id)是所謂的分區鍵(partition key),而date與shopping_id則是clustering column。
簡單解釋這樣的主鍵會讓同一個客戶cust_id物理上(磁碟上)排列在一起!成為一個分區。而clustering column會影響排序的順序,而三個欄位的組合則提供了唯一性(這點跟RDB一樣)

先前的key只有cust_id無法滿足唯一性,所以才讓值疊在一起。新表的shopping_id型別為uuid,看就知道是為了保證獨立性才放入主鍵的。熟悉RDB的朋友一定會想,有shopping_id就可以滿足唯一性了,要cust_iddate幹嘛勒??這就是cassandra跟RDB最大的差異之一:主鍵的設計除了提供唯一性,還必須符合查詢的需求。這邊就不多說了,還是回到我們的Spark主題吧

測試新表:

cqlsh:test> insert into shopping_info (cust_id , date , shopping_id , item , price ) VALUES ( '1','2017-01-02',uuid(),'book',30.5);

cqlsh:test> insert into shopping_info (cust_id , date , shopping_id , item , price ) VALUES ( '2','2017-01-02',uuid(),'toys',40.87) ;

cqlsh:test> insert into shopping_info (cust_id , date , shopping_id , item , price ) VALUES ( '2','2017-01-03',uuid(),'cake',10) ;

cqlsh:test> SELECT * FROM shopping_info ;

 cust_id | date       | shopping_id              | item | price
---------+------------+--------------------------+------+-------
       2 | 2017-01-02 | de23a28b-c2ad-4018-..... | toys | 40.87
       2 | 2017-01-03 | 805ea507-cea9-468e-..... | cake |    10
       1 | 2017-01-02 | ea68f840-fc45-43ac-..... | book |  30.5

OK,沒有問題了。


[Snippet.69] 直接連接兩張PartitionKey相同的表
如果兩張表的Partition Key類似,那可以簡單透過joinWithCassandraTable直接連接。

val internalJoin = sc.cassandraTable("test",
            "customer_info").joinWithCassandraTable("test", "shopping_info")
internalJoin.collect.foreach(println)

//Output:
(CassandraRow{cust_id: 2, name: doris},CassandraRow{cust_id: 2, date: 2017-01-02, shipping_id: de23a28b-c2ad-4018-b52b-a459b1b670ac, item: toys, price: 40.87})

(CassandraRow{cust_id: 2, name: doris},CassandraRow{cust_id: 2, date: 2017-01-03, shipping_id: 805ea507-cea9-468e-9edc-e91512514130, item: cake, price: 10.0})

(CassandraRow{cust_id: 1, name: joe},CassandraRow{cust_id: 1, date: 2017-01-02, shipping_id: ea68f840-fc45-43ac-8775-3e52d8c73ebc, item: book, price: 30.5})

[Snippet.70] Join with Cassandra Table with Regular RDD
那普通RDD可以跟Cassandra表Join嗎?一樣可以,而且也是用joinWithCassandraTable即可,前提是那個普通RDD是PairRDD,而且K與表的partitionKey欄位類似

val ids = sc.parallelize(List((1, "joe"), (2, "doris")))
val localJoin = ids.joinWithCassandraTable("test", "shopping_info");
localJoin.collect.foreach(println)

//Output:
((1,joe),CassandraRow{cust_id: 1, date: 2017-01-02, shipping_id: ea68f840-fc45-43ac-8775-3e52d8c73ebc, item: book, price: 30.5})

((2,doris),CassandraRow{cust_id: 2, date: 2017-01-02, shipping_id: de23a28b-c2ad-4018-b52b-a459b1b670ac, item: toys, price: 40.87})

((2,doris),CassandraRow{cust_id: 2, date: 2017-01-03, shipping_id: 805ea507-cea9-468e-9edc-e91512514130, item: cake, price: 10.0})

[Snippet.71] 擷取兩張表的資料,建成RDD並手動Join
有時候兩個表要Join,但是不需兩張表的全部欄位該怎麼做?可以把兩個表讀成CassandraRDD。這樣有時候可以獲得比較高的彈性,那該怎麼做勒?

  val customers = sc.cassandraTable("test", "customer_info")
    .select("cust_id", "name") ①
    .as((c: String, n: String) => (c, n)) ②

  val records = sc.cassandraTable("test", "shopping_info")
    .select("cust_id", "date", "shipping_id", "price")
    .as((c: String, d: java.util.Date, s: java.util.UUID, p: Double) => (c, (d, s, p)))

  customers.join(records).collect().foreach(println)
//Output:
(1,(joe,(Mon Jan 02 00:00:00 CST 2017,ea68f840-fc45-43ac-8775-3e52d8c73ebc,30.5)))
(2,(doris,(Mon Jan 02 00:00:00 CST 2017,de23a28b-c2ad-4018-b52b-a459b1b670ac,40.8)))
(2,(doris,(Tue Jan 03 00:00:00 CST 2017,805ea507-cea9-468e-9edc-e91512514130,10.0))

從上面結果可以得知結果是巢狀的結構,但我們可以透過許多方式攤平(例如flatmap,或是用map搭配case自己控制):


[Snippet.72]將兩個表的Join結果攤平

customers.join(records)
    .map { 
         case (customer, ((name), (date, price))) => (customer, name, date, price)
         }
    .collect
    .foreach(println)

//Output:
(1,joe,Mon Jan 02 00:00:00 CST 2017,30.5,ea68f840-fc45-43ac-8775-3e52d8c73ebc)
(2,doris,Mon Jan 02 00:00:00 CST 2017,40.87,de23a28b-c2ad-4018-b52b-a459b1b670ac)
(2,doris,Tue Jan 03 00:00:00 CST 2017,10.0,805ea507-cea9-468e-9edc-e91512514130)

[Snippet.73]將Join結果寫回表
有兩種情境:

  1. 寫入一張已存在的表
  2. 寫入新表
    本例以第一種情境說明:

先建立一張表當作以存在的表,用來存放Join結果

cqlsh:test>  CREATE TABLE customer_with_shopping (
            cust_id text, 
            name text, 
            date date, 
            price double, 
            shopping_id uuid, 
            PRIMARY KEY(cust_id,shopping_id) );

接著就是之前的map操作與saveToCassandra結合!

customers.join(records)
    .map { 
    case (customer, ((name), (date, shopping_id, price))) => 
        (customer, name, date, price, shopping_id)
        }
    .saveToCassandra("test","customer_with_shopping",
                SomeColumns("cust_id", "name","date","price","shopping_id"))

用cqlsh檢查一下是否有寫入:

cqlsh:test> SELECT * from customer_with_shopping;

 cust_id | shopping_id            | date       | name  | price
---------+------------------------+------------+-------+-------
       2 | 805ea507-cea9-468e-... | 2017-01-03 | doris |    10
       2 | de23a28b-c2ad-4018-... | 2017-01-02 | doris | 40.87
       1 | ea68f840-fc45-43ac-... | 2017-01-02 |   joe |  30.5

這個長篇就到此劃下句點了。當然Cassandra本身就是一個坑(?),只能以Client端 on Spark說明一些用法。還有許多議題沒有涉及到,例如調校與一些partition相關操作,就留給各位探索囉

想要我的寶藏嗎?想要的話可以全部給你,去找吧!我把所有的財寶都放在那裡了~!


上一篇
[Spark-Day28](Spark好友篇)一次搞定Cassandra安裝與基礎查詢操作
下一篇
[Spark-Day30](完結篇)資料工程師之路
系列文
Spark 2.0 in Scala30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
atoto9
iT邦新手 5 級 ‧ 2017-04-10 16:40:36

您好,這邊有幾個問題想請教您

第一個問題是關於sbt的操作.
你用sbt的部分是建立一個jar檔然後引用嗎?build.sbt建立之後,接著要怎麼進行下去?請問是使用sbt package建成jar檔嗎?
如果不是建立成jar檔的話,能夠講解一下這段sbt是在陳述什麼嗎?謝謝,sbt這方面一直不是很明瞭(對maven和ivy也是...)

第二個問題是關於一些error的解決
目前我嘗試的方式是先去官方所提供的載點做對應版的jar檔下載( http://spark-packages.org/package/datastax/spark-cassandra-connector )
接著以spark-shell --jars XXX 的方式啟用
但是到了saveToCassandra這一步都會跳這個error
ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 8)

我有找到可能的答案如下
Just using a single jar is not enough. Use the --packages command to pull down the jar and all of it's dependencies. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md
若使用的是下面的語法就能成功寫入資料進cassandra
c.withSessionDo ( session => session.execute("CREATE TABLE test.fun (k int PRIMARY KEY, v int)"))

若是採用sbt打包成jar檔然後這樣引用就不會有這樣的錯誤嗎?
我的環境是ubuntu16.04/cassandra3.9/scala2.11.1/spark2.0.1

joechh iT邦新手 5 級 ‧ 2018-01-31 13:41:58 檢舉

Q1: Sbt建立後最常見的作法應該是用Sbt Assembly包成Jar,再透過spark-submit送到Cluster上去。Sbt那段僅是說明在Spark中需要整合Cassandra所需的lib

Q2: 你已經回答出答案了..用packge取代jar

我要留言

立即登入留言