iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 17
0
AI & Data

AWS 數據處理與分析實戰系列 第 17

Day 17 Glue ETL Job 教學 - Part 4

  • 分享至 

  • xImage
  •  

創建完 ETL 的 Spark Job 後我們要加入資料處理的內容,在預設的程式碼中只能做到資料搬遷,那這次我們的目標是要找出每個 user 最常購買的前五名商品,這部分會使用 PySpark 的進行

接下來會以修改後的程式進行說明,以下是每個 user 購買數量前五名商品的 PySpark 程式碼

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  • 為 Glue Job 初始化需要使用的 Library,此部分在預設的程式碼中會自動產生
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as F
from pyspark.sql.window import Window
  • DynamicFrame 要將 Spark 的 DataFrame 的資料格式轉為 Glue 的 DynamicFrame
  • pyspark.sql.functions 與 pyspark.sql.window 則是之後再使用 pyspark SQL 時會使用到的 Library
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
  • Glue Job 初始化
## @type: DataSource
## @args: [database = "it_db", table_name = "order_products_prior", transformation_ctx = "datasource0"]
## @return: order_products_prior
## @inputs: []
order_products_prior = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "order_products_prior", transformation_ctx = "order_products_prior")
  • 第一個資料源的程式碼會由 Glue 自動產生,會對應到 Data Catalog 中的 DB 與 Table
## @type: DataSource
## @args: [database = "sampledata", table_name = "order", transformation_ctx = "order"]
## @return: order
## @inputs: []
order = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "order", transformation_ctx = "order")


## @type: DataSource
## @args: [database = "sampledata", table_name = "products", transformation_ctx = "products"]
## @return: products
## @inputs: []
products = glueContext.create_dynamic_frame.from_catalog(database = "it_db", table_name = "products", transformation_ctx = "products")
  • 這邊我們在多加兩個資料源因為我們需要 order_products_prior、order 與 products 這三張表一起 Join 的資料,程式部分可以複製第一個資料源的程式碼,並修改後面的 database、table_name、transformation_ctx 參數
## @type: Join
## @args: [keys1 = ["order_id"], keys2 = ["order_id"]]
## @return: join_order
## @inputs: [frame1 = order_products_prior, frame2 = order]
join_order = Join.apply(frame1 = order_products_prior, frame2 = order, keys1 = ["order_id"], keys2 = ["order_id"], transformation_ctx = "joindata")

## @type: Join
## @args: [keys1 = ["product_id"], keys2 = ["product_id"]]
## @return: join_products
## @inputs: [frame1 = join_order, frame2 = products]
join_products = Join.apply(frame1 = join_order, frame2 = products, keys1 = ["product_id"], keys2 = ["product_id"], transformation_ctx = "joindata_products").toDF()
  • Join 的 Function 可以從右上角的 Transform 直接匯入,Transform 有提供常見的 Function 可以使用,除了 Join 還有 Filter、SplitFields、DropNullFields 等等的 Function 可以使用
  • 需要注意的是 Transform 的 Function 要在 DynamicFrame 的格式下使用,如果已經像 join_products 一樣,已經透過最後面的 .toDF() 將 DynamicFrame 轉為 DataFrame,這個狀況下就無法再接著使用 Transform 的 Function
  • Join 的 Function 中 frame1、frame2 代表著要 Join 的兩張 Table,key1 代表 frame1 要用來 Join 的欄位,key2 也是同理

上一篇
Day 16 Glue ETL Job 教學 - Part 3
下一篇
Day 18 Glue ETL Job 教學 - Part 5
系列文
AWS 數據處理與分析實戰30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言