iT邦幫忙

2021 iThome 鐵人賽

DAY 26
0
自我挑戰組

實驗室助理的技術文章自我整理系列 第 26

Python - PySparkPracticeQuestions - PySpark 練習題參考筆記

Python - PySparkPracticeQuestions - PySpark 練習題參考筆記

參考資料

我自己的 Github:Eterna-E

專案來源連結:Eterna-E/PySparkPracticeQuestions

Spark 練習題目來源:
【Spark】Spark基础练习题(一)
【Spark】Spark基础练习题(二)

說明

如題,當初會寫這篇參考筆記,主要是因為,當時正要準備開始協助 IMAC 實驗室的工研院的產學合作案的專案開發,於是乎,就開始做 PySpark 的程式設計練習,然後 Spark 題目是參考使用學長推薦的這兩篇:【Spark】Spark基础练习题(一)【Spark】Spark基础练习题(二),但是因為這兩篇教學文章的題目解答,都是用 JAVA 語言寫的,沒有找到 PySpark 版本的練習,所以我只好自己用 Python 程式語言在寫一遍,邊寫邊對答案,然後又因為之前在練習題目的時候,每一題都是各自建新的檔案做練習,導致後面必須要一個一個點開看,所以就順便再重新統一格式,整理成一篇參考筆記拉XD。

特此撰寫本篇文章作為紀錄文件,用以方便後續有需要的時候,可以快速的重複查閱,雖然後面比較沒有什麼機會再用到,但也算是一個還不錯的經驗。

PySpark 基礎練習題(一)

Spark 基礎練習題目如下所示:

讀取文件的數據test.txt
一共有多少個不到 20 歲的人參加考試?
一共有多少個等於 20 歲的人參加考試?
一共有多少個大於20歲的人參加考試?
一起參加考試?
一共有多少個女生參加考試?
12班有多少人參加考試?
13班有多少人參加考試?
語文的平均成績是多少?
數學數學的平均成績是多少?
英語的平均成績是多少?
有多少人平均成績是多少?
12班平均成績是多少?
12班班平均總成績是多少?
12班女生平均總成績是多少?
13班平均成績是多少?
13班班平均總成績是多少?
13班女生平均總成績是多少?
全校成績最高分是多少?
12班語文成績最低分是多少?
13班數學最高成績是多少?
總成績大150分的12班的女生有幾個?
總成績大於150分,且數學大於70,且年齡大於19歲的學生的平均成績是多少?

參考答案如下所示:

# 題目:
# 1. 讀取文件的數據test.txt
# 2. 一共有多少個不到 20 歲的人參加考試?
# ans:2
from pyspark import SparkContext
from operator import add

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).map(lambda x: (x[1],1)).reduceByKey(add).count()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])<20).groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 3. 一共有多少個等於 20 歲的人參加考試?
# ans:2
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])==20).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])==20).groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])==20).groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 4. 一共有多少个大于20岁的人参加考试?
# ans:2
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])>20).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])>20).groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: int(x[2])>20).groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 5. 一共有多个男生参加考试?
# ans:4
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='男').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='男').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='男').groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 6. 一共有多少个女生参加考试?
# ans:4
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='女').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='女').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[3]=='女').groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 7. 12班有多少人参加考试?
# ans:3
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 8. 13班有多少人参加考试?
# ans:3
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').groupBy(lambda x:x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').groupBy(lambda x:x[1]).count()
print(numBs)
# 題目:
# 9. 语文科目的平均成绩是多少?

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).mean()
print(numBs)
# 題目:
# 10. 数学科目的平均成绩是多少?

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math').map(lambda x: int(x[5])).mean()
print(numBs)
# 題目:
# 11. 英语科目的平均成绩是多少?63.3333

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='english').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='english').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='english').map(lambda x: int(x[5])).mean()
print(numBs)
# 題目:
# 12. 每个人平均成绩是多少?
# (王英,73)
# (杨春,70)
# (宋江,60)
# (李逵,63)
# (吴用,50)
# (林冲,53)
# val every_socre: RDD[(String, Any)] = data.map(x=>x.split(" ")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum /t._2.size))


