前面已經講完 Processor 和 Connection 兩個重要的 Componenet,我們就可以透過這兩個去建立基本的 Data Pipeline。但有時候我們會建立很多 Data Pipeline 在 NiFi,有些流程甚至是重疊的,這樣會覺得不太符合效益,所以就要透過今天的主角 - Processor Group 來做處理。
當 Date Pipeline 複雜且多樣化時,有些小流程可能會有重複的建置,此時可以透過 Processor Group(接下來簡稱 PG) 來做處理,簡單來說就是變成 Module,然後整合到 Data Pipeline 來做一個重複使用的功能。
PG 除了有作為 Module 的功能之外,他也有幾個額外重點,其實之前都有提到他,這邊再稍微列一下給各位:
從上面的 gif 檔可以看到如何建立 PG,而當我們移動到 PG 內部時,可以注意到左下角也會改變成我們目前所屬的階層(很像 Folder 的概念)。
這邊我來帶一個場景應用,一樣拿kaggle titanic 來做一個應用,這裡就做基本一些簡單的前處理就好,這裡示範的前處理步驟如下:
1. 將 Sex 欄位轉換成數值
2. 依據 Embarked 欄位分成 3 個 connection 出來
大致上的 Pipeline 會長如下圖:
其中中間的 PG (titanic_preprocess) 內部會長如下圖:
接下來先針對 PG 內部來做講解:
input
和 output
這兩個 port,如此一來才有辦法將 FlowFiles 從上游流入,且再從下游流出。所以如同紅框所示即可知道如何加入 input
和 output
:
而在 PG 內可以同時存在著多的 input
和 output
,再由外部建立 connection 時來決定要將 FlowFiles 流入到 PG 的哪一個 port。
其中CSVReader
和 CSVRecordWriter
是屬於 Controller Service
的一種,所以當選完指定類型時,還要點選右手邊的箭頭,此時會看到如下畫面:
接著按下 閃電
的符號,就能正式啟用,為何要這樣做呢?詳細原因在下一節 Controller Service 會再介紹到。
Sex
這個欄位原本的 female 轉成 0, male 轉成 1。一樣當中有幾個參數要去做設定:
這裡會看到我額外加入了一個 /Sex
這個 property,其中他的value 是:
${field.Sex:equals('female'):ifElse(0,1)}
這個語法是 NiFi 自己的語法,叫做 NiFi Expression Language
,是可以幫助我們針對 Flowfiles 的 attributes 和 content 作處理的。
詳細的寫法也會在後續介紹,這裡簡單的意思是我們想要取得 Sex
的欄位,且若底下有 female
這個 value 時轉成 0,否則轉成 1,接著再寫回 Sex
欄位作替代,所以 key 才會是 /Sex
(斜線是這個 Processor 的規定,這樣才有辦法找到對應的欄位)。
[{"PassengerId":889,"Survived":0,"Pclass":3,"Name":"Johnston, Miss. Catherine Helen \"Carrie\"","Sex":"1","Age":null,"SibSp":1,"Parch":2,"Ticket":"W./C. 6607","Fare":23.45,"Cabin":null,"Embarked":"S"}]
那我們就可以透過上圖中的 embark
將特定的 key 轉成 FlowFiles 的 attributes:
$[0].Embarked
如此一來經過該 Processor 之後的 FlowFiles,都會多帶一個 embark
的 attributes,而 value 就會是 content 對應到的 value,以這個例子就會是 S。
embark_c: ${embark:equals("C")}
embark_q: ${embark:equals("Q")}
embark_s: ${embark:equals("S")}
如此一來,這個 Processor 就會產生 embark_c
, embark_q
和 embark_s
這三個 Connection,因此我們再回來看一次 PG 的圖下方橘框:
就可看到這三個 Connection 以及對接對應的 Port。
input port
。所以當我們在建立連線時,就可以看到如下圖所示的狀況,我們可在右手邊決定要選擇的 input port
。
output port
。一樣從下圖左方,我們可選擇要與下游 Processor 對接的 port 為何,藉此建立出對應的 Connection。
透過上面的描述,我們可以得知 PG 要如何建立,以及一些細節的設定,還有如何與 Data Pipeline 來做一個整合,像是 input
和 output
這些 port 的設定與對接,雖然看起來好像操作很簡單,但這是在 NiFi 中一個非常重要的操作與概念,希望讀者們若有使用的話一定要將此學習起來,這會幫助你更輕鬆地去組織與建構出有 performance、有可讀性的 Data Pipeline。
最後,PG 的介紹就先到這邊一個段落,明天會來介紹 - Controller Service。
您好想詢問關於UpdateRecord Processor 部分NEL問題
${field.Sex:equals('female'):ifElse(0,1)} 這邊是將性別欄位資訊 female 轉成0 male 1
但是從您轉出的結果看起來 這筆原始資料為female 應該要成功轉成0才是
[{"PassengerId":889,"Survived":0,"Pclass":3,"Name":"Johnston, Miss. Catherine Helen "Carrie"","Sex":"1","Age":null,"SibSp":1,"Parch":2,"Ticket":"W./C. 6607","Fare":23.45,"Cabin":null,"Embarked":"S"}]
有參考您出版的書進行實作後 轉出來的資料也是不分性別都轉為1 ,想請教是否這段NEL有問題?謝謝