AI 跟 ML 主要是透過演算法,並使用大量的資料
進行訓練模型,訓練完的資料模型可以在當又有新近來的資料可以透過該模型進行預測.所以在要做 AI 跟 ML 之前必須先收集大量的資料並透過 ETL 整理出需要訓練的資料.
而 python 提供非常多 AI、ML 的模組可以呼叫運算,但運到大量的資料可能會遇到資料分散在不同機器上,而且又有分 batch 跟 streaming 的資料,這時候如果只靠一台機器運算可能應付不來,所以 spark 可以幫助 python 程式達到分散式運算.而 spark 需要學的東西非常多,先 run 個 sample 玩一下,計算文章出現的字,每個字個別出現幾次.
下載 apache spark 並將 spark-2.4.4-bin-hadoop2.7.tgz
解壓縮.
啟動 pyspark
spark-2.4.4-bin-hadoop2.7/bin > ./pyspark
Python 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016, 12:39:47)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
19/10/09 18:07:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Python version 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016 12:39:47)
SparkSession available as 'spark'.
>>>
建立一個檔案,內容隨便找篇文章貼上.然後利用 pyspark 統計文章內出現字的數量.
> cat wordcount.txt
This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python.
......
使用 spark context (sc) 提供的 api text_file
讀取一個 local file,會回傳一個 RDD 物件(彈性分散式資料集),RDD 在 spark 是很重要的東西,spark 將外部資料讀進來都會轉成 RDD 做操作.另外除了 RDD 還有 DataFrame 和 DataSet 資料結構可以運用.
>>> text_file = sc.textFile('/Volumes/Transcend/spark/testFile/wordcount.txt')
>>> type(text_file)
<class 'pyspark.rdd.RDD'>
接著透過 RDD 的 api flatMap
把每一行資料用 split
分隔出每個字,然後把出現的每個字用 tuple 型態存取,第一個放該字第二個元素給 1 表示出現一次,接著用 reduceByKey
就是說遇到相同的字的話就把第二個元素(出現字數)做加總(a+b)的動作,接著把結果用 saveAsTextFile
output 成檔案.可以看到 spark 幫忙把這程式分成了 2 個 task 跑.
>>> text_file.flatMap(lambda line: line.split(' ')).map(lambda word: (word,1)).reduceByKey(lambda a,b: a + b).saveAsTextFile('file:///Volumes/Transcend/spark/testFile/wordcount_result')
[Stage 2:> (0 + 2) / 2]/Volumes/Transcend/spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:60: UserWarning: Please install psutil to have better support with spilling
/Volumes/Transcend/spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:60: UserWarning: Please install psutil to have better support with spilling
可以看到產生的檔案會在 wordcount_result 目錄底下,有三個檔案,但,只有 part-
開頭的是資料內容,由於上面可以看到 spark 分了 2 個 task 所以 output 就會有 2 個檔案.
> ll wordcount_result
total 512
-rwxrwxrwx 1 daniel staff 0B Oct 9 18:35 _SUCCESS
-rwxrwxrwx 1 daniel staff 915B Oct 9 18:35 part-00000
-rwxrwxrwx 1 daniel staff 753B Oct 9 18:35 part-00001
看一下 pyspark 統計文章 word 的數量,tuple 第一個元素是出現的字,第二個是出現的次數(python string 以 u 開頭表示後面字串是 unicode).
> cat wordcount_result/part-0000*
(u'', 3)
(u'detailed', 1)
(u'reference', 1)
(u'is', 2)
(u'Distributed', 1)
(u'we', 2)
(u'Note', 1)
(u'download', 2)
(u'through', 1)
......
也可以用下面方式讀取檔案得到 DataFrame 來對檔案內容做操作.
>>> text_file = spark.read.text('/Volumes/Transcend/spark/testFile/wordcount.txt')
>>> type(text_file)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> text_file.printSchema()
root
|-- value: string (nullable = true)
>>> text_file.count()
6
>>> text_file.show()
+--------------------+
| value|
+--------------------+
|This tutorial pro...|
| |
|To follow along w...|
| |
|Note that, before...|
| |
+--------------------+
spark 還可以跑在 Hadoop 的 yarn 上面去執行,可以利用多台機器的資源做運算.
鐵人賽終於來到最後一天了,這陣子了解了不少 python 的東西,像 Pandas、Numpy、airflow、crawler、pyspark 之類的,真的蠻有趣的.但其實 python 的東西真的太多了,而且有越來越紅的趨勢.雖然工作上不一定會用到,但還是非常值得去學習.