iT邦幫忙

1

[Spark] 讀取MySQL大Table的效能問題

在測試從 Spark Shell讀取MySQL一張Large Table時,發生了Out of memory和connection timeout問題,記錄一下處理的過程:

MySQL Table資料筆數:1400萬筆左右
Spark Cluster配置:Master * 1,Slave * 3,皆為1 core 8G
Spark 版本:2.1.1

Spark Config配置:
spark-env.sh

SPARK_WORKER_MEMORY=6g
SPARK_DRIVER_MEMORY=6g
SPARK_EXECUTOR_MEMORY=2g

執行指令:

./bin/spark-shell --master spark://192.168.xx.xx:7077 --executor-memory 4g --packages mysql:mysql-connector-java:5.1.38

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://192.168.x.x/test").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "test_table").option("user", "root").option("password", "test").load()

df.createOrReplaceTempView("test_table")

import org.apache.spark.storage.StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

val sqlDf = sql("select * from test_table limit 10000")
sqlDf.show()

一開始無論執行什麼query,只要是query 這張大table都會出現OOM,重試幾次甚至出現executor heartbeat timeout。

錯誤訊息:
Error Message: Executor heartbeat timeout

[Stage 0:=======================================================> (39 + 1) / 40]17/06/06 10:26:09 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 147897 ms exceeds timeout 120000 ms
17/06/06 10:26:09 ERROR TaskSchedulerImpl: Lost executor 2 on 192.168.1.181: Executor heartbeat timed out after 147897 ms
17/06/06 10:26:09 WARN TaskSetManager: Lost task 39.0 in stage 0.0 (TID 39, 192.168.1.181, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 147897 ms...略

Error Message: OOM

[Stage 0:======================================================>  (27 + 1) / 28]17/06/06 10:14:25 WARN TaskSetManager: Lost task 27.0 in stage 0.0 (TID 27, 192.168.1.184, executor 0): java.lang.OutOfMemoryError: Java heap space
        at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2157)
        at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1964)
        at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3316)
        at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:463)
        at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3040)
        at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2288)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2681)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)
        at ...略

查了官方文件和這篇 spark read mysql 效能調校文章後,有幾種作法來 tuning:

(1) 增加 partition 數
會比較慢,但是至少可以把Table讀進來,Spark會將資料切成小塊partition,分散到不同的 executor上。

(2) 設定spark mysql connection options

  • 主要是這幾個設定:partitionColumn, lowerBound, upperBound, numPartitions, fetchsize
    PS 如果使用partitionColumn,則lowerBound,upperBound,numPartitions都需設定

  • partitionColumn: 用來決定partition切割的欄位,必須是numeric型態資料,不一定要唯一。

  • lowerBound \ upperBound: 決定要fetch的值range

  • 會用這幾個options來決定要query的dataset。
    SELECT * FROM table WHERE partitionColumn BETWEEN lowerBound AND upperBound

(3) Spark記憶體管理:worker 本身會保留一部份的memory來做cache

  • spark 會將memory大致分成三種,user可使用的、資料shuffle用的,儲存RDD的
  • 如果使用者程式要操作的物件很大,可以把快取RDD的大小和資料shuffle大小調低,以增加物件處理的效能

官方文件說明:

- spark.memory.fraction expresses the size of M as a fraction of the (JVM heap space - 300MB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
- spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5). R is the storage space within M where cached blocks immune to being evicted by execution.

這邊使用到的是第2和第3個方法

首先在query mysql的指令加上options
val df = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://192.168.x.x/test").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "test_table").option("user", "root").option("password", "test").option("numPartitions",30).option("partitionColumn","id").option("lowerBound", "0").option("upperBound","20000").load()

並在 spark-default.conf 加入下面設定

# for heartbeat timeout
spark.network.timeout            10000000
spark.executor.heartbeatInterval 10000000
# for OOM
spark.memory.fraction            0.75
spark.storage.memoryFraction     0.45

再執行以下query,都可以順利讀入了

val sqlDf = sql("select count(*) from test_table where dt >= '2017-05-01'")
sqlDf: org.apache.spark.sql.DataFrame = [count(1): bigint]

scala> sqlDf.show()
+--------+
|count(1)|
+--------+
|  222166|
+--------+

val sqlDf = sql("select count(*) from test_table")
sqlDf: org.apache.spark.sql.DataFrame = [count(1): bigint]

scala> sqlDf.show()
+--------+
|count(1)|
+--------+
|14665557|
+--------+

不確定這樣是不是最佳解法,不過至少解決掉問題了@@
如果不需要讀整張Table,其實可以把dbtable的value改成SQL query

原本是這樣,直接寫Table名稱,就會讀整張Table
.option("dbtable", "test_table")
可以改寫成:
.option("dbtable", "(select * from test_table where dt >= '2017-05-01') as T")
PS 記得一定要用左右括號包起來,因為dbtable的value會被當成一張table作查詢,mysql connector會自動dbtable後面加上 where 1=1,如果沒包起來就會出現SQL Syntax Error之類的錯誤


尚未有邦友留言

立即登入留言