from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).map(lambda x: (x[1],int(x[5]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], int(sum(x[1])/len(x[1])) )).collect()
print(numBs)
# 題目:
# 13. 12班平均成绩是多少? 60.0

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12').map(lambda x: int(x[5])).mean()
print(numBs)
# 題目:
# // 14. 12班男生平均总成绩是多少?165.0
# // (宋江,180)
# // (吴用,150)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# 題目:
# // 15. 12班女生平均总成绩是多少?210.0
# // (杨春,210)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# 題目:
# 16. 13班平均成绩是多少?63.333333333333336

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13').map(lambda x: int(x[5])).mean()
print(numBs)
# 題目:
# // 17. 13班男生平均总成绩是多少?175.0
# //(李逵,190)
# //(林冲,160)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '男').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# 題目:
# // 18. 13班女生平均总成绩是多少?
# //(王英,220)
from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='13' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).map(lambda x: x[1]).mean()
print(numBs)
# 題目:
# // 19. 全校语文成绩最高分是多少?70
# var max1 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese")).map(x => x(5).toInt).max()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese').map(lambda x: int(x[5])).max()
print(numBs)
# 題目:
# // 20. 12班语文成绩最低分是多少?50
# var max2 = data.map(x => x.split(" ")).filter(x => x(4).equals("chinese") && x(0).equals("12")).map(x => x(5).toInt).min()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese' and x[0]=='12').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese' and x[0]=='12').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='chinese' and x[0]=='12').map(lambda x: int(x[5])).min()
print(numBs)
# 題目:
# // 21. 13班数学最高成绩是多少?80
# var max3 = data.map(x => x.split(" ")).filter(x => x(4).equals("math") && x(0).equals("13")).map(x => x(5).toInt).max()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math' and x[0]=='13').collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math' and x[0]=='13').map(lambda x: int(x[5])).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[4]=='math' and x[0]=='13').map(lambda x: int(x[5])).max()
print(numBs)
# 題目:
# // 22. 总成绩大于150分的12班的女生有几个?1
# //(杨春,210)
# val count12_gt150girl: Long = data.map(x=>x.split(" ")).filter(x=>x(0).equals("12") && x(3).equals("女")).map(x=>(x(1),x(5).toInt)).groupByKey().map(t=>(t._1,t._2.sum)).filter(x=>x._2>150).count()

from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
print(numBs)

numBs = logData.map(lambda s: s.split(' ')).filter(lambda x: x[0]=='12' and x[3] == '女').map(lambda x: (x[1],int(x[5]))).groupByKey().map(lambda x : (x[0], list(x[1]))).map(lambda x : (x[0], sum(x[1]))).filter(lambda x: x[1]>150).collect()
print(numBs)
# 題目:
# // 23. 总成绩大于150分,且数学大于等于70,且年龄大于等于19岁的学生的平均成绩是多少?
# // 过滤出总分大于150的,并求出平均成绩    (13,李逵,男,(60,1))               (13,李逵,男,(190,3))             总成绩大于150                (13,李逵,男,63)
# val com1: RDD[(String, Int)] = complex1.map(x=>(x._1,(x._2,1))).reduceByKey((a, b)=>(a._1+b._1,a._2+b._2)).filter(a=>(a._2._1>150)).map(t=>(t._1,t._2._1/t._2._2))


from pyspark import SparkContext

logFile = "test.txt"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .collect()
# print(data1)

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .map(lambda x: (x[0],(x[1],1)))\
#         .collect()
# print(data1)

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .map(lambda x: (x[0],(x[1],1)))\
#         .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
#         .collect()
# print(data1)

# data1 = logData.map(lambda s: s.split(' '))\
#         .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
#         .map(lambda x: (x[0],(x[1],1)))\
#         .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
#         .filter(lambda x: x[1][0]>150)\
#         .collect()
# print(data1)

data1 = logData.map(lambda s: s.split(' '))\
        .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
        .map(lambda x: (x[0],(x[1],1)))\
        .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
        .filter(lambda x: x[1][0]>150)\
        .map(lambda x: (x[0], int( x[1][0]/x[1][1] )))\
        .collect()
print(data1) # 过滤出总分大于150的,并求出平均成绩

