iT邦幫忙

2021 iThome 鐵人賽

DAY 6
0
AI & Data

Apache NiFi - 讓你輕鬆設計 Data Pipeline系列 第 6

Day6 NiFi - Processors

前面我們已經介紹完 FlowFiles 了,接下來就是可以一步一步地去建置我們的 Data Pipeline,今天的主角是『Processors』。

Processors Category

在 Day2 的時候,就有提到 Processor 這個名詞,它同時也是撐起在 NiFi 的 Data Pipeline 的重要 componenet 之一。根據官方文件的左手邊可以看到『Processors』底下的種類非常非常多,如果這次的系列文30天都全部介紹的話,還不一定講得完。

所以這裡我先幫各位讀者們去整理一份歸類,先有一個大概的認識:

Processor Categories Intorduction Example
Data Ingest Processors 主要就是用來取得資料的 Processors,可能透過 files, ftp, db 等方式 GenerateFlowFiles, ListFiles, GetFile, GetKafka, GetMongo, GetTwitter...
Data Transform Processors 主要就是在做一些 transform, replact, update等操作的 Processors ReplaceText, ConvertRecord, UpdateRecord, ConvertJsonToSQL...
Data Egress & Sending Data Processors 主要 ouput 或 傳送資料到下一個 Destination 的 Processors PutSlack, PutEmail, PutFile, PutFTP, PutKafka..
Routing & Mediation Processors 以控制 FlowFiles 該往哪一個 connection 流向的 Processors RouteOnAttribute, RouteOnContent...
DB Access Processors 以 Access DB 操作的 Processors(主要偏向可SQL指令的DB) PutSQL, ExecuteSQL, ListDatabaseTables, QueryDatabaseTable...
Attribute Extraction Processors 以 FlowFiles 的 attributes 操作的 Processors EvaluateJsonPath, EvaluateXPath, ExtractText, UpdateAttribute, LogAttribute...
Execute Programming Processors 執行第三方程式(ex. python, go, etc.) 操作的 Processors ExecuteProcess, ExecuteStreamCommand...
Splitting & Aggregation Processors 切分資料或計算處理的 Processors SplitText, SplitJson, SplitRecord, MergeContent....
HTTP & UDP Processors 透過 HTTP 或 UDP 做資料操作的 Processors,通常用於 API 呼叫居多 GetHTTP, ListentHTTP, PostHTTP, PutHTTP, PutUDP...
AWS Processors AWS 相關操作的 Processors ListS3, FetchS3Object, PutS3Object, PutSNS, GetSNS, PutSQS, PutDynamoDB, PutLambda...
GCP Processors GCP 相關操作的 Processors ListGCSBucket, PutGCSObject, FetchGCSObject, DeleteGCSObject, ConsumeGCPPubSub, PublishGCPPubSub, PutBigQueryBatch, PutBigQueryStreaming...

我想各位讀者看到這些應該就已經頭昏眼花了,更何況在這次的系列文當中更不可能全部提到,所以我又再簡化了一些重點,讀者們就可以透過我的重點即可推敲出大多出 Processor 的用途了:

  1. GetXXX
    通常代表著『取得』資料轉成 FlowFiles的意思,ex. GetFile,從 Local file 取得資料; GetMongo 就是從 MongoDB 取得資料。

  2. PutXXX:
    通常代表著『寫入』、『傳送』的意思,ex. PusSlack 傳送資訊到 slack; PutSQS 將資料傳送到 AWS SQS; PutKafka 將資料傳送到 Kafka。

  3. FetchXXX:
    通常代表『取得資料內容』,這通常較常出現在 File 類型,ex. FetchFile, FetchS3Object, FetchGCSObject。你可能會問這和『GetXXX』有什麼差異呢? 以 GetFile 是用來取得 File 的 size, permission, last modified等資訊,而 FetchFile 而是真實將檔案內容以 一個row 的方式轉成一個 FlowFiles,但只有 File 有這樣的差別。

  4. ListXXX:
    通常會列出我們指定的 Folder 底下有哪些 Files,並將 Path 回傳出來, ex. ListFiles 回傳本機端的某一個 Folder 底下全部檔案; ListS3 則回傳某一個 AWS S3 Bucket 下的 folder下的所有檔案等。

  5. DeleteXXX:
    就是很簡單的刪除資料,ex. DeleteGCSObject 就是刪除 GCS 的檔案; DeleteSQS 就是刪除 SQS等。

  6. RouteOnXXX:
    用來決定 Flowfiles 留下的條件,ex. RouteOnAttributes 代表依據 Flowfiles 某一個 attribute 的 value 來決定接下來要走哪一個 Connection。

  7. QueryXXX:
    就是對 DB 做資料查詢, ex. QueryCassandra 對 Cassandra 做查詢; QueryDatabaseTable 對 DB 做查詢,但需要搭配 JDBC Driver,如果要對 MySQL 就要指定 MySQL Driver。

  8. ExecuteXXX:
    執行 DB Query 或執行 Programing, ex. ExecuteSQL 對 DB 執行 SQL 指令; ExecuteStreamCommand 可用來執行第三方的程式語言等

