今天要介紹如何用 NiFi 來對 MongoDB 的資料做操作。MongoDB 是我們最熟悉的 Document DB 的類型,他支援的 JSON, XML 等格式,他其實對於一些文檔的資料儲存有一定的好處,也是現今最常用的資料庫之一,所以我認為介紹 MongoDB 如何整合於 NiFi 也是有一定幫助的。那我們就趕快開始吧
一樣我們要先建立好 Mongodb 的 Container,這樣才能做範例。這裡我也先給各位一個 docker-compose.yaml
的範例:
version: '3'
services:
nifi:
image: nifi-sample
container_name: nifi-service
restart: always
ports:
- 8443:8443/tcp
- 8080:8080/tcp
env_file: .env
environment:
SINGLE_USER_CREDENTIALS_USERNAME: ${NIFI_USERNAME}
SINGLE_USER_CREDENTIALS_PASSWORD: ${NIFI_PASSWORD}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_REGION: ${AWS_REGION}
networks:
- nifi-network
nifi-registry:
image: apache/nifi-registry:1.14.0
container_name: nifi-registry-service
restart: always
ports:
- 18080:18080/tcp
networks:
- nifi-network
mysql:
image: mongo:4.2-rc-bionic
container_name: nifi-mongo
restart: always
ports:
- 27017:27017/tcp
env_file: .env
environment:
MONGO_INITDB_ROOT_USERNAME: ${DB_ADMIN_USERNAME}
MONGO_INITDB_ROOT_PASSWORD: ${DB_ADMIN_PASSWORD}
MONGO_INITDB_DATABASE: ${DB_TABLE_NAME}
networks:
- nifi-network
networks:
nifi-network:
一樣執行 docker-compose up -d
即可建立完成。
相較於前一篇的 RDB 介紹,NiFi 本身就有提供原生的 Processor 可以讓我們對 MongoDB 來做操作,所以整體來說會更加簡單,目前 NiFi 有的 MongoDB Processor 包含如下:
其中 GetMongoRecord
和 PutMongoRecord
這兩個 Processor 主要是要讓寫入或讀取的資料不是以 JSON format 呈現,而是可以改成 CSV 等其他格式。通常除非有特別需求才會改用這兩種 Processor 來對 MongoDB 做讀寫,不然通常還是以 GetMongo
和 PutMongo
來做處理。
這個很單純地就是從 MongoDB 讀取資料的 Processor,我們可以詳細看一下其中幾個重要的設定:
mongodb://[username]:[password]@172.17.0.1:27017
如此一來,當開始執行時,他就會從 Collection 開始逐一把 Record 讀出來成 FlowFiles。
此外,我們可以看到 Properties 中第一個是 Client Service
,這邊可以指定 MongoDB 的 Controller Service (MongoDBControllerService
),所以我們也可以事先建立好,參考如下圖:
就可以在 MongoDBControllerService
設定 MongoURI,如此一來就可以在所有 MongoDB 相關 Processors 中的 Client Service
指定這個 Controller Service,就可以建立連線了。
該 Processor 就是寫入或更新資料到 MongoDB,所以前提是 FlowFiles 當中必須要有 Content 內容,這樣 NiFi 才會把當中的資料轉成 JSON 且寫入進入,一樣我們來看一下內部的設定:
前面有提到的 Client Service
, Mongo Database Name
和 Mongo Collection Name
這邊就不再提一次了,我們來看一下其他的設定。
insert
和 update
兩種,可依據你的情境來做選定。所以透過 PutMongo
,我們就可以很輕鬆地對 MongoDB 的資料做新增與寫入的動作了。
該 Processor 就是顧名思義去刪除 MongoDB 的資料,所以理所當然他的 Processor 設定也是一樣的,如下:
其中可以看到 Delete Mode
可以讓我們去決定一次刪除一筆還是多筆,所以可依據我們的場景來做選定。
該 Processor 主要是用來執行 MongoDB 的 aggregation command,可以參考官方 link,就可以大概知道 aggregation 是如何實作的。
一樣我們來先看一下該 Processor 的設定:
一些基本的設定都跟前面一樣,其中要注意的是 Query
這個參數。
舉例來說,我們原生 MongoDB 會透過像是以下指令來做 aggregation:
db.orders.count( { ord_dt: { $gt: new Date('01/01/2012') } } )
對應到 RunMongoAggregation
則在 Query 這個參數就是填入
{ "ord_dt" : { "$gt": new Date('01/01/2012') } }
就是去除掉 function,將 key 補上雙引號即可,就能做到同樣的效果了。
今天對於 MongoDB 那邊的操作就大概介紹到這邊,原則上只要把 Processor 設定好,接著在 Start 你的 datapipeline,就會看到對應的 FlowFiles 的狀況,所以我仍先以 Processor 本身需要注意的設定為主,到了系列文的最後面就會有一個情境來讓大家更熟悉整體的操作流程。接下來,明天就會開始介紹到 AWS 相關服務的整合,這部分我覺得是目前實務案例中最重要的,大多數企業都會將服務建置上雲端上,所以我盡可能地來把這塊介紹完整,包含了 AWS, GCP,也再麻煩讀者們好好地追蹤!