# //过滤出 数学大于等于70,且年龄大于等于19岁的学生                filter方法返回一个boolean值 【数学成绩大于70并且年龄>=19】                                       为了将最后的数据集与com1做一个join,这里需要对返回值构造成com1格式的数据
# val com2: RDD[(String, Int)] = 
# complex2.filter(a=>{val line = a._1.split(",");
# line(4).equals("math") &&
#  a._2>=70 && line(2).toInt>=19})
# .map(a=>{val line2 = a._1.split(",");(line2(0)+","+line2(1)+","+line2(3),a._2.toInt)})
# //(12,杨春,女 , 70)
# //(13,王英,女 , 80)

data2 = logData.map(lambda s: s.split(' '))\
        .filter(lambda x: int(x[2])>=19 and x[4]=='math' and int(x[5])>=70)\
        .map(lambda x: (x[0]+','+x[1]+','+x[3] , int(x[5])))\
        .collect()
print(data2)

data1 = logData.map(lambda s: s.split(' '))\
        .map(lambda x: (x[0]+','+x[1]+','+x[3],int(x[5])))\
        .map(lambda x: (x[0],(x[1],1)))\
        .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
        .filter(lambda x: x[1][0]>150)\
        .map(lambda x: (x[0], int( x[1][0]/x[1][1] )))

data2 = logData.map(lambda s: s.split(' '))\
        .filter(lambda x: int(x[2])>=19 and x[4]=='math' and int(x[5])>=70)\
        .map(lambda x: (x[0]+','+x[1]+','+x[3] , int(x[5])))

print(data1.join(data2).collect())

# // 使用join函数聚合相同key组成的value元组
# // 再使用map函数格式化元素
# val result = com1.join(com2).map(a =>(a._1,a._2._1))
# //(12,杨春,女,70)
# //(13,王英,女,73)

print(data1.join(data2).map(lambda x: (x[0],x[1][0])).collect())

PySpark基礎練習題(二)

Spark 基礎練習題目如下所示:

1、創建一個1-10數組的RDD,將所有元素*2形成新的RDD

2、創建一個10-20數組的RDD,使用mapPartitions將所有元素*2形成新的RDD

3、創建一個元素為 1-5 的RDD,運用 flatMap創建一個新的 RDD,新的 RDD 為原 RDD 每個元素的 平方和三次方 來組成 1,1,4,8,9,27..

4、創建一個 4 個分區的 RDD數據為Array(10,20,30,40,50,60),使用glom將每個分區的數據放到一個數組

5、創建一個 RDD數據為Array(1, 3, 4, 20, 4, 5, 8),按照元素的奇偶性進行分組

6、創建一個 RDD(由字符串組成)Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"),過濾出一個新 RDD(包含“xiao”子串)

7、創建一個 RDD數據為1 to 10,請使用sample不放回抽樣

8、創建一個 RDD數據為1 to 10,請使用sample放回抽樣

9、創建一個 RDD數據為Array(10,10,2,5,3,5,3,6,9,1),對 RDD 中元素執行去重操作

10、創建一個分區數為5的 RDD,數據為0 to 100,之後使用coalesce再重新減少分區的數量至 2

11、創建一個分區數為5的 RDD,數據為0 to 100,之後使用repartition再重新減少分區的數量至 3

12、創建一個 RDD數據為1,3,4,10,4,6,9,20,30,16,請給RDD進行分別進行升序和降序排列

13、創建兩個RDD,分別為rdd1和rdd2數據分別為1 to 6和4 to 10,求並集

14、創建兩個RDD,分別為rdd1和rdd2數據分別為1 to 6和4 to 10,計算差集,兩個都算

15、創建兩個RDD,分別為rdd1和rdd2數據分別為1 to 6和4 to 10,計算交集

16、創建兩個RDD,分別為rdd1和rdd2數據分別為1 to 6和4 to 10,計算 2 個 RDD 的笛卡爾積

17、創建兩個RDD,分別為rdd1和rdd2數據分別為1 to 5和11 to 15,對兩個RDD拉鍊操作

18、創建一個RDD數據為List(("female",1),("male",5),("female",5),("male",2)),請計算出female和male的總數分別為多少

19、創建一個有兩個分區的 RDD數據為List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),取出每個分區相同key對應值的最大值,然後相加

20、 創建一個有兩個分區的 pairRDD數據為Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),根據 key 計算每種 key 的value的平均值

21、統計出每一個省份廣告被點擊次數的 TOP3,數據在access.log文件中
數據結構:時間戳,省份,城市,用戶,廣告 字段使用空格分割。
樣本如下:
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12

