Continuing the code walkthrough from yesterday.
data_cnt = join_products.groupBy("product_id", "product_name", "user_id").count()
data_rank = data_cnt.withColumn("row_num", F.row_number().over(Window.partitionBy("user_id").orderBy(data_cnt["count"].desc())))
data_top_five = data_rank.filter(data_rank.row_num <= 5)
top_five_list = data_top_five.select("product_name", "user_id", "count", "row_num").orderBy("user_id", "row_num", ascending=[1, 1])
- This part groups and ranks the data with Spark SQL (the DataFrame API)
- data_cnt: groups by the three columns "product_id", "product_name", and "user_id" to count how many times each user purchased each product
- data_rank: ranks each user's products by purchase count, partitioned by user_id (see the sketch after this list)
- data_top_five: keeps only the top five most-purchased products for each user
- top_five_list: selects only the four columns "product_name", "user_id", "count", and "row_num" and sorts by user and rank
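The window ranking is easiest to see on a few rows of toy data. The sketch below is a minimal, standalone example (hypothetical values, a plain SparkSession rather than the Glue job) that reproduces the same row_number() logic in isolation:

# Minimal sketch of the ranking step on toy data (hypothetical values).
# It shows how row_number() assigns 1..N per user, ordered by purchase count.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

toy = spark.createDataFrame(
    [(1, "Banana", 10), (1, "Milk", 7), (1, "Eggs", 7), (2, "Milk", 3)],
    ["user_id", "product_name", "count"],
)

w = Window.partitionBy("user_id").orderBy(F.col("count").desc())
toy.withColumn("row_num", F.row_number().over(w)).show()
# Ties (Milk vs. Eggs for user 1) still receive distinct row numbers because
# row_number() is strictly sequential; rank() or dense_rank() would let ties
# share a rank, which changes what "top five" means when counts are equal.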
combi_data = top_five_list.coalesce(1)
top_five_DyF = DynamicFrame.fromDF(combi_data, glueContext, "top_five_DyF")
- combi_data: because Spark computes in a distributed fashion, the result would otherwise be written out as multiple files, so coalesce merges the results computed on each node into a single partition (a short sketch follows this list)
- top_five_DyF: finally converts the DataFrame back to a DynamicFrame, because the next step writes the data to S3 with Glue's own functions
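To see what coalesce(1) actually changes, here is a small Glue-independent sketch (toy data, plain SparkSession); only the partition count matters here:

# Sketch: coalesce(1) merges the existing partitions into one, so the write
# step produces a single output file instead of one file per partition.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 1000, numPartitions=8)

print(df.rdd.getNumPartitions())              # 8 partitions before coalescing
print(df.coalesce(1).rdd.getNumPartitions())  # 1 partition afterwards
# coalesce() narrows partitions without a full shuffle; repartition(1) gives
# the same single file but pays for a shuffle, so coalesce is the usual choice
# when you only want fewer output files.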
## @type: ApplyMapping
## @args: [mapping = [("product_name", "long", "product_name", "long"), ("user_id", "long", "user_id", "long"), ("count", "bigint", "count", "bigint"), ("row_num", "bigint", "row_num", "bigint")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = top_five_DyF, mappings = [("product_name", "long", "product_name", "long"), ("user_id", "long", "user_id", "long"), ("count", "bigint", "count", "bigint"), ("row_num", "bigint", "row_num", "bigint")], transformation_ctx = "applymapping1")
- Defines which columns will be written to S3 and the data type of each column
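ApplyMapping can also rename or recast columns in the same pass. The following is only a hedged sketch that assumes the same Glue job context as above (glueContext and top_five_DyF already defined); the target names purchase_count and rank are hypothetical:

# Hedged sketch (assumes it runs inside the Glue job above). Each mapping tuple
# is (source_field, source_type, target_field, target_type), so a rename and a
# type declaration happen in one step.
from awsglue.transforms import ApplyMapping

renamed = ApplyMapping.apply(
    frame=top_five_DyF,
    mappings=[
        ("product_name", "string", "product_name", "string"),
        ("user_id", "long", "user_id", "long"),
        ("count", "bigint", "purchase_count", "bigint"),  # hypothetical rename
        ("row_num", "int", "rank", "int"),                # hypothetical rename
    ],
    transformation_ctx="renamed_mapping",
)
renamed.printSchema()  # confirm the renamed fields and their types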
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://it.sample.s3/it_spark_job"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://it.sample.s3/it_spark_job"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
- glueContext.write_dynamic_frame: writes the result to the specified S3 location or database
- connection_options: sets the S3 path to write to
- format: sets the output file format (a Parquet variation is sketched below)
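For comparison, the same sink call can emit other formats. A hedged sketch (same job context assumed, placeholder bucket path) that writes Parquet partitioned by user_id instead of a single CSV:

# Hedged sketch: same write_dynamic_frame.from_options call, but with Parquet
# output partitioned by user_id. Replace the placeholder path with your bucket.
datasink_parquet = glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={
        "path": "s3://your-bucket/it_spark_job_parquet",   # placeholder path
        "partitionKeys": ["user_id"],
    },
    format="parquet",
    transformation_ctx="datasink_parquet",
)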
Complete code
- The code can be copied and pasted as-is, but the source database and table names need to be adjusted, and the S3 path in datasink2 must be changed as well (a parameterization sketch follows the listing)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as F
from pyspark.sql.window import Window
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "it_db", table_name = "order_products_prior", transformation_ctx = "datasource0"]
## @return: order_products_prior
## @inputs: []
order_products_prior = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "order_products_prior", transformation_ctx = "order_products_prior")
## @type: DataSource
## @args: [database = "sampledata", table_name = "order", transformation_ctx = "order"]
## @return: order
## @inputs: []
order = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "order", transformation_ctx = "order")
## @type: DataSource
## @args: [database = "sampledata", table_name = "order", transformation_ctx = "order"]
## @return: order
## @inputs: []
products = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "products", transformation_ctx = "products")
## @type: Join
## @args: [keys1 = ["order_id"], keys2 = ["order_id"]]
## @return: join_order
## @inputs: [frame1 = order_products_prior, frame2 = order]
join_order = Join.apply(frame1 = order_products_prior, frame2 = order, keys1 = ["order_id"], keys2 = ["order_id"], transformation_ctx = "joindata")
## @type: Join
## @args: [keys1 = ["order_id"], keys2 = ["order_id"]]
## @return: joindata
## @inputs: [frame1 = order_products_prior, frame2 = order]
join_products = Join.apply(frame1 = join_order, frame2 = products, keys1 = ["product_id"], keys2 = ["product_id"], transformation_ctx = "joindata_products").toDF()
data_cnt = join_products.groupBy("product_id", "product_name", "user_id").count()
data_rank = data_cnt.withColumn("row_num", F.row_number().over(Window.partitionBy("user_id").orderBy(data_cnt["count"].desc())))
data_top_five = data_rank.filter(data_rank.row_num <= 5)
top_five_list = data_top_five.select("product_name", "user_id", "count", "row_num").orderBy("user_id", "row_num", ascending=[1, 1])
combi_data = top_five_list.coalesce(1)
top_five_DyF = DynamicFrame.fromDF(combi_data, glueContext, "top_five_DyF")
## @type: ApplyMapping
## @args: [mapping = [("order_id", "long", "order_id", "long"), ("product_name", "long", "product_name", "long"), ("add_to_cart_order", "long", "add_to_cart_order", "long"), ("reordered", "long", "reordered", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = top_five_DyF, mappings = [("product_name", "long", "product_name", "long"), ("user_id", "long", "user_id", "long"), ("count", "bigint", "count", "bigint"), ("row_num", "bigint", "row_num", "bigint")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://it.sample.s3/it_spark_job"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://it.sample.s3/it_spark_job"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
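If you prefer not to edit the script by hand for every environment, one option is to pass the names in as Glue job parameters. This is only a hedged sketch; SOURCE_DATABASE and OUTPUT_PATH are hypothetical parameter names you would define on the job yourself:

# Hedged sketch: read the database name and output path from job parameters
# (hypothetical names) instead of hardcoding them in the script.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME", "SOURCE_DATABASE", "OUTPUT_PATH"])
database_name = args["SOURCE_DATABASE"]   # e.g. it_db
output_path = args["OUTPUT_PATH"]         # e.g. s3://your-bucket/it_spark_job

# These values would then replace the literals in create_dynamic_frame.from_catalog
# and in the connection_options "path" of the sink above.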