iT邦幫忙

2021 iThome 鐵人賽

DAY 8
0
AI & Data

Apache NiFi - 讓你輕鬆設計 Data Pipeline系列 第 8

Day8 NiFi - Processor Group

前面已經講完 Processor 和 Connection 兩個重要的 Componenet,我們就可以透過這兩個去建立基本的 Data Pipeline。但有時候我們會建立很多 Data Pipeline 在 NiFi,有些流程甚至是重疊的,這樣會覺得不太符合效益,所以就要透過今天的主角 - Processor Group 來做處理。

What is the Processor Group?

當 Date Pipeline 複雜且多樣化時,有些小流程可能會有重複的建置,此時可以透過 Processor Group(接下來簡稱 PG) 來做處理,簡單來說就是變成 Module,然後整合到 Data Pipeline 來做一個重複使用的功能。

PG 除了有作為 Module 的功能之外,他也有幾個額外重點,其實之前都有提到他,這邊再稍微列一下給各位:

  • 可以用來做 Team 或 Project 的劃分,加強組織管理與權限控管。
  • NiFi Registry 版本控管的最小單位。

如何操作?


從上面的 gif 檔可以看到如何建立 PG,而當我們移動到 PG 內部時,可以注意到左下角也會改變成我們目前所屬的階層(很像 Folder 的概念)。

場景應用

這邊我來帶一個場景應用,一樣拿kaggle titanic 來做一個應用,這裡就做基本一些簡單的前處理就好,這裡示範的前處理步驟如下:

1. 將 Sex 欄位轉換成數值
2. 依據 Embarked 欄位分成 3 個 connection 出來

大致上的 Pipeline 會長如下圖:

其中中間的 PG (titanic_preprocess) 內部會長如下圖:

PG 內部

接下來先針對 PG 內部來做講解:

  1. 需要設定 input, output port
    如果是要將 PG 視為 Module 做使用且成為 Pipeline 其中的一個流程,則必須在設計 PG 時,需要指定 inputoutput 這兩個 port,如此一來才有辦法將 FlowFiles 從上游流入,且再從下游流出。

所以如同紅框所示即可知道如何加入 inputoutput

而在 PG 內可以同時存在著多的 inputoutput,再由外部建立 connection 時來決定要將 FlowFiles 流入到 PG 的哪一個 port。

  1. 加入 SplitRecord Processor

    這個 Processor 是用來將內容做 split,我們可以去指定 split 的行數,其中參數如下:
  • Record Reader
    要用哪一種 format 來讀檔案內容,因為檔案為 csv,所以這邊選擇 CSVReader。
  • Record Writer
    要以哪一種方式寫入成 FlowFiles 格式,這邊一樣選擇 CSVRecordWriter。
  • Records Per Split
    要以 n 筆作為 split 單位,如果設定為 1,則就會1筆1筆的將資料轉成 FlowFiles。

其中CSVReaderCSVRecordWriter 是屬於 Controller Service 的一種,所以當選完指定類型時,還要點選右手邊的箭頭,此時會看到如下畫面:

接著按下 閃電 的符號,就能正式啟用,為何要這樣做呢?詳細原因在下一節 Controller Service 會再介紹到。

  1. 加入 UpdateRecord Processor

    這個 Processor 就是用來更新某一個欄位底下的 Value,如同我們這次的場景希望將 Sex 這個欄位原本的 female 轉成 0, male 轉成 1。

一樣當中有幾個參數要去做設定:

  • Record Reader
    要用哪一種 format 來讀檔案內容,因為檔案為 csv,所以這邊選擇 CSVReader。
  • Record Writer
    要以哪一種方式寫入成 FlowFiles 格式,這邊則選擇 JsonRecordWriter,因為我們等等要將一些欄位轉成 Attribute。

這裡會看到我額外加入了一個 /Sex 這個 property,其中他的value 是:

${field.Sex:equals('female'):ifElse(0,1)}

這個語法是 NiFi 自己的語法,叫做 NiFi Expression Language,是可以幫助我們針對 Flowfiles 的 attributes 和 content 作處理的。
詳細的寫法也會在後續介紹,這裡簡單的意思是我們想要取得 Sex 的欄位,且若底下有 female 這個 value 時轉成 0,否則轉成 1,接著再寫回 Sex 欄位作替代,所以 key 才會是 /Sex(斜線是這個 Processor 的規定,這樣才有辦法找到對應的欄位)。

  1. 加入 EvaluateJsonPath Processor

    在前一個 UpdateRecord Processor,已將 FlowFiles 轉成 Json 格式,而這邊的 EvaluateJsonPath 就是可以幫我們將 content 的 key 轉成 attributes。舉例來說,假如有一個如下的 content:
[{"PassengerId":889,"Survived":0,"Pclass":3,"Name":"Johnston, Miss. Catherine Helen \"Carrie\"","Sex":"1","Age":null,"SibSp":1,"Parch":2,"Ticket":"W./C. 6607","Fare":23.45,"Cabin":null,"Embarked":"S"}]

那我們就可以透過上圖中的 embark 將特定的 key 轉成 FlowFiles 的 attributes:

$[0].Embarked 

如此一來經過該 Processor 之後的 FlowFiles,都會多帶一個 embark 的 attributes,而 value 就會是 content 對應到的 value,以這個例子就會是 S。

  1. 加入 RouteOnAttributes Processor

    這個 Processor 是可以幫由我們根據 Attribute 的 value 來自定義後續的 Connection,所以可以看到我的設定:
embark_c: ${embark:equals("C")}
embark_q: ${embark:equals("Q")}
embark_s: ${embark:equals("S")}

如此一來,這個 Processor 就會產生 embark_c, embark_qembark_s 這三個 Connection,因此我們再回來看一次 PG 的圖下方橘框:

就可看到這三個 Connection 以及對接對應的 Port。

對接 PG

  • 與 PG 的 input 建立 Connection

    根據上圖紅框,我們可以看到與 PG 建立 Connection 的結果,而通常與上游 Processor 對接對應到的都會是我們在 PG 內設定好的 input port

所以當我們在建立連線時,就可以看到如下圖所示的狀況,我們可在右手邊決定要選擇的 input port

  • 與 PG 的 output 建立 Connection

    根據上圖的紅框,則可看到 PG 與下游的 Processor 建立 Connection 的結果,而這個建立會對應到我們在 PG 內建立好的 output port

一樣從下圖左方,我們可選擇要與下游 Processor 對接的 port 為何,藉此建立出對應的 Connection。

小總結

透過上面的描述,我們可以得知 PG 要如何建立,以及一些細節的設定,還有如何與 Data Pipeline 來做一個整合,像是 inputoutput 這些 port 的設定與對接,雖然看起來好像操作很簡單,但這是在 NiFi 中一個非常重要的操作與概念,希望讀者們若有使用的話一定要將此學習起來,這會幫助你更輕鬆地去組織與建構出有 performance、有可讀性的 Data Pipeline。

最後,PG 的介紹就先到這邊一個段落,明天會來介紹 - Controller Service。

Reference


上一篇
Day7 NiFi - Connection
下一篇
Day9 NiFi - Controller Service
系列文
Apache NiFi - 讓你輕鬆設計 Data Pipeline30

尚未有邦友留言

立即登入留言