iT邦幫忙

0

設計Google Cloud的Data pipeline

  • 分享至 

  • xImage
  •  

這一篇我們會從設計模式的角度來看Data pipeline,外加一些其他不一樣的可供選擇的設計模式。同時我們也會介紹GCP有關於data pipeline服務,像是Dataflow, DataProc, Pub/Sub, Composer如何運用在Data pipeline。最後我們會稍微介紹如何將你地端機房的Hadoop cluster轉移到GCP上。

Data Pipeline 概觀
這是一個抽象的概念,是資料處理的連續過程從一個資料加工階段跳到下一個階段。Data pipeline 是基於directed acyclic graphs(DAGs-有向無環圖)被建模出來的。

一個graphs是指一群nodes它們之間的邊緣之間連結起來。directed graphs 是指在兩邊的邊緣之間有flow在流動,從一邊流動到另一邊。下圖展示三個node它們的邊緣之間有資料的流動。


圖一
但有時候graphs向上圖一樣一個點跳到下一個點(如下圖),有時也會跳回(loop)前一個點甚至loop回到自己身上。這種loop回到前一個邊緣(edge)就是我們所知道cyclic graphs(有循環的),而loop 就是cyclic.


圖二
然而在cyclic(也就是loop)在data pipeline是不被允許的。基於這個data pipeline的graphic就是有向無環圖。

上面的學術說法如果看不懂,我們可以就字面上來解釋 "有向無環圖"

有向:資料會順著同一個方向一直往後進行

無環:因為資料處理是一直往後的,所以它部會有循環

所以圖一才會是我們做data pipeline的標準做法

Data Pipeline各階段
在data pipeline DAG的pipeline 中的node呈現的是每個資料處理階段,哪edge就是呈現的就是node之間的資料流動。以下是在data pipeline 中四種型態的資料處理階段

  • Ingestion
  • Transformation
  • storage
  • Analysis

在整個data pipeline中每一個資料處理階段有時可能需要多個node才能處理完這個階段的工作。例如如果我們有五個data source 需要將資料送到data warehouse,這時我們在data ingestion就會有5個node。也不是每種data pipeline都需要完成上面四種階段的工作。例如我們有一個pipeline要ingest audit log資料,執行transformation然後將它以檔案型式放到Cloud storage但沒有要分析它。這些log資料可能永遠不會被使用(因為沒有稽核需求),但它會因為法規需求而要儲存它。資料放進clous storage之後就不會再次的執行其他的transformationc或其他形式的處理。不過還是會有一些極端的案例,有些的資料處理階段可能會是多於上所講的四個階段。

Ingestion
將資料倒進GCP中我們有streaming and batch mode兩種(如下圖範例)。

使用batch mode的方式我們通常是將一個或數個以上的檔案先copy到 Cloud storage中。通常有幾種方式將資料copye過去,你可以使用clous stroage專屬的command “gsutil” 或Transfer service 跟Transfer Appliance.

Streaming batch是只持續收到不斷新增的資料,通常會是single record或是 small batch record. Cloud Pub/Sub的Topic適合這樣的作業形式。

Transformation
這是是指將來源端的資料結構轉換成適合在data pipeline 過程中適合儲存與分析這一階段的資料結構。以下是一些transformation的類型(如下圖範例):

  • Converting data type, 例如將資料從文字現呈現轉換成日期格式來呈現。
  • 填補原始資料中缺少的資料值,例如用預設值或推估值。
  • Aggregating data, 例如將一台Server每五分鐘的Memory的使用率做平均計算。
  • Filtering record, 例如不符合業務邏輯規則,像是我們的交易資料出現了未來日期的record.
  • 資料擴增效應,我們join兩個不同的資料來源產生之後產生的效果。像是我們將員工資料的table跟銷售資料的table join再一起之後就知道誰的業績最好。
  • 去除在資料集中我們不需要的column or attributes
  • 加入(column or attributes)我們從輸入資料中衍生出來的data,例如我們要將公司某項商品的庫存每三個月做一次平均計算,並寫入成另一筆新的record。

    GCP的Cloud Dataflow and Cloud Dataproc 經常使用在資料處理的transformation 階段(support batch and stream mode)。Cloud Dataprep 是使用在interactive view與資料分析的準備階段。

