今天要帶大家做另外一個簡單的場境應用,我們繼續沿用昨天所處理的 parquet File 來做今天的小實作,大致上今天要實作的內容如下:
讀取 local 端的 parquet file,並且依據 Stage 欄位的值,只選擇值 Armor、Champion 和 Mega 的資料送到 AWS SQS。
這邊的事前準備就是記得在 AWS 建立一個要來用的 SQS,並且複製 SQS 的 URL,實際的畫面如下:
這次的範例的 data pipeline 大致會長的如下圖:
這個 Processor 是可以讓我們讀取 Local 端的某一個 Folder 下的檔案,來看一下如何設定:
這邊以我的範例來說,我是將前一天實作完的檔案暫存到 /tmp/data
這個路徑下,所以只要在 Input Directory
這個 Property 設定好,他就會將底下的檔案讀上來做使用。
因為前一個 Processor 只是將檔案讀取變成一個 FlowFiles 而已,尚未將裡面的資料取出來,所以我們可以透過該 Processor 做到這件事情:
原先的檔案為 Parquet 格式,所以我們以 ParquetReader
的方式來做讀取,並且以一筆 Record為單位,接著再以 JsonRecordSetWriter
來轉換成 Json 格式來給下游 Processor 做處理,因為後續我們需要透過 Json 格式來做欄位的判斷。
因此我們可以看到經過這個 Processor 的 Content,都會轉換成 Json 格式,且一筆為單位,內容如下:
這個 Processor 是可以讓我們去解析將原先 Content 的某一個欄位轉換成 Attribute,所以來看一下設定:
flowfile-attribute
$.Stage
代表他會去解析進來的 FlowFiles 中的 Stage 這個欄位,並且帶到名為 Stage 的 Attribute。所以經過該 Processor 的 Flowfiles,我們會發現都會多帶一個名為 Stage 的 Attribute:
有了 Stage 這個 Attribute 之後,原則上該 Attribute 的值會跟 Content 內的 Stage 這個欄位值相同,接著就可以做過濾的動作,所以就會用到 RouteOnAttribute 這個 Processor。
該 Processor 是讓我們可依據 FlowFiles 的狀況動代增加下游的 Connection 的 Route,我們先看一下原先的 Processor 設定只會有 unmatched
的 Route:
但是我們可以在 Property 增加更多的條件,如下設定:
這邊我們加入了 7 個 Property,分別對應的是:
0, Baby
1, In-Training
2, Rookie
3, Champion
4, Ultimate
5, Mega
6, Ultra
7, Armor
一但設定完成之後,我們會發現 Route 會多出這些剛剛設定的 Property Name:
接著我們在連接下游 Processor 的時候,就可以選定符合哪些條件的 Route 可以連接到下游的 Processor。
以接下來的 PutSQS 的範例為例,我們只需要Armor、Champion 和 Mega 的資料送到 AWS SQS 即可其他的都不要,所以既勾選對應的 Route 即可。
接著就可以看到 RouteOnAttribute 和 PutSQS 之間就只會有這三個的 Connection:
然後其他不會用到的我們先傳送到 Wait Processor。
一切準備就緒之後,接下來就可以設定 PutSQS 這個 Processor,還記得一開始要你們先事前建立好 SQS,這邊就會用到了:
上述的設定完成,就可以將符合條件的資料送到 AWS SQS 了。
Wait 這個 Processor 其實就是一個暫停
的 Processor。通常會是什麼時候會用他呢?
上述就是我帶給各位的第二個範例,這些看似簡單的 Processor,其實都是很常用的,所以希望透過這兩天的小實作那大家可以對於 NiFi 在做 Data Pipeline 的設定與流程可以有更多的體悟與理解。
那明天會介紹一個國外企業是如何使用 Apache NiFi 的小案例分享,以及他的架構是如何做的,對於未來要導入該 Tool 的企業或許有一定的幫助。