22、讀取本地文件words.txt,統計出每個單詞的個數,保存數據到 hdfs 上

23、讀取 people.json 數據的文件, 每行是一個 json 對象,進行解析輸出

24、保存一個 SequenceFile 文件,使用spark創建一個RDD數據為Array(("a", 1),("b", 2),("c", 3)),保存為SequenceFile格式的文件到hdfs上

25、讀取24題的SequenceFile 文件並輸出

26、讀寫 objectFile 文件,把 RDD 保存為objectFile,RDD數據為Array(("a", 1),("b", 2),("c", 3)),並進行讀取出來

27、使用內置累加器計算Accumulator.txt文件中空行的數量

28、使用Spark廣播變量
用戶表:
id name age gender(0|1)
001,劉向前,18,0
002,馮  劍,28,1
003,李志傑,38,0
004,郭  鵬,48,2
要求,輸出用戶信息,gender必須為男或者女,不能為0,1
使用廣播變量把Map("0" -> "女", "1" -> "男")設置為廣播變量,最終輸出格式為
001,劉向前,18,女
003,李志傑,38,女
002,馮  劍,28,男
004,郭  鵬,48,男

29、mysql創建一個數據庫bigdata0407,在此數據庫中創建一張表
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(32) NOT NULL COMMENT '用戶名稱',
  `birthday` date DEFAULT NULL COMMENT '生日',
  `sex` char(1) DEFAULT NULL COMMENT '性別',
  `address` varchar(256) DEFAULT NULL COMMENT '地址',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
數據依次是:姓名 生日 性別 省份
請使用spark將以上數據寫入mysql中,並讀取出來

30、在hbase中創建一個表student,有一個 message列族
create 'student', 'message'
scan 'student', {COLUMNS => 'message'}
給出以下數據,請使用spark將數據寫入到hbase中的student表中,並進行查詢出來
數據如下:
依次是:姓名 班級 性別 省份,對應表中的字段依次是:name,class,sex,province

參考答案如下所示:

# 1、创建一个1-10数组的RDD,将所有元素*2形成新的RDD

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
logData = sc.parallelize([x for x in range(10)]).cache()

data1 = logData.map(lambda x: x*2).collect()
print(data1)
# 2、创建一个10-20数组的RDD,使用mapPartitions将所有元素*2形成新的RDD

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(10,20)]
rdd = sc.parallelize(numList,len(numList))
def f(part):
    print ("====")
    for row in part:
        re = row*2
        yield re
    return re
data2 = rdd.mapPartitions(f).collect()
print(data2)
# 3、创建一个元素为 1-5 的RDD,
# 运用 flatMap创建一个新的 RDD,
# 新的 RDD 为原 RDD 每个元素的 平方和三次方 
# 来组成 1,1,4,8,9,27..

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(1,6)]
rdd = sc.parallelize(numList)

data3 = rdd.flatMap(lambda x: (x**2,x**3)).collect()
print(data3)
# 4、创建一个 4 个分区的 RDD数据为
# Array(10,20,30,40,50,60),
# 使用glom将每个分区的数据放到一个数组

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(10,70,10)]
rdd = sc.parallelize(numList).repartition(2)
print(rdd.collect())
print(rdd.getNumPartitions()) # partition size
data4 = rdd.glom().collect()
print(data4)
# 5、创建一个 RDD数据为Array(1, 3, 4, 20, 4, 5, 8),
# 按照元素的奇偶性进行分组

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [1, 3, 4, 20, 4, 5, 8]
rdd = sc.parallelize(numList)

data5 = rdd.groupBy(lambda x: x%2).collect()
print(data5)

data5 = rdd.groupBy(lambda x: x%2).map(lambda x: (x[0],list(x[1]))).collect()
print(data5)
# 6、创建一个 RDD(由字符串组成)
# Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"),
# 过滤出一个新 RDD(包含“xiao”子串)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
strList = ["xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"]
rdd = sc.parallelize(strList)

data6 = rdd.filter(lambda x: "xiao" in x).collect()
print(data6)
# 7、创建一个 RDD数据为1 to 10,请使用sample不放回抽样

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(1,11)]
rdd = sc.parallelize(numList)

data7 = rdd.sample(
    withReplacement=False, # 無放回抽樣
    fraction=0.5
).collect()
print(data7)

