今天要來介紹如何透過 NiFi 來與 GCP BigQuery 來做資料整合與操作。
在一開始先簡單來為各位介紹,何謂 GCP BigQuery? BigQuery 是 GCP 中一個可以提供大量資料儲存與查詢分析的服務,我們可以透過 SQL 的方式來做一些資料的操作(ex. JOIN, GROUPBY 等)。
簡單來說可以想像成他是個 DataWarehouse,在 BigQuery 中他會以 Column-Based 的方式來做資料的儲存,再加上其對於資料老屈與分析時的效能能夠提供到一個即時的程度,官方稱 Near Realtime。
所以在應用上,我們可想像成無論是 Batch 或 Streaming 的資料,都可以進一步地有效寫入,好讓後續的使用者可以在該服務上面做立即性地查詢。因此在 NiFi 這邊就有提供了兩個相關的 Processors,分別是 PutBigQueryBatch
和 PutBigQueryStreaming
。
再來,我們從架構上來看一下 BigQuery 的資料階層:
每一個 Project 底下會有多個 Datasets,而每一個 Datasets 則會有多個 Tables,我們就可以將這些 Tables 做整合應用與分析。
這邊你可能會思考,為什麼在 NiFi 中 BigQuery 的 Processors 只支援寫入而沒有讀取呢?其實一樣有支援讀取的 Processor,他的設定就跟我們之前在 AWS 撈取 Athena 和 Redshift 的操作一模一樣,就是下載 BigQuery 的 JDBC Driver,然後指定好 Controller Service,就可以透過 SQL 相關的 Processor 做資料的讀取與操作了,所以整體的設定上都是一樣的概念。
如同前面提到的,這邊 NiFi 本身提供了PutBigQueryBatch
和 PutBigQueryStreaming
這兩個 Processor,從命名我們可以得知為一個差別在於 Batch 或 Streaming 的寫入,所以在一些設定上也會有一些差異。
該 Process 就是以 Batch 的方式來寫入 FlowFiles 到 GCP BigQuery,相關設定如下:
其他比較重要的設定是 CSV 相關的參數,如果要使用這個 Processor 的時候,必須確保好你的 FlowFiles 的 Content 為 CSV 格式,可以透過 CSVWriter 來做轉換,所以原則上你在執行該 Processor 之前的 Content 可能會長得以下這樣:
PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S
894,2,"Myles, Mr. Thomas Francis",male,62,0,0,240276,9.6875,,Q
895,3,"Wirz, Mr. Albert",male,27,0,0,315154,8.6625,,S
896,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22,1,1,3101298,12.2875,,S
897,3,"Svensson, Mr. Johan Cervin",male,14,0,0,7538,9.225,,S
898,3,"Connolly, Miss. Kate",female,30,0,0,330972,7.6292,,Q
899,2,"Caldwell, Mr. Albert Francis",male,26,1,1,248738,29,,S
900,3,"Abrahim, Mrs. Joseph (Sophie Halaut Easu)",female,18,0,0,2657,7.2292,,C
901,3,"Davies, Mr. John Samuel",male,21,2,0,A/4 48871,24.15,,S
902,3,"Ilieff, Mr. Ylio",male,,0,0,349220,7.8958,,S
903,1,"Jones, Mr. Charles Cresson",male,46,0,0,694,26,,S
然而你每一次要 batch 的 Size 取決於你 FlowFiles 的 Content 有多少 record,所以你會發現在 Content 必須是 CSV 的格式下,PutBigQueryBatch
就會有許多關於 CSV 設定的參數要去注意。
這個 Processor 就是針對 Streaming 來做使用的,所以通常用於一個 FlowFiles 只有一筆 Record Content,然而當中的設定較少,但大多數都與 PutBigQueryBatch
大同小異,一樣列在下面給各位做參考:
PutBigQueryBatch
則來得更有彈性。介紹完了 GCP BigQuery 如何設定之後,不難發現他的設定與參數原理也十分單純,只要掌握好設定原則之後,接著與上下游 Processor 做整合之後,你也可以將你要的資料讀取或寫入到 GCP BigQuery。
明天是最後一天介紹 GCP,我會帶大家來理解 GCP PubSub 是如何在 NiFi 做設定與整合,如此一來你也可以透過 GCP 的 Message Queue 系統來與 NiFi 做整合,來形成一個更完整的 Data Pipeline 系統。