iT邦幫忙

2021 iThome 鐵人賽

DAY 16
0
AI & Data

Apache NiFi - 讓你輕鬆設計 Data Pipeline系列 第 16

Day16 NiFi - 與 MongoDB 對接設定

今天要介紹如何用 NiFi 來對 MongoDB 的資料做操作。MongoDB 是我們最熟悉的 Document DB 的類型,他支援的 JSON, XML 等格式,他其實對於一些文檔的資料儲存有一定的好處,也是現今最常用的資料庫之一,所以我認為介紹 MongoDB 如何整合於 NiFi 也是有一定幫助的。那我們就趕快開始吧

Build MongoDB Container

一樣我們要先建立好 Mongodb 的 Container,這樣才能做範例。這裡我也先給各位一個 docker-compose.yaml 的範例:

version: '3'

services:
    nifi:
        image: nifi-sample
        container_name: nifi-service
        restart: always
        ports:
            - 8443:8443/tcp
            - 8080:8080/tcp
        env_file: .env
        environment:
            SINGLE_USER_CREDENTIALS_USERNAME: ${NIFI_USERNAME}
            SINGLE_USER_CREDENTIALS_PASSWORD: ${NIFI_PASSWORD}
            AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
            AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
            AWS_REGION: ${AWS_REGION}
        networks:
            - nifi-network

    nifi-registry:
        image: apache/nifi-registry:1.14.0
        container_name: nifi-registry-service
        restart: always
        ports:
            - 18080:18080/tcp
        networks:
            - nifi-network
    
    mysql:
        image: mongo:4.2-rc-bionic
        container_name: nifi-mongo
        restart: always
        ports:
            - 27017:27017/tcp
        env_file: .env
        environment:
            MONGO_INITDB_ROOT_USERNAME: ${DB_ADMIN_USERNAME}
            MONGO_INITDB_ROOT_PASSWORD: ${DB_ADMIN_PASSWORD}
            MONGO_INITDB_DATABASE: ${DB_TABLE_NAME}
        networks:
            - nifi-network
networks:
    nifi-network:

一樣執行 docker-compose up -d 即可建立完成。

How to use?

相較於前一篇的 RDB 介紹,NiFi 本身就有提供原生的 Processor 可以讓我們對 MongoDB 來做操作,所以整體來說會更加簡單,目前 NiFi 有的 MongoDB Processor 包含如下:

  • GetMongo
  • GetMongoRecord
  • PutMongo
  • PutMongoRecord
  • RunMongoAggregation
  • DeleteMongo

其中 GetMongoRecordPutMongoRecord 這兩個 Processor 主要是要讓寫入或讀取的資料不是以 JSON format 呈現,而是可以改成 CSV 等其他格式。通常除非有特別需求才會改用這兩種 Processor 來對 MongoDB 做讀寫,不然通常還是以 GetMongoPutMongo 來做處理。

GetMongo

這個很單純地就是從 MongoDB 讀取資料的 Processor,我們可以詳細看一下其中幾個重要的設定:

  • Mongo URI:
    這邊就是輸入 Mongo 原生的 URI,官方有提供相關的 URI format,可點該 link,這邊的範例是 mongodb://[username]:[password]@172.17.0.1:27017
  • Mongo Database Name:
    一樣輸入 Database 名稱。
  • Mongo Collection Name:
    一樣輸入 Collection 名稱。

如此一來,當開始執行時,他就會從 Collection 開始逐一把 Record 讀出來成 FlowFiles。

此外,我們可以看到 Properties 中第一個是 Client Service,這邊可以指定 MongoDB 的 Controller Service (MongoDBControllerService),所以我們也可以事先建立好,參考如下圖:


就可以在 MongoDBControllerService 設定 MongoURI,如此一來就可以在所有 MongoDB 相關 Processors 中的 Client Service 指定這個 Controller Service,就可以建立連線了。

PutMongo

該 Processor 就是寫入或更新資料到 MongoDB,所以前提是 FlowFiles 當中必須要有 Content 內容,這樣 NiFi 才會把當中的資料轉成 JSON 且寫入進入,一樣我們來看一下內部的設定:

前面有提到的 Client Service, Mongo Database NameMongo Collection Name 這邊就不再提一次了,我們來看一下其他的設定。

  • Mode:
    目前支援 insertupdate 兩種,可依據你的情境來做選定。
  • Upsert:
    簡單來說是否開啟這功能,若開啟他會自動偵測如果資料不存在就寫入,有存在就更新。
  • Update Query Key:
    用來根據某一個 key 來做更新。
  • Update Query:
    針對全部的 Record 來做更新,沒有依據某一個 Key 的條件。

所以透過 PutMongo,我們就可以很輕鬆地對 MongoDB 的資料做新增與寫入的動作了。

DeleteMongo

該 Processor 就是顧名思義去刪除 MongoDB 的資料,所以理所當然他的 Processor 設定也是一樣的,如下:

其中可以看到 Delete Mode 可以讓我們去決定一次刪除一筆還是多筆,所以可依據我們的場景來做選定。

RunMongoAggregation

該 Processor 主要是用來執行 MongoDB 的 aggregation command,可以參考官方 link,就可以大概知道 aggregation 是如何實作的。

一樣我們來先看一下該 Processor 的設定:

一些基本的設定都跟前面一樣,其中要注意的是 Query 這個參數。
舉例來說,我們原生 MongoDB 會透過像是以下指令來做 aggregation:

db.orders.count( { ord_dt: { $gt: new Date('01/01/2012') } } )

對應到 RunMongoAggregation 則在 Query 這個參數就是填入

{ "ord_dt" : { "$gt": new Date('01/01/2012') } }

就是去除掉 function,將 key 補上雙引號即可,就能做到同樣的效果了。

小總結

今天對於 MongoDB 那邊的操作就大概介紹到這邊,原則上只要把 Processor 設定好,接著在 Start 你的 datapipeline,就會看到對應的 FlowFiles 的狀況,所以我仍先以 Processor 本身需要注意的設定為主,到了系列文的最後面就會有一個情境來讓大家更熟悉整體的操作流程。接下來,明天就會開始介紹到 AWS 相關服務的整合,這部分我覺得是目前實務案例中最重要的,大多數企業都會將服務建置上雲端上,所以我盡可能地來把這塊介紹完整,包含了 AWS, GCP,也再麻煩讀者們好好地追蹤!

Reference


上一篇
Day15 NiFi - 與 RDB 對接設定
下一篇
Day17 NiFi - 與 AWS S3 & AWS lambda 對接設定
系列文
Apache NiFi - 讓你輕鬆設計 Data Pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言