iT邦幫忙

2021 iThome 鐵人賽

DAY 26
0
AI & Data

Apache NiFi - 讓你輕鬆設計 Data Pipeline系列 第 26

Day26 NiFi 場景應用範例 (一)

今天開始會有兩天來介紹簡單的應用場景,會建置完整的 NiFi Data Pipeline 來讓大家知道過程中的流程。首先第一天,先來個簡單的場景:

假設有個 csv 檔案在 AWS S3,我們希望從 s3 下載下來經過簡單的處理,接著以 parquet 的方式儲存在 local 的路徑上。

事前準備

  1. 我們先下載一個 kaggle - Digimon Database: A database of Digimon and their moves from Digimon Story CyberSleuth 的資料,只要 DigiDB_digimonlist.csv 即可 ,且上傳到 s3 指定好的路徑做存放。
  2. 設定好 AWS Credential 的 Controller Service,好讓後續的 Processor 可以做使用,詳細設定參考 Day9 NiFi - Controller Service

What is data preprocessing about example data?

我們下載完資料且上傳到 s3 之後,接下來就要先設想我們要執行的資料前處理,這裡先簡單列給各位一個範例,那各位讀者們可以再從這裡去做更多的延伸:

  1. Stage 欄位轉換成數值
  2. 只保留 Number, Digimon, Stage, Type 四個欄位

先簡單以上這兩個處理就好,處理完接著儲存到 local 的某一個路徑上,再以 parquet 格式儲存。

How to build?

這個範例大致上的 Flow 會長得如下圖:

接下來來帶各位來看一下每一個 Processor 的設定。

ListS3


這邊記得指定好你的 Bucket, Region, PrefixAWS Crendentials Provider service 的參數

設定好之後就會列出你指定的 Prefix folder 下的檔案。

FetchS3Object


接著透過 FetchS3Object 來取得真實的檔案內容。

QueryRecord

這個 Processor 是可以讓我們透過 SQL 的方式來取得要的資料和轉換,先看下圖設定:

  • Record Reader: 代表要用什麼格式讀取,這邊我選擇用 CSVReader。
  • Record Writer: 代表從該 Processor 流出去的 Content 會是什麼格式,這邊依據場景假設選用 ParquetRecordSerWritter。
  • statement: 這邊是我新加入的 parameters,就是我要處理的 SQL

可以來看一下底下的 SQL:

select Number, Digimon,
    CASE WHEN Stage='Baby' THEN '0'
         WHEN Stage='In-Training' THEN '1'
         WHEN Stage='Rookie' THEN '2'
         WHEN Stage='Champion' THEN '3'
         WHEN Stage='Ultimate' THEN '4'
         WHEN Stage='Mega' THEN '5'
         WHEN Stage='Ultra' THEN '6'
         WHEN Stage='Armor' THEN '7'
    end Stage, Type
from FLowfile

簡單來說,我從 FLowfile 取得我要的四個欄位,接著對於 Stage 這個欄位去做一個轉換。

然而,我下游的 Processor 這必須要用 statement 作為 connection,如此一來處理過後的資料才能流到下游 Processor。

UpdateAttribute


這邊是我用來更新 filename 這個 attribute 的 value,因為下一個 PutFile 會預設去讀這個 attributes 去作為寫入檔案的命名,這邊以 result.parquet

PutFile


透過這個 Processor 就可以指定寫入的路徑,這邊以 /tmp/data/ 這個 folder 為例,注意必須事先有該 folder 存在,否則會寫入錯誤。

如此一來,就會將處理完的資料寫成 local 的 /tmp/data/result.parquet 即可完成。

Running Result

最後進到 container 的 terminal,就可以看到確實有產生對應的 parquet file。

接著,我們透過 Processor 的 View data Provenance,就可以看到處理完後的 Content 的狀況:

小總結

以上是第一個小範例的操作,很簡單地5個 Processors 就可以達到我們想要的效果,明天會再利用這個 file 再做一下步地延伸與範例。

Reference


上一篇
Day25 NiFi - 第三方程式執行
下一篇
Day27 NiFi 場景應用範例 (二)
系列文
Apache NiFi - 讓你輕鬆設計 Data Pipeline30

尚未有邦友留言

立即登入留言