在過往的介紹篇幅當中,我們幾乎都是透過 NiFi 原生的 Processor 來做處理,甚至整合一些外部的 JDBC,但除了這些方法之外,我們也可以透過我們熟悉的程式語言來做輔助,像是 python, java, golang 等來幫我們針對 FlowFiles 來做處理,今天會介紹一下如何在 NiFi 撰寫這些程式語言的設計。
NiFi 要如何來撰寫這些程式的 script 呢?主要有兩個 Processor 可以做到,分別是 ExecuteScript
和 ExecuteStreamCommand
。
這個 Processor 通常會有內建好以支援的程式,如下圖:
我們可以在 Script Engine 看到有像是 Groovy, lua, python 等,像我自己是只用過 python 的 Script Engine 來做處理,所以我這邊稍微提一下注意的地方。
當你如果選擇 python 的話,要注意的是這邊的程式撰寫並不是純 python,而是 jython。因為在一開始有介紹到 NiFi 是一個由 java 做為底層的開發語言,所以在這中間的轉換如果需要寫 code 來做處理的話,以 python 為例則需要以 jython 來做為撰寫。
那 jython 其實不難,詳情可以參考該連結,大致上的結構與邏輯與 python 差不多,但由於 jython 是由 java 語言所撰寫的 python 直譯器,所以會需要事先 import java 可能會用到的 lib,才能在 NiFi 順利地執行下去。
然而指定完 Script Engine 之後,倘若你有事先寫好的 script file,那就在 Script File 去指定 Script 的路徑; 如果沒有則在 Script Body 直接輸入要執行的 code 內容,如下圖:
這邊貼一下網路的 jython 的 example code 上來:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self):
pass
def process(self, outputStream):
outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
flowFile = session.write(flowFile, PyOutputStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
簡單來說,就是將流進來該 Processor 的 FlowFiles 之 Content,多了 Hello World!
的內容,接著來流出去給下游 Processor。
所以我們可以發現在 import 的那一段會需要用到 java 底層的 lib 來作為輔助,然而取得 FlowFiles 的方式則需要透過 session.get()
來取得,最後再 transfer 到下游。
這時候你可能會覺得,但我就是想要用原生的 python 來撰寫跟輔助,請問 NiFi 可以怎麼用呢?這時候就可以透過 ExecuteStreamCommand 這個 Processor 來做使用。
首先,我們先來看一下一個大致上的設定:
這邊我們就可以事先寫好真的原生的 python script,這部分就不需要寫用 jython 來做處理,只要你在 Command Path 指定好 python 的執行路徑,你就可以執行 python script。
其中比較特別的是 Output Destination Attribute,假設你在你的 python script 是以下的內容:
import os
import argparse
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--arg1', default=5, type=int)
parser.add_argument('--arg2', default=1, type=int)
args = parser.parse_args()
count = args.arg1 + args.arg2
print(count) ### it will be output as attribute
if __name__ == '__main__':
main()
從這段 python code,我們可以看到就是一個很簡單的傳入兩個 args,接著 print 出加總的結果。而假如我們在 Output Destination Attribute 設定為 output_res
,則該 Processor 就會把 print 的結果設定為 output_res
這個 key 的 value,也就是說經過該 Processor 的所有 FlowFiles 都會多帶有一個名為 output_res
的 attribute,其 value 就是在 script print
的資訊內容。
所以這個 Processor 的用法要留意一下,通常我的經驗是用在我可能有一些複雜的邏輯是需要透過 python 來做處理,且處理後的結果只有一個,所以我在整個 script 內只會有一個地方有 print 的 code,我再把那個 print 的 value 導向到我設定的 attribute name,如此一來我下游的 Processor 就可以採用計算處理完的 value 去做接下來的 task。
這兩個 Processor 是目前在 NiFi 中可以透過第三方的程式語言來做輔助的 Processor,我們會依照使用場境來選擇適當的處理方式,例如如果你是需要用到 FlowFiles 本身的 Attributes 或 Content 來做應用的話,通常會比較建議採用 ExecuteScript
的 Processor,因為就可以透過 session.get()
來取得。
但如果只是很單純額外增加 attribute 或是更新的話,其實透過 ExecuteStreamCommand
來處理就好了,因為我們可以對 attribute 來做操作。
以上是我對於這兩個 Processor 的操作經驗,或許有更好的用法也說不定,只是真的會用到的機會較少,但這邊我也一樣分享給大家這樣的做法,倘若讀者們有新的發現,也歡迎來一起討論交流喔。