在第六天的分享中介紹了 Elixir Programming Language 有 Concurrent 的特性,可以在同一個運轉的 node 中開啟不同的執行緒(Process),彼此的資料並不互通,但可以傳訊息溝通。工程師們便利用這樣子的特性,來實作 Data Pipeline,最簡單的 Data Pipeline 可以使用 Gen Stage來實作。
在 Gen Stage 中,你可以設定 producer(汲取資料的地方), producer and consumer (通常用於資料處理)以及 Consumer (資料最終要送到哪裡)。
假設我們有一個簡單的 Pipeline:
[A] -> [B] -> [C]
Producer A
defmodule A do
use GenStage
def start_link(number) do
GenStage.start_link(A, number)
end
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
而 B 的部分,針對從Producer 拿到的資料進行處理,這邊可以看到他在拿到資料資後會對於程式碼中給予的 multiplier 與傳進來的數值進行相乘:
defmodule B do
use GenStage
def start_link(multiplier) do
GenStage.start_link(B, multiplier)
end
def init(multiplier) do
{:producer_consumer, multiplier}
end
def handle_events(events, _from, multiplier) do
events = Enum.map(events, & &1 * multiplier)
{:noreply, events, multiplier}
end
end
C的部分,則是在收到B的資訊之後,將它印在console 上面:
defmodule C do
use GenStage
def start_link(_opts) do
GenStage.start_link(C, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
# Wait for a second.
Process.sleep(1000)
# Inspect the events.
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
設定完成之後,在 console 上面打入:
{:ok, a} = A.start_link(0) # starting from zero
{:ok, b} = B.start_link(2) # multiply by 2
{:ok, c} = C.start_link([]) # state does not matter
GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)
這樣pipeline 的部分就設定完成了!
雖然使用簡單的 Gen Stage 就可以完成 data pipeline,但如果今天伺服器突然有問題 crash 了,A說他已經收到並且 pass 給 B 了,但是 B 說他並沒有收到,那這一筆資料就永遠流失了;又或者今天伺服器上了新的版本,但 Gen Stage 不知道要怎麼 gracefully shutdown,到底是要先停止 A,B還是C,才能讓資料不會流失呢?
一群 Elixir 的工程師為了解決這樣子的問題,所以運用了 Gen Stage的技術,製作出了 Broadway。