今天開始會有兩天來介紹簡單的應用場景,會建置完整的 NiFi Data Pipeline 來讓大家知道過程中的流程。首先第一天,先來個簡單的場景:
假設有個 csv 檔案在 AWS S3,我們希望從 s3 下載下來經過簡單的處理,接著以 parquet 的方式儲存在 local 的路徑上。
DigiDB_digimonlist.csv
即可 ,且上傳到 s3 指定好的路徑做存放。我們下載完資料且上傳到 s3 之後,接下來就要先設想我們要執行的資料前處理,這裡先簡單列給各位一個範例,那各位讀者們可以再從這裡去做更多的延伸:
Stage
欄位轉換成數值先簡單以上這兩個處理就好,處理完接著儲存到 local 的某一個路徑上,再以 parquet 格式儲存。
這個範例大致上的 Flow 會長得如下圖:
接下來來帶各位來看一下每一個 Processor 的設定。
這邊記得指定好你的 Bucket, Region, Prefix 和 AWS Crendentials Provider service 的參數
設定好之後就會列出你指定的 Prefix folder 下的檔案。
接著透過 FetchS3Object 來取得真實的檔案內容。
這個 Processor 是可以讓我們透過 SQL 的方式來取得要的資料和轉換,先看下圖設定:
可以來看一下底下的 SQL:
select Number, Digimon,
CASE WHEN Stage='Baby' THEN '0'
WHEN Stage='In-Training' THEN '1'
WHEN Stage='Rookie' THEN '2'
WHEN Stage='Champion' THEN '3'
WHEN Stage='Ultimate' THEN '4'
WHEN Stage='Mega' THEN '5'
WHEN Stage='Ultra' THEN '6'
WHEN Stage='Armor' THEN '7'
end Stage, Type
from FLowfile
簡單來說,我從 FLowfile 取得我要的四個欄位,接著對於 Stage
這個欄位去做一個轉換。
然而,我下游的 Processor 這必須要用 statement 作為 connection,如此一來處理過後的資料才能流到下游 Processor。
這邊是我用來更新 filename
這個 attribute 的 value,因為下一個 PutFile 會預設去讀這個 attributes 去作為寫入檔案的命名,這邊以 result.parquet
。
透過這個 Processor 就可以指定寫入的路徑,這邊以 /tmp/data/
這個 folder 為例,注意必須事先有該 folder 存在,否則會寫入錯誤。
如此一來,就會將處理完的資料寫成 local 的 /tmp/data/result.parquet
即可完成。
最後進到 container 的 terminal,就可以看到確實有產生對應的 parquet file。
接著,我們透過 Processor 的 View data Provenance,就可以看到處理完後的 Content 的狀況:
以上是第一個小範例的操作,很簡單地5個 Processors 就可以達到我們想要的效果,明天會再利用這個 file 再做一下步地延伸與範例。