設定完成後可以開始撰寫所需的 ETL 程式
- 設定完 Python Shell 後可以看到一個空白的編輯器,這裡我們可以自行撰寫所需的 ETL 程式,接下來會介紹如何使用 pandas 進行資料處理,產生出每個user 最喜歡購買的前五項商品清單
接下來會簡單介紹所使用的程式碼
import pandas as pd
- Import 所需要的 Library,這邊沒有 import 昨天所設定的 s3fs 是因為 s3fs 是透過 pandas 進行呼叫,所以 import 的部分會由 pandas 處理
order_products_prior_df = pd.read_csv('s3://it.sample.s3/SampleData/order_products_prior/order_products__prior.csv')
orders_df = pd.read_csv('s3://it.sample.s3/SampleData/order/orders.csv')
products_df = pd.read_csv('s3://it.sample.s3/SampleData/products/products.csv')
- 從 S3 抓取所需要的資料order_products__prior.csv、orders.csv、products.csv
order_products_prior_df = pd.merge(order_products_prior_df , products_df , on = 'product_id' , how= 'left')
order_products_prior_df = pd.merge(order_products_prior_df , orders_df ,on = 'order_id' , how = 'left')
- 透過 pandas 將三個資料源進行 Join,pandas 的 join 是使用 merge 這個 Function,裡面的第一與第二個參數,代表要 Join 的 Table,第三個參數 on 代表要 Join 的欄位,最後一個參數 how 代表要 Join 的方式
product_count_for_user = order_products_prior_df.groupby(['product_name','user_id']).size().reset_index()
- 以 product_name 與 user_id 兩個欄位進行 Group 計算出單一 User 購買的單一商品數量
product_count_for_user.columns = ['product name', 'user_id', 'frequency_count']
product_count_for_user.sort_values(['user_id', 'frequency_count'], ascending=[True, False], inplace=True)
- 針對 user_id 與 frequency_count 兩個欄位進行排序,並且 user_id 使用升冪排序,frequency_count 使用降冪排序,排序方式則是透過 ascending 這個參數決定,True 代表升冪;False 代表降冪
top_five = product_count_for_user.groupby(['user_id']).head(5)
- 透過 Group 只取每個 user_id 的前五筆資料
top_five['sort_id'] = top_five['frequency_count'].groupby(top_five['user_id']).rank(ascending=False,method='first')
- 新增一個排名欄位 sort_id,這個欄位會以 user_id 為基準進行降冪排名,升降冪排名是透過 ascending 參數進行控制,True 代表升冪;False 代表降冪
top_five.to_csv('s3://it.sample.s3/PythonShellOutput/top_five.csv', index=False)
完整程式碼
- 程式碼可以直接使用,但 read_csv 與 to_csv 的 S3 路徑需適當修改
import pandas as pd
order_products_prior_df = pd.read_csv('s3://it.sample.s3/SampleData/order_products_prior/order_products__prior.csv')
orders_df = pd.read_csv('s3://it.sample.s3/SampleData/order/orders.csv')
products_df = pd.read_csv('s3://it.sample.s3/SampleData/products/products.csv')
order_products_prior_df = pd.merge(order_products_prior_df , products_df , on = 'product_id' , how= 'left')
order_products_prior_df = pd.merge(order_products_prior_df , orders_df ,on = 'order_id' , how = 'left')
product_count_for_user = order_products_prior_df.groupby(['product_name','user_id']).size().reset_index()
product_count_for_user.columns = ['product name', 'user_id', 'frequency_count']
product_count_for_user.sort_values(['user_id', 'frequency_count'], ascending=[True, False], inplace=True)
top_five = product_count_for_user.groupby(['user_id']).head(5)
top_five['sort_id'] = top_five['frequency_count'].groupby(top_five['user_id']).rank(ascending=False,method='first')
top_five.to_csv('s3://it.sample.s3/PythonShellOutput/top_five.csv', index=False)