# 下方抽樣五次,參考:https://www.iteblog.com/archives/1395.html#sample

print('origin:',rdd.collect())
sampleList = [rdd.sample(withReplacement=False,fraction=0.5) for i in range(5)]
# print(sampleList)
for cnt,y in zip(range(len(sampleList)), sampleList):
    print('sample ' + str(cnt) +' :'+ str(y.collect()))

# withReplacement = True or False代表是否有放回。fraction = x, where x = .5,代表抽取百分比

# 參考 https://zhuanlan.zhihu.com/p/34901846
# 或 https://www.cnblogs.com/tianqizhi/p/12115707.html
# 搜尋sample
# 8、创建一个 RDD数据为1 to 10,请使用sample放回抽样
# val data8 = sc.makeRDD(1 to 10)
# val data8Result = data8.sample(true, 0.5, 1)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(1,11)]
rdd = sc.parallelize(numList)

data8 = rdd.sample(
    withReplacement=True, # 放回抽樣
    fraction=0.5
).collect()
print(data8)

# 下方抽樣五次,參考:https://www.iteblog.com/archives/1395.html#sample

print('origin:',rdd.collect())
sampleList = [rdd.sample(withReplacement=True,fraction=0.5) for i in range(5)]
# print(sampleList)
for cnt,y in zip(range(len(sampleList)), sampleList):
    print('sample ' + str(cnt) +' :'+ str(y.collect()))
# 9、创建一个 RDD数据为Array(10,10,2,5,3,5,3,6,9,1),
# 对 RDD 中元素执行去重操作
# val data9 = sc.makeRDD(Array(10, 10, 2, 5, 3, 5, 3, 6, 9, 1))
# val data9Result = data9.distinct()

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [10,10,2,5,3,5,3,6,9,1]
rdd = sc.parallelize(numList)

data9 = rdd.distinct().collect()
print(data9) # 未排序
data9 = sorted(data9) # 排序
print(data9)
# 10、创建一个分区数为5的 RDD,数据为0 to 100,
# 之后使用coalesce再重新减少分区的数量至 2
#  val data10 = sc.makeRDD(0 to 100, 5)
#  val data10Result = data10.coalesce(2)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(100)]
rdd = sc.parallelize(numList,5)

data10 = rdd.glom().collect()
print(data10)

coalesceData10 = rdd.coalesce(2).glom().collect()
print(coalesceData10)
# 11、创建一个分区数为5的 RDD,数据为0 to 100,
# 之后使用repartition再重新减少分区的数量至 3
#  val data11 = sc.makeRDD(0 to 100, 5)
#  val data11Result = data11.repartition(3)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [x for x in range(100)]
rdd = sc.parallelize(numList,5)

data11 = rdd.glom().collect()
print(data11)

data11Result = rdd.repartition(3).glom().collect()
print(data11Result)
# 12、创建一个 RDD数据为1,3,4,10,4,6,9,20,30,16,
# 请给RDD进行分别进行升序和降序排列
#  val data12 = sc.makeRDD(Array(1, 3, 4, 10, 4, 6, 9, 20, 30, 16))
#  val data12Result1 = data12.sortBy(x => x)
#  val data12Result2 = data12.sortBy(x => x, false)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
numList = [1,3,4,10,4,6,9,20,30,16]
rdd = sc.parallelize(numList)

data12Result1 = rdd.sortBy(lambda x: x).collect()
print(data12Result1) # 升序排列

data12Result2 = rdd.sortBy(lambda x: x,False).collect()
print(data12Result2) # 降序排列
# 13、创建两个RDD,
# 分别为rdd1和rdd2数据分别为1 to 6和4 to 10,
# 求并集
#  val data13_1 = sc.makeRDD(1 to 6)
#  val data13_2 = sc.makeRDD(4 to 10)
#  val data13Result = data13_1.union(data13_2)
# 并集 == 台灣的聯集
# 若A和B是集合,
# 則A和B聯集就是包含所有A的元素和所有B的元素,
# 而沒有其他元素的集合。

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data13Result = rdd1.union(rdd2).collect()
print(data13Result) # rdd1 和 rdd2 的聯集
# 14、创建两个RDD,
# 分别为rdd1和rdd2数据分别为1 to 6和4 to 10,
# 计算差集,两个都算
#  val data14_1 = sc.makeRDD(1 to 6)
#  val data14_2 = sc.makeRDD(4 to 10)
#  val data14Result_1 = data14_1.subtract(data14_2)
#  val data14Result_2 = data14_2.subtract(data14_1)

# 差集:若A和B是集合,
# 則A在B中的相對差集(簡稱差集)
# 是由所有屬於B但不屬於A的元素組成的集合。

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data14Result_1 = rdd1.subtract(rdd2).collect()
data14Result_2 = rdd2.subtract(rdd1).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("差集rdd1\\rdd2:", sorted(data14Result_1))
print("差集rdd1\\rdd2:", sorted(data14Result_2))
# 15、创建两个RDD,
# 分别为rdd1和rdd2数据分别为1 to 6和4 to 10,
# 计算交集
#  val data15_1 = sc.makeRDD(1 to 6)
#  val data15_2 = sc.makeRDD(4 to 10)
#  val data15Result_1 = data15_1.intersection(data15_2)

# 兩個集合A和B的交集是含有所有既屬於A又屬於B的元素,
# 而沒有其他元素的集合。

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data15Result = rdd1.intersection(rdd2).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("rdd1和rdd2的交集:", sorted(data15Result))
# 16、创建两个RDD,
# 分别为rdd1和rdd2数据分别为1 to 6和4 to 10,
# 计算 2 个 RDD 的笛卡尔积
#  val data16_1 = sc.makeRDD(1 to 6)
#  val data16_2 = sc.makeRDD(4 to 10)
#  val data16Result = data16_1.cartesian(data16_2)

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,7)])
rdd2 = sc.parallelize([x for x in range(4,11)])

data16Result = rdd1.cartesian(rdd2).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("rdd1和rdd2的笛卡兒積:", sorted(data16Result))
# 17、创建两个RDD,
# 分别为rdd1和rdd2数据分别为1 to 5和11 to 15,
# 对两个RDD拉链操作
#  val data17_1 = sc.makeRDD(1 to 5)
#  val data17_2 = sc.makeRDD(11 to 15)
#  val data17Result = data17_1.zip(data17_2)
# zip 可參考 https://www.iteblog.com/archives/1400.html#zip
# 或 http://lxw1234.com/archives/2015/07/350.htm

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([x for x in range(1,6)])
rdd2 = sc.parallelize([x for x in range(11,16)])

data17Result = rdd1.zip(rdd2).collect()

print("rdd1:",rdd1.collect())
print("rdd2:",rdd2.collect())
print("rdd1和rdd2的拉链操作:", sorted(data17Result))
# // 18、创建一个RDD数据为
# List(("female",1),("male",5),("female",5),("male",2))
# ,请计算出female和male的总数分别为多少
#  val data18 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2)))
#  val data18Result = data18.reduceByKey(_ + _)

from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Simple App")
rdd1 = sc.parallelize([("female", 1), ("male", 5), ("female", 5), ("male", 2)])

data18Result = rdd1.reduceByKey(add).collect()

print("rdd1:",rdd1.collect())
print("female和male的总数:", sorted(data18Result))
# // 19、创建一个有两个分区的 RDD数据为
# List(("a",3),("a",2),("c",4),
# ("b",3),("c",6),("c",8)),
# 取出每个分区相同key对应值的最大值,然后相加
#  /**
#   * (a,3),(a,2),(c,4)
#   * (b,3),(c,6),(c,8)
#   */
#  val data19 = 
# sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4),
#  ("b", 3), ("c", 6), ("c", 8)), 2)
#  data19.glom().collect().foreach(x => 
# println(x.mkString(",")))
#  val data19Result = data19.aggregateByKey(0)
# (math.max(_, _), _ + _)

# 參考:https://blog.csdn.net/zhuzuwei/article/details/104446388

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 3), ("a", 2), ("c", 4),("b", 3), ("c", 6), ("c", 8)]
rdd1 = sc.parallelize(dataList,2)
data19 = rdd1.glom().collect()
print(data19)
maxVal = (lambda x,y: max(x,y))
sumComb = (lambda x,y: x+y)
data19Result = rdd1.aggregateByKey(0,maxVal,sumComb).collect()