Storage
資料從Ingestion與transformation大都會經過儲存階段,有關詳細描述可參考建置與管理Google Cloud的儲存服務。

Analysis
這一階段有好幾種方式來實現,從使用簡單的SQL語法把report產生出來到使用Machine learning model training跟Data science analysis。

以Bigquery舉例,它一開是用SQL語法來分析資料。後來加上了一樣使用SQL語法來建立Machine Learning model的功能。

Data Studio是GCP的互動式reporting tool,功能是building reports與exploring data(一種有結構的,像是dimensional model). Cloud datalab,一種基於open source(Jupyter Notebook)的互動式的workbook。它是被使用在data exploration, Machine learning, data science, 與Virtualization.

大規模的Machine learning當要去access 支援Hbase interface的bigtable時可以使用有支援Spark machine learning libraries 的Dataproc服務。

以上介紹的在data pipeline 四種基本階段的pattern 可以發展出根據不同的特徵而有不同種類的data pipeline。

Data Pipeline的種類
data pipeline的結構跟功能會因為根據你的使用場景而有不同的的型態。但一般來說會有以下以三種基本型態

  • Data warehouse pipeline
  • Stream processing pipeline
  • Machine learning pipeline

Data warehouse pipeline
將多個資料來源集中到一處,而這種資料的結構通常都是Dimensioanl data model。這一類的Dimensional data的資料都是去正規化的,意思是他們跟RDBMS不一樣是不會遵守正規化的規則。而這種非正規化是需要你將資料從RDBMS轉過來時要刻意去做的,因為Data warehousing 是用來進行有效率的分析,如果沒有這麼做你的資源就會浪費在複雜的join語法與大量的DIsk I/O 操作上。你越把資料全部都集中在一個table上你所需要的join就越少。

從RDBMS系統中收集與重新結構資料通常需要多個步驟來完成。以下為一些data warehouse data pileline會用到common patterns :

  • Extraction, Transformation, Load(ETL)
  • Extraction, Load, Transformation(ELT)
  • Extraction and Load
  • Change data capture

以上這些通常都是batch processing pipeline, 但它們有一些streaming pipeline的特徵特別是Change data capture.

Extraction, Transformation and Load
這個流程是先將資料從一個或多個data source將資料提取出來。當有多個data source同時被提取時這一個流程中的所有動作都需要被協調。這是因為通常這類的資料提取動作都是time based的,所以重要的在於將資料提取出來時是要相同時間區間。例如,提取的流程可能是一個鐘頭跑一次而在下一個鐘頭提取資料前我們可能要將資料做完insert or modify的作業才能順利的進行下一次(下一個小時)的作業。

我們來看一種"貨品庫存"類型的Data warehousing的例子。某集團底下的所有子公司的DB都有產品(Product DB)與庫存數量(Inventory DB)的資料,而這個Data warehousing會提取所有子公司的這一類資料。產品會被定義成SKU(stock keeping unit)在Product DB。 Product DB會維護產品有關的所有詳細資料,向是產品描述,供應商,每個單位價格等。這個data warehouse會從這兩個DB提取資料 :

Product DB — 產品描述資訊

Inventory DB — Inventory資訊

假設有新的產品被加入了Product DB並有庫存資料在Inventory DB時, data warehouse就需要從這兩個DB 來update這一項資訊; 不過有可能會也有可能在取Inventory DB時會應對不到Product DB的描述性資料。

在ETL的pipeline中,資料從提取到落地到另一個DB中會經過transformation. 在過去 data warehousing的管理者會使用客製化的scripts或特定的工具來做這一類transformation的作業。不過這一類的作業都需要transformation code已經存在於scripts裡或當需要進行資料分析時,管理者需要有一些開發經驗才有辦法進行此類的工作。特定的工具可能還需要管理者去學習如何使用才能將SQL語法套進這個工具。

