使用 Cloud Dataprep 做資料前處理
主旨:了解如何使用 Cloud Dataprep 做資料前處理
Cloud Dataprep
在訓練ML模型前,最一開始要做的事就是將資料準備好,資料準備的步驟中則包含了探索和清理資料等一連串的前處理工作。
今天我們將介紹到使用 Cloud Dataprep 來做前處理,圖形和視覺化的使用者互動,能幫助別人與自己更了解資料的特性,並對特徵工程有相當大的貢獻,而透過 GCP 便能將巨量的資料做到這些前處理的任務。
下面是兩種在 GCP 做前處理的方式,左邊是之前許多實作所採用的,今天的實作將會介紹到右邊透過 Cloud Dataprep 的方式。
所以 Cloud Dataprep 到底是蝦米碗糕呢?它可以讓你探索和轉變你的資料,只需要很小程度的程式碼。它也可以從不同的地方拿取資料包含 Google Cloud storage 和 BigQuery,你也可以上傳自己的資料。
在你探索和了解資料後,你可以使用 Dataprep 來計算資料轉變流(flows),這個轉變流和管道(pipeline)類似,下面是整個運作的示意:
在 Dataprep 中,flows 是透過一串的 recipes 所實現,recipes 是在 wranglers 函式庫中所建立的資料處理步驟。
而一般常用的資料處理任務,都有先建立好的 wranglers,例如說有:
- 資料提取 (data ingestion)
- 資料清理 (data cleansing)
- 聚合 (aggregations)
- 加入、連結 (joins, unions)
- 轉變 (transformations)
- 資料型態轉換 (type conversions)
若你使用 wranglers,Dataprep 將會把你的 flow 和其recipes 轉變成 dataflow pipeline,接著使用相同的介面,你可以將該 flow 作為一個 job 運行在 Dataflow 上並且監看整個過程,下圖是使用 Dataprep 運行一個 Dataflow job 的流程圖:
接著我們就來實際操作看看吧!
[GCP Lab實作-14]:在 Cloud Dataprep 中計算時間窗口的特徵
在這個實作中,我們將學會:
- 使用 Cloud Dataprep 建造一個新的 Flow
- 使用 recipes 建構和鏈結變形的步驟
- 運行 Cloud Dataprep jobs
今天的實作步驟會蠻多的,看倌們請做好心理準備XDDD
[Part 1]:創建 BigQuery 資料集用來儲存 Cloud Dataprep 輸出
-
登入 GCP,並在 storage 創建一個 bucket。
-
在 GCP 的 navigation bar 上,選擇 BigQuery:
- 點到你的 project ID,選擇 CREATE DATASET:
- 在 Dataset ID 填入名稱(這裡我用 taxi_cab_reporting),填好後按 Create dataset:
- 在 GCP 的 navigation bar 上,選擇 Dataprep:
- 接著就是一連串的確認、同意、接受使用條款:
[Part 2]:從 Google Cloud Storage 輸入 NYC Taxi 資料到 Dataprep Flow
- 在 Dataprep Flow 介面右邊,選擇 Create Flow:
- 填入 Flow Name 和 Description 後,按 Create:
- 接著選擇 Import & Add Dataset:
- Import Data 介面上選擇 GCS,點選鉛筆按鈕來選擇要引入資料的 GCP 路徑:
- 分別填入下面2個資料路徑(分別是2015年和2016年的資料),然後按 Go (做兩次):
gs://cloud-training/gcpml/c4/tlc_yellow_trips_2015_small.csv
gs://cloud-training/gcpml/c4/tlc_yellow_trips_2016_small.csv
- 可以看到我們加入的兩個資料集,接著按 Import & Add to Flow:
- 按下 tlc_yellow_trips_2015_small 按鈕後,選擇 Add New Recipe:
- 選擇 Edit Recipe 後,稍等一下就會顯示資料管理圖:
- 接著我們將 2015 和 2016 的資料做結合,點選上方的 Union 按鈕:
- 選擇 Add data 後,勾選 2016 年的資料按 Apply,然後選擇 Add to Recipe:
- 接著我們將日期和時間的行數接在一起作為新的時間戳記,先選到 Merge columns,在 Columns 內填入 pickup_day 和 pickup_time,Separator 內填入一個空格,最後將新的行取名為 pickup_daytime,按下 Add:
- 再來我們產生一個新的行數是最近一小時內平均的乘客數量,選到 Dates and times -> DATEFORMAT,在 Formula 內填入
DATEFORMAT(pickup_datetime,"yyyyMMddHH")
,行取名為 hour_pickup_daytime,按下Add,可以看到這個操作是 Recipe 3:
- 為了讓之前時間戳記的格式變成 DATETIME 資料格式,我們把後面的分和秒的位置加上0,選到 Merge columns,在 Columns 內填入 hour_pickup_daytime 和 '0000' ,將新的行取名為 pickup_hour,按下 Add,這個操作是 Recipe 4:
- 下一步,我們計算前一小時的平均乘客數,選到 Aggregation -> AVERAGE,在 Formula 內填入
AVERAGE(fare_amount)
,Sort rows by pickup_daytime
,Group rows by pickup_hour
,,按下 Add,這個操作是 Recipe 5:
- 再來我們計算最近3小時內車錢的滾動式平均,選到 Window -> ROLLINGAVERAGE,在 Formula 內填入
ROLLINGAVERAGE(average_fare_amount, 3, 0)
,Sort rows by -pickup_hour
,這邊負號代表遞減的順序,按下 Add,這個操作是 Recipe 6:
- 都設定好後,在 Recipe 上選 Run job:
- 點 Edit 修改路徑:
- 選擇 BigQuery 的 taxi_cab_reporting 後,按 Create a new table:
- Table 的設定中,填入名字(tlc_yellow_trips_reporting),選擇 Drop the table every run,按下 Update:
- 可以看到 Settings 那邊改變了,按下 Run job:
- 若要觀察 job,可以按下右邊的 View Dataflow job:
- Job detail 面板:
- 產生資料的過程我們也可以使用 BigQuery 來搶先看一下:
#standardSQL
SELECT
pickup_hour,
FORMAT("$%.2f",ROUND(average_3hr_rolling_fare,2)) AS avg_recent_fare,
ROUND(average_trip_distance,2) AS average_trip_distance_miles,
FORMAT("%'d",sum_passenger_count) AS total_passengers_by_hour
FROM
`asl-ml-immersion.demo.nyc_taxi_reporting`
ORDER BY
pickup_hour DESC;
- Job 完成後,可以看到狀態變為 Completed/Succeeded:
呼~到這邊總算做完了這次的實作,透過這些步驟你使用 Cloud Dataprep UI 建立了一個資料轉變的 pipeline。
今天介紹了Cloud Dataprep,明天我們將介紹什麼是 “特徵組合”。