你會發現,其實在 NiFi 的 Processor 命名其實還蠻單純的,通常就是一個『動作』搭配一個『目的』,所以只要掌握這些原則,其實大多數的 Processor 都能大概知道他的用途,剩下例外的就在查 Document 就可以了。

那接下來就來了解一下如何操作 Processor 吧!

How to use Processors?

如何加入 Processors


從上面的 gif 可以看到我們是如何將 Processor 拉到主畫面的操作,這裡我也用截圖的方式來呈現一次

  1. 在畫面最左方可以看到 『Processor』的圖示,拖拉到主畫面

  2. 接著就會呈現所有 Processor 的視窗,根據你的選擇點選你要的 Processor,且點下 Add 就會有你選擇的 Processor 了

透過以上簡單兩的步驟,就可加入我們要的 Processor,但其實 Processor 還是有很多細節,我們繼續看下去

Processors Config 細節

我們可以在 Processor 點擊兩下就可以看到更細節的設定,就會呈現如下畫面:

  1. SETTINGS

    這個主要會包含幾個部分,Processor 命名、Processor Type等,其中最重要的是 Automatically Terminate Relationships,這部分就會得知出該 Processor 可與下游 Processor 建立哪些 Connection,但並不是所有 Connection 都一定要建立,你可以在這邊的設定勾選起來,就代表 terminate,後續 NiFi 也就不會要求你一定要建立這個 Connection 了

  2. SCHEDULE

    這裡就是要設定 Schedule 的排程,在 NiFi 有分成兩種方式:

  • Timer Driven
    搭配 Run Schedule,直接指定單位,ex. 1 hour, 10 minute 就分別代表每一小時、每10分鐘 triggerㄡ
  • Crontab Driven
    搭配 Run Schedule,像 linux 一樣的去設定排成時間,ex. 0 0 13 * * ? 代表安排在每天下午1:00運行。

除了以上設定之外,還有以下的設定可以注意:

  • Concurrent Tasks
    可以控制 Processor 將使用的 Concurrent 數量。簡單來說,用來決定 Processor 可同時處理多少個 FlowFiles,增加該設定能使 Processor 同時 處理更多數據。
  1. PROPERTIES

    接下來這個部分應該是 Processor 最重要且最複雜的地方,也就是 Processor 的設定,每一個不同 Type 的 Processor 會有不同的 properties 要去做設定,其中黑色粗體的就是必要設定的 key,有些 Processor 的 properties key 很多很複雜,所以通常就要搭配著 Document 來去做確認跟設定對的 value。未來我這邊套用場境時會挑選簡單的 Processor 來讓大家熟悉,隨著操作熟悉之後,其實在設定上也會變得平易近人。

  2. COMMENT

    這部分就是來針對該 Processor 做一些簡單的說明和註解,幫助你的團隊成員或是後續你來檢視的時候,就能知道當初為什麼要採用這個 Processor 的原因和目的。

小總結

我想經過今天的簡介與簡單的操作,想必讀者們對於如何使用 Processor 有基本的認識了吧!還沒很熟悉的人沒關係,不要急慢慢來。我相信你一定是需要套用真實的場景和案例,你就會比較知道如何上手了,再讓我們等一下,待我在花個幾天把全部 Componenet 操作個別都介紹完之後,我們就可以全部串在一起做使用了,到時候相信你一定可以使用得很得心應手!

Reference


上一篇
Day5 NiFi - FlowFiles
下一篇
Day7 NiFi - Connection
系列文
Apache NiFi - 讓你輕鬆設計 Data Pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言