在GCP服務中transformation這個功能我們可以使用Cloud dataproc或Cloud dataflow. 使用Dataproc時我們寫的code可以是Spark or Hadoop support的語言。Spark使用的是 In-memory distributed data model來運作與分析資料。Spark program可以用Java, Scala, Python, R 跟SQL來完成。當使用Cloud dataproc時資料的transformation是基於Hadoop map reduce model或Spark 的distributed tabular data structure. 此外當你使用JAVA時,Hadoop提供了 一種稱為 pig的高階語言來運作資料。pig program compile到map reduce program中。

而在使用Cloud dataflow時,transformation 使用的是一種Apache Beam model的方式。它提供unified batch 跟streaming process model. Apache Beam被建模成pipeline 與有著明確support哪種pipeline構造,包含以下三個:

Pipelines : 執行資料操作的end-to-end 資料處理任務的封裝(encapsulation)。

Pcollection: A distributed dataset

PTransformation: 對資料的操作,像是grouping by , flattening, 跟data的partition.

Apache Beam program是用JAVA跟Python寫出來的。

當要寫資料到DB時,Cloud dataflow使用connector 連結到以下GCP的DB服務,像是Bigtable, Spanner, BigQuery.

Cloud dataproc中的ETL程序適合用來將你自己own的Hadoop or Spark轉移Cloud datarpoc中。而Cloud dataflow則適合用來開發新的ETL流程。因為它是serverless所以不需要管理這個服務的底層系統,它的processing model也是基於data pipelines的模式產生的。Cloud dataproc的 Hadoop與Spark平台是設計用來執行big data的分析流程,所以transformation可以在這個服務中執行。但是Cloud dataflow model則是基於Data pipelines產生的。

Extraction, Load and Transformation
這個過程跟ETL不太一樣的是,它是先進行資料擷取然後落地到目標地然後才進行Transformation. 這個方式有一些優於ETL的效益。

因為原始資料可以在沒有進行transformation保留一份。這個樣子可以讓data warehouse的管理者根據原始資料使用SQL語法進行基本的data quality check與收集特徵統計,像是有缺少幾筆的資料。

第二個好處是管理者可以直接用SQL語法進行data transformation. 這特別對SQL專家有幫助,因為不用像在ETL需要在transformation過程中需要有一點coding的經驗要求,只要會SQL語法就可以了。

Extraction and Load
這就是很簡單的搬移資料。而不做任何資料的任何變動,原來資料長甚麼樣就是怎麼樣。

Change data capture
這是指在source data有任何的資料異動,目的端馬上記錄異動了甚麼。這個有助於你想要知道在某一段時間內所有資料的異動,而不是跟上面三種一樣,只是去擷取當下DB資料最新的狀態。例如我們想知道某一樣商品過去三個月來每天的庫存數字,哪麼我們就需要知道在庫存總和這一欄每天數字的變化。

Data warehouse data pipeline通常都是以batch mode 方式在一定的週期性時間觸發運作。但如果資料是需要持續性被處理,我們就需要使用stream processing pipeline.

Stream processing pipeline
這是指不會中斷,持續性處理資料的模式。這種持續性的資料來源有很多種,例如

  • IoT device,像是天氣的各項資料都是持續不斷傳送的
  • 醫院的病人心跳資料
  • 或是系統的Application 每15秒傳送效能資料來偵測系統的異常

我們以上面三個例子來看,如果以天氣資料來看也許需要用到即時的資料處裡與分析,因為天氣的變化需要長時間的資料收集。相反的病人的心跳就可能需要被及時的處理與分析,否則這個病人就可能陷入危急的狀況。Application 效能也是類似狀況,如果沒有及時的處理與分析可能系統就會有問題。從上述的例子來看絕大多數的stream data都是需要及時的處理與分析。

這種資料分析模式跟batch processing分析是不一樣的,從aggregate data, 尋找資料的異常模式,跟使用 charts/graphs來做視覺化資料。不同的點在於你group data的方式。在batch mode你是一次取得你所需要分析的全部資料。但這方法無法用在streaming data.

因為streaming data是持續性的,所以你只能挑選一段運作時間的data subsets。例如,你需要持續計算從IoT 感測器來每小時平均溫度,所以你選取的時間可能就是最新一小時傳送過來的資料。你也需會根據每小時平均溫度再次用四小時來平均上午或下午的平均溫度狀況。當然想要在某個地區計算每五分鐘的平均溫度當然也可以。

