如題,當初會寫這篇參考筆記,主要是因為,當時正在解【Spark】Spark基础练习题(二)這篇文章上的題目時,碰到了(如上圖)這題,需要使用 spark 把資料寫入 Mysql 中,再讀取出來,在順便把搜尋到相關的教學資料,統整成一篇筆記,並補上一些圖片方便閱讀XD。
特此撰寫本篇文章作為紀錄文件,用以方便後續有需要的時候,可以快速的重複查閱,雖然後面比較沒有什麼機會再用到,但也算是一個還不錯的經驗。
JAVA_HOME = C:\Java\jre1.8.0_241
pip3 install pyspark
HADOOP_HOME = C:\winutils
接著還會遇到暫存資料夾 tmp\hive 權限的問題
10. 在你要開啟專案的硬碟的根目錄 (如 E:) 建立資料夾 \tmp\hive
(假如 ipynb 在 E:\abc\def\code.ipynb 內,就建立兩個資料夾在 E:\tmp\hive,一般此資料夾會自動在執行 pyspark 時建立,但權限會有問題,需手動修改)
11. 用 winutils.exe 改變該暫存資料夾的權限
%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive
應該要為 drwxrwxrwx
這樣應該就可以正常使用 pyspark 了
pyspark連接Mysql是通過java實現的,所以需要下載連接Mysql的jar包。
下載地址
選擇下載Connector/J,然後選擇操作系統為Platform Independent,下載壓縮包到本地。
然後因為是直接通過 pip3 install pyspark 的方式安裝 PySpark,可以先透過以下程式碼查詢 PySpark路徑:(參考:PySpark 連線 MySQL 示例)
from pyspark import find_spark_home
print(find_spark_home._find_spark_home())
然後解壓文件,將其中的jar包mysql-connector-java-8.0.22.jar放入spark的安裝目錄下的jars資料夾
下方程式碼參考:PySpark 連接 MySQL 示例的Spark 代碼示例
from pyspark import SparkContext
from pyspark.sql import SQLContext
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'root',
'password': 'rootroot',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
# 讀取表
data = spark.read.jdbc(url=url, table='user', properties=prop)
# 打印data數據類型
print(type(data))
# 展示數據
data.show()
# 關閉spark會話
sc.stop()
注意點:
輸出畫面如下:
參考:使用JDBC連接MySql時出現...
在連接字符串後面加上?serverTimezone=UTC
其中UTC是統一標準世界時間。
完整的連接字符串示例:jdbc:mysql://localhost:3306/test?serverTimezone=UTC
或者還有另一種選擇:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8
,這個是解決中文亂碼輸入問題,當然也可以和上面的一起結合:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
下方程式碼參考:pyspark對Mysql數據庫進行讀寫
的4 寫入Mysql
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
if __name__ == '__main__':
# spark 初始化
sc = SparkContext(master='local', appName='sql')
spark = SQLContext(sc)
# mysql 配置(需要修改)
prop = {'user': 'root',
'password': 'rootroot',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
# 創建spark DataFrame
# 方式1:list轉spark DataFrame
l = [(1, 12), (2, 22)]
# 創建並指定列名
list_df = spark.createDataFrame(l, schema=['id', 'value'])
# 方式2:rdd轉spark DataFrame
rdd = sc.parallelize(l) # rdd
col_names = Row('id', 'value') # 列名
tmp = rdd.map(lambda x: col_names(*x)) # 設置列名
rdd_df = spark.createDataFrame(tmp)
# 方式3:pandas dataFrame 轉spark DataFrame
df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
pd_df = spark.createDataFrame(df)
# 寫入數據庫
pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
# 關閉spark會話
sc.stop()
效果如下:
注意點:
當數據庫無寫入的表時,這四種模式都會根據設定的表名稱自動創建表,無需在Mysql裡先建表。