print("每个分区相同key对应值的最大值:", sorted(data19Result))
# 20、创建一个有两个分区的 pairRDD数据为
# Array(("a", 88), ("b", 95), ("a", 91), 
# ("b", 93), ("a", 95), ("b", 98)),
# 根据 key 计算每种 key 的value的平均值
#  val data20 = sc.makeRDD(Array(("a", 88), ("b", 95), ("a", 91),
#  ("b", 93), ("a", 95), ("b", 98)))
#  val data20Result = data20.groupByKey()
# .map(x => x._1 -> x._2.sum / x._2.size)
#  //或val data20Result = data20
# .map(x => (x._1, (x._2, 1)))
# .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
# .map(x => (x._1, x._2._1 / x._2._2))

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)]
rdd1 = sc.parallelize(dataList)

data20Result = rdd1.groupByKey().mapValues(list)\
    .map(lambda x: (x[0],format( sum(x[1])/len(x[1]), '.2f'))).collect()
print("rdd:",rdd1.collect())
print("计算每种 key 的value的平均值:", sorted(data20Result))
# 24、保存一个 SequenceFile 文件,
# 使用spark创建一个RDD数据为
# Array(("a", 1),("b", 2),("c", 3)),
# 保存为SequenceFile格式的文件到hdfs上
#  val data24 = 
# sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
#  data24.saveAsSequenceFile
# ("hdfs://mycluster:8020/20200407_SequenceFile")

# 參考:https://stackoverflow.com/questions/34491579/saving-rdd-as-sequence-file-in-pyspark

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 1),("b", 2),("c", 3)]
rdd1 = sc.parallelize(dataList)

data24Result = rdd1.saveAsSequenceFile("testSeq")
# 25、读取24题的SequenceFile 文件并输出
#  val data25: RDD[(String,Int)] = 
# sc.sequenceFile[String,Int]
# ("hdfs://mycluster:8020/20200407_SequenceFile/part-00000")

# 參考:https://blog.csdn.net/appleyuchi/article/details/81133270

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

data25Result = sc.sequenceFile("testSeq")

print(data25Result.values().collect())
# 26、读写 objectFile 文件,
# 把 RDD 保存为objectFile,
# RDD数据为Array(("a", 1),("b", 2),("c", 3)),
# 并进行读取出来
#  val data26_1 = 
# sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
#  data26_1.
# saveAsObjectFile
# ("output20200407/20200407_objectFile")
#  val data26_2 = 
# sc.objectFile
# ("output20200407/20200407_objectFile")

# 參考:https://spark.apache.org/docs/2.3.1/api/python/_modules/pyspark/rdd.html

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
dataList = [("a", 1), ("b", 2), ("c", 3)]
rdd1 = sc.parallelize(dataList)
# data26Result = rdd1.saveAsPickleFile("ObjectFile")

data26 = sc.pickleFile("ObjectFile").collect()

print(data26)
# /**
#   * 28、使用Spark广播变量
#   * 用户表:
#   * id name age gender(0|1)
#   * 001,刘向前,18,0
#   * 002,冯  剑,28,1
#   * 003,李志杰,38,0
#   * 004,郭  鹏,48,1
#   * 要求,输出用户信息,gender必须为男或者女,不能为0,1
#   * 使用广播变量把Map("0" -> "女", "1" -> "男")设置为广播变量,
#      最终输出格式为
#   * 001,刘向前,18,女
#   * 003,李志杰,38,女
#   * 002,冯  剑,28,男
#   * 004,郭  鹏,48,男
#   */
#  val data28 = sc.textFile("input20200407/user.txt")
#  val sex = sc.broadcast(Map("0" -> "女", "1" -> "男"))
#  data28.foreach { x => var datas = x.split(",");
#  println(datas(0) + "," + datas(1) + "," + datas(2) + "," + sex.value(datas(3))) }

# 參考:https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/

from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
data28 = sc.textFile("user.txt")
# print(data28.collect())
sex = sc.broadcast({"0": "女", "1": "男"})

data28result = data28.map(lambda x: x.split(','))\
    .map(lambda x: x[0] + "," + x[1] + "," + x[2] + "," + sex.value[x[3]])
# print(data28result.collect())

for data in data28result.collect():
    print(data)