當我們要建立streaming data pipeline時,我們有以下因素需要考量:

  • Event time and processing time
  • Sliding and tumbling Windows
  • Late-arriving data and watermarks
  • Missing data
    以上這些因素你在建立streaming data pipeline一定要考慮到的。

Event time and processing time
資料是依照資料的時間連續性依序被儲存的。假如有一組資料,其中資料 A到達的時間在資料B之前,哪麼依照時間邏輯這筆資料A的發生時間一定是在資料B之前。前面的描述隱含著一個重要但微妙的問題,哪就是你實際上是在stream processing處理中的兩個時間點的資料:

  • Event time,這是指某件是發生時所產生的資料
  • Processing time,這是指當資料被擷取進到我們目的端的endpoint的時間。Processing time可以被定為在整個data pipeline的每個階段的endpoint的到達時間,像是開始執行data transformation的時間。

當使用streaming data時,很重要的是一致地使用這些時間之一(Event time or processing time)來對stream 進行排序。以上面的例子(資料A與B)來說,data的Event time就是用來處理資料時間序列的方式。

假如資料的Event time時間序列跟到達endpoint的時間順序是一樣的話,哪排序自然就會一致。但大部分都會因為來源端或網路的關係Event time跟Processing time會經常不是同一個排序狀況。

Sliding and tumbling Windows
Windows指在stream的一組連續的數據點。Windows有固定的寬度與一種資料往前走的方式。資料往前走的資料組數小於Windows寬度稱為Sliding Window;如果資料組數大於Windows寬度就叫做tumbling windows。

上圖我們看到有9個data point(7,9,10,12,8,9,13,8,4),每三個data point就是我們規定的Windows寬度,如 7,8,9。如果windows在這個stream移動到下一段哪就是會 9,10,12。再下一段就是10,12,8,每次都移動一個data point。這就是Sliding Window在stream前進的方式。Tumbling windows就每次前進的方式就是用Windows寬度來移動。

Sliding windows的使用時機是用在你要資料是怎樣被aggregate — 每次都隨間時間的推移對最後三個value做平均。Tumbling windows是用在你要在固定的時間間隔做平均。每次都是對三個數據做平均,而且每個data point都不會有被重複計算(上一個或下一個平均計算)。

Late-arriving data and watermarks
在處理streaming data,尤其是時間序列資料時,我們必須決定等待data到達的時間。 如果我們希望data大約每分鐘到達一次,我們可能願意為遲到的data等待三到四分鐘。 例如,如果streaming data來自醫療IoT device,並且我們希望盡可能記錄所有data point,那麼我們可能願意等待比使用data更新stream chart更長的時間。 在這種情況下,我們可能決定在兩分鐘後顯示最後三個data point的移動平均值,而不是等待更長時間。

當我們等待遲到的data時,我們將不得不在執行stream processing operation之前維護一個緩衝區來暫存data。 考量一個use case,您將一個stream作為輸入並輸出最後三分鐘的平均值stream。 如果我們收到了一分鐘前和三分鐘前但不是兩分鐘前的data,則必須將已到達的兩個data point保留在緩衝區中,直到我們收到兩分鐘前的data point,或者直到我們等待同樣長的時間 盡後將兩分鐘的data point視為lost掉的data。

在處理stream時,我們需要能夠假設在某個特定時間之前生成的data不會到達。 例如,我們可以決定任何遲到 10 分鐘的data都不會被ingest到stream中。 為了幫助stream processing applications,我們可以使用wantermarks的概念,它基本上是一個timestamp,表示不會在stream中出現早於該timestamp的data。

就目前所知,我們可以將stream視為有限且完整的data windows,就像我們以batch mode處理的dataset一樣。 在那種情況下,windows只是small batch的data,但實際情況更為複雜。 watermarks表示data延遲的邊界。 如果data point到達太晚以至於其event time發生在watermarks的timestamp之前,則stream將忽略它。 但這並不意味著我們應該完全忽略它。 系統狀態的高精確度會反映到將要包括遲到的data。

我們可以通過修改Ingest、transformation和存儲data的方式來適應遲到的data並提高精確性。


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言