iT邦幫忙

2021 iThome 鐵人賽

DAY 27
0
自我挑戰組

實驗室助理的技術文章自我整理系列 第 27

Python - 在 Windows 10 上使用 PySpark 連接 Mysql 資料庫參考筆記

Python - 在 Windows 10 上使用 PySpark 連接 Mysql 資料庫參考筆記

參考資料

說明

如題,當初會寫這篇參考筆記,主要是因為,當時正在解【Spark】Spark基础练习题(二)這篇文章上的題目時,碰到了(如上圖)這題,需要使用 spark 把資料寫入 Mysql 中,再讀取出來,在順便把搜尋到相關的教學資料,統整成一篇筆記,並補上一些圖片方便閱讀XD。

特此撰寫本篇文章作為紀錄文件,用以方便後續有需要的時候,可以快速的重複查閱,雖然後面比較沒有什麼機會再用到,但也算是一個還不錯的經驗。

Window 10 PySpark 安裝

  1. 先確定已安裝 Python (建議到 Python 官網下載安裝 64 bits https://www.python.org/downloads/windows/ )
  2. 安裝 Java https://www.java.com/ ,建議安裝到 C:\Java\ 下。若 Java 安裝路徑有空白,pyspark 執行會出現錯誤。
  3. 新增 JAVA_HOME 到環境變數,例如:
JAVA_HOME = C:\Java\jre1.8.0_241

  1. 在命令提示字元輸入
pip3 install pyspark
  1. 在命令提示字元輸入 pyspark 即可啟動
    但是在 Windows 上還需要手動下載 winutils.exe 讓 Hadoop 在 Windows 上正常運作
  2. https://github.com/steveloughran/winutils/ 下載對應版本的 winutils.exe (在 bin 資料夾內,例如: https://github.com/steveloughran/winutils/blob/master/hadoop-3.0.0/bin/winutils.exe)
  3. 將 winutils.exe 放到 C:\winutils\bin\ 下
  4. 新增 Windows 環境變數:
HADOOP_HOME = C:\winutils

接著還會遇到暫存資料夾 tmp\hive 權限的問題
10. 在你要開啟專案的硬碟的根目錄 (如 E:) 建立資料夾 \tmp\hive
(假如 ipynb 在 E:\abc\def\code.ipynb 內,就建立兩個資料夾在 E:\tmp\hive,一般此資料夾會自動在執行 pyspark 時建立,但權限會有問題,需手動修改)
11. 用 winutils.exe 改變該暫存資料夾的權限

%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
  1. 檢查該資料夾權限
%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive

應該要為 drwxrwxrwx

這樣應該就可以正常使用 pyspark 了

PySpark Mysql環境配置

pyspark連接Mysql是通過java實現的,所以需要下載連接Mysql的jar包。
下載地址


選擇下載Connector/J,然後選擇操作系統為Platform Independent,下載壓縮包到本地。

然後因為是直接通過 pip3 install pyspark 的方式安裝 PySpark,可以先透過以下程式碼查詢 PySpark路徑:(參考:PySpark 連線 MySQL 示例)

from pyspark import find_spark_home
print(find_spark_home._find_spark_home())

然後解壓文件,將其中的jar包mysql-connector-java-8.0.22.jar放入spark的安裝目錄下的jars資料夾

讀取 MySql

下方程式碼參考:PySpark 連接 MySQL 示例的Spark 代碼示例

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
    
    # 讀取表
    data = spark.read.jdbc(url=url, table='user', properties=prop)
    # 打印data數據類型
    print(type(data))
    # 展示數據
    data.show()
    # 關閉spark會話
    sc.stop()

注意點:

  1. prop參數需要根據實際情況修改,文中用戶名和密碼用xxx代替了,driver參數也可以不需要;
  2. url參數需要根據實際情況修改,格式為jdbc:mysql://主機:端口/數據庫;
  3. 通過調用方法read.jdbc進行讀取,返回的數據類型為spark DataFrame;

輸出畫面如下:

解決使用JDBC連接 MySql 時,時區跟編碼的錯誤

參考:使用JDBC連接MySql時出現...
在連接字符串後面加上?serverTimezone=UTC

其中UTC是統一標準世界時間。

完整的連接字符串示例:
jdbc:mysql://localhost:3306/test?serverTimezone=UTC

或者還有另一種選擇:
jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8,這個是解決中文亂碼輸入問題,當然也可以和上面的一起結合:jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC

寫入 MySql

下方程式碼參考:pyspark對Mysql數據庫進行讀寫
4 寫入Mysql

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'

    # 創建spark DataFrame
    # 方式1:list轉spark DataFrame
    l = [(1, 12), (2, 22)]
    # 創建並指定列名
    list_df = spark.createDataFrame(l, schema=['id', 'value']) 
    
    # 方式2:rdd轉spark DataFrame
    rdd = sc.parallelize(l)  # rdd
    col_names = Row('id', 'value')  # 列名
    tmp = rdd.map(lambda x: col_names(*x))  # 設置列名
    rdd_df = spark.createDataFrame(tmp)  
    
    # 方式3:pandas dataFrame 轉spark DataFrame
    df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
    pd_df = spark.createDataFrame(df)

    # 寫入數據庫
    pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
    # 關閉spark會話
    sc.stop()

效果如下:

注意點:

  1. prop和url參數同樣需要根據實際情況修改;
  2. 寫入數據庫要求的對像類型是spark DataFrame,提供了三種常見數據類型轉spark DataFrame的方法;
  3. 通過調用write.jdbc方法進行寫入,其中的model參數控制寫入數據的行為。


當數據庫無寫入的表時,這四種模式都會根據設定的表名稱自動創建表,無需在Mysql裡先建表。


上一篇
Python - PySparkPracticeQuestions - PySpark 練習題參考筆記
下一篇
Python - Video to Ascii 影片轉 Ascii 套件參考筆記
系列文
實驗室助理的技術文章自我整理30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言