iT邦幫忙

2021 iThome 鐵人賽

DAY 25
0
AI & Data

Apache NiFi - 讓你輕鬆設計 Data Pipeline系列 第 25

Day25 NiFi - 第三方程式執行

在過往的介紹篇幅當中,我們幾乎都是透過 NiFi 原生的 Processor 來做處理,甚至整合一些外部的 JDBC,但除了這些方法之外,我們也可以透過我們熟悉的程式語言來做輔助,像是 python, java, golang 等來幫我們針對 FlowFiles 來做處理,今天會介紹一下如何在 NiFi 撰寫這些程式語言的設計。

How to integrate program?

NiFi 要如何來撰寫這些程式的 script 呢?主要有兩個 Processor 可以做到,分別是 ExecuteScriptExecuteStreamCommand

ExecuteScript

這個 Processor 通常會有內建好以支援的程式,如下圖:

我們可以在 Script Engine 看到有像是 Groovy, lua, python 等,像我自己是只用過 python 的 Script Engine 來做處理,所以我這邊稍微提一下注意的地方。

當你如果選擇 python 的話,要注意的是這邊的程式撰寫並不是純 python,而是 jython。因為在一開始有介紹到 NiFi 是一個由 java 做為底層的開發語言,所以在這中間的轉換如果需要寫 code 來做處理的話,以 python 為例則需要以 jython 來做為撰寫。

那 jython 其實不難,詳情可以參考該連結,大致上的結構與邏輯與 python 差不多,但由於 jython 是由 java 語言所撰寫的 python 直譯器,所以會需要事先 import java 可能會用到的 lib,才能在 NiFi 順利地執行下去。

然而指定完 Script Engine 之後,倘若你有事先寫好的 script file,那就在 Script File 去指定 Script 的路徑; 如果沒有則在 Script Body 直接輸入要執行的 code 內容,如下圖:

這邊貼一下網路的 jython 的 example code 上來:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
 
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
  def __init__(self):
        pass
  def process(self, outputStream):
    outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
session.transfer(flowFile, REL_SUCCESS)

簡單來說,就是將流進來該 Processor 的 FlowFiles 之 Content,多了 Hello World! 的內容,接著來流出去給下游 Processor。

所以我們可以發現在 import 的那一段會需要用到 java 底層的 lib 來作為輔助,然而取得 FlowFiles 的方式則需要透過 session.get() 來取得,最後再 transfer 到下游。

ExecuteStreamCommand

這時候你可能會覺得,但我就是想要用原生的 python 來撰寫跟輔助,請問 NiFi 可以怎麼用呢?這時候就可以透過 ExecuteStreamCommand 這個 Processor 來做使用。

首先,我們先來看一下一個大致上的設定:

  • Command Path: 要執行的程式路徑,ex. /usr/lib/python3
  • Command Arguments: 要帶入到 script 的參數
  • Argument Delimiter:參數的分隔符號,上圖採用空白鍵
  • Output Destination Attribute:傳出來的結果要存到 FlowFiles 的哪一個 key

這邊我們就可以事先寫好真的原生的 python script,這部分就不需要寫用 jython 來做處理,只要你在 Command Path 指定好 python 的執行路徑,你就可以執行 python script。

其中比較特別的是 Output Destination Attribute,假設你在你的 python script 是以下的內容:

import os
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--arg1', default=5, type=int)
    parser.add_argument('--arg2', default=1, type=int)
    args = parser.parse_args()
    
    count = args.arg1 + args.arg2
    
    print(count) ### it will be output as attribute
    
if __name__ == '__main__':
    main()

從這段 python code,我們可以看到就是一個很簡單的傳入兩個 args,接著 print 出加總的結果。而假如我們在 Output Destination Attribute 設定為 output_res,則該 Processor 就會把 print 的結果設定為 output_res 這個 key 的 value,也就是說經過該 Processor 的所有 FlowFiles 都會多帶有一個名為 output_res 的 attribute,其 value 就是在 script print 的資訊內容。

所以這個 Processor 的用法要留意一下,通常我的經驗是用在我可能有一些複雜的邏輯是需要透過 python 來做處理,且處理後的結果只有一個,所以我在整個 script 內只會有一個地方有 print 的 code,我再把那個 print 的 value 導向到我設定的 attribute name,如此一來我下游的 Processor 就可以採用計算處理完的 value 去做接下來的 task。

小總結

這兩個 Processor 是目前在 NiFi 中可以透過第三方的程式語言來做輔助的 Processor,我們會依照使用場境來選擇適當的處理方式,例如如果你是需要用到 FlowFiles 本身的 Attributes 或 Content 來做應用的話,通常會比較建議採用 ExecuteScript 的 Processor,因為就可以透過 session.get() 來取得。

但如果只是很單純額外增加 attribute 或是更新的話,其實透過 ExecuteStreamCommand 來處理就好了,因為我們可以對 attribute 來做操作。

以上是我對於這兩個 Processor 的操作經驗,或許有更好的用法也說不定,只是真的會用到的機會較少,但這邊我也一樣分享給大家這樣的做法,倘若讀者們有新的發現,也歡迎來一起討論交流喔。

Reference


上一篇
Day24 NiFi 延伸應用 - Slack & Email
下一篇
Day26 NiFi 場景應用範例 (一)
系列文
Apache NiFi - 讓你輕鬆設計 Data Pipeline30

尚未有邦友留言

立即登入留言