#  /**
#   * 29、mysql创建一个数据库bigdata0407,在此数据库中创建一张表
#   * CREATE TABLE `user` (
#   * `id` int(11) NOT NULL AUTO_INCREMENT,
#   * `username` varchar(32) NOT NULL COMMENT '用户名称',
#   * `birthday` date DEFAULT NULL COMMENT '生日',
#   * `sex` char(1) DEFAULT NULL COMMENT '性别',
#   * `address` varchar(256) DEFAULT NULL COMMENT '地址',
#   * PRIMARY KEY (`id`)
#   * ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
#   * 数据如下:
#   * 依次是:姓名 生日 性别 省份
#   * 安荷 1998/2/7 女 江苏省
#   * 白秋 2000/3/7 女 天津市
#   * 雪莲 1998/6/7 女 湖北省
#   * 宾白 1999/7/3 男 河北省
#   * 宾实 2000/8/7 男 河北省
#   * 斌斌 1998/3/7 男 江苏省
#   * 请使用spark将以上数据写入mysql中,并读取出来。
#   */
#  val data29 = sc.textFile("input20200407/users.txt")
#  val driver = "com.mysql.jdbc.Driver"
#  val url = "jdbc:mysql://localhost:3306/bigdata0407"
#  val username = "root"
#  val password = "root"
# /**
#  * MySQL插入数据
#  */
#  data29.foreachPartition {
#    data =>
#      Class.forName(driver)
#      val connection = java.sql.DriverManager.getConnection(url, username, password)
#      val sql = "INSERT INTO `user` values (NULL,?,?,?,?)"
#      data.foreach {
#        tuples => {
#          val datas = tuples.split(" ")
#          val statement = connection.prepareStatement(sql)
#          statement.setString(1, datas(0))
#          statement.setString(2, datas(1))
#          statement.setString(3, datas(2))
#          statement.setString(4, datas(3))
#          statement.executeUpdate()
#          statement.close()
#        }
#      }
#      connection.close()
#  }
# /**
#  * MySQL查询数据
#   */
#  var sql = "select * from `user` where id between ? and ?"
#  val jdbcRDD = new JdbcRDD(sc,
#    () => {
#      Class.forName(driver)
#      java.sql.DriverManager.getConnection(url, username, password)
#    },
#    sql,
#    0,
#    44,
#    3,
#    result => {
#      println(s"id=${result.getInt(1)},username=${result.getString(2)}" +
#        s",birthday=${result.getDate(3)},sex=${result.getString(4)},address=${result.getString(5)}")
#    }
#  )
#  jdbcRDD.collect()

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == '__main__':
    # spark 初始化
    sc = SparkContext(master='local', appName='sql')
    spark = SQLContext(sc)
    # mysql 配置(需要修改)
    prop = {'user': 'root',
            'password': 'rootroot',
            'driver': 'com.mysql.cj.jdbc.Driver'}
    # database 地址(需要修改)
    url = 'jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC'
    data29 = sc.textFile("mysql_insert.txt")
    data29result = data29.map(lambda x: x.split(' '))
    # print(data29result.collect())
    username = data29.map(lambda x: x.split(' ')).map(lambda x: x[0]).collect()
    print(username)
    data_id = [x+1 for x in range(len(username))]
    print(data_id)
    birthday = data29.map(lambda x: x.split(' ')).map(lambda x: x[1]).collect()
    print(birthday)
    sex = data29.map(lambda x: x.split(' ')).map(lambda x: x[2]).collect()
    print(sex)
    address = data29.map(lambda x: x.split(' ')).map(lambda x: x[3]).collect()
    print(address)

    # 创建spark DataFrame
    # 方式3:pandas dataFrame 转spark DataFrame
    # 安荷 1998/2/7 女 江苏省
    df = pd.DataFrame({'id': data_id, 'username': username,
                       'birthday': birthday, 'sex': sex, 'address': address})
    pd_df = spark.createDataFrame(df)

    # 写入数据库
    pd_df.write.jdbc(url=url, table='user', mode='overwrite', properties=prop)

    # 读取表
    data = spark.read.jdbc(url=url, table='user', properties=prop)
    # 打印data数据类型
    print(type(data))
    # 展示数据
    data.show()

# ok

# 參考:https://zhuanlan.zhihu.com/p/136777424

上一篇
Python - 根據輸入的英文字母排列出有意義的單詞-參考筆記
下一篇
Python - 在 Windows 10 上使用 PySpark 連接 Mysql 資料庫參考筆記
系列文
實驗室助理的技術文章自我整理30

尚未有邦友留言

立即登入留言