iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 10
0
自我挑戰組

半路出家工程師在香港系列 第 10

Day 10 - 資料的舞台 Elixir Broadway (上)

  • 分享至 

  • xImage
  •  

第六天的分享中介紹了 Elixir Programming Language 有 Concurrent 的特性,可以在同一個運轉的 node 中開啟不同的執行緒(Process),彼此的資料並不互通,但可以傳訊息溝通。工程師們便利用這樣子的特性,來實作 Data Pipeline,最簡單的 Data Pipeline 可以使用 Gen Stage來實作。

Gen Stage

在 Gen Stage 中,你可以設定 producer(汲取資料的地方), producer and consumer (通常用於資料處理)以及 Consumer (資料最終要送到哪裡)。

假設我們有一個簡單的 Pipeline:

[A] -> [B] -> [C]

Producer A

defmodule A do
  use GenStage

  def start_link(number) do
    GenStage.start_link(A, number)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)
    {:noreply, events, counter + demand}
  end
end

而 B 的部分,針對從Producer 拿到的資料進行處理,這邊可以看到他在拿到資料資後會對於程式碼中給予的 multiplier 與傳進來的數值進行相乘:

defmodule B do
  use GenStage

  def start_link(multiplier) do
    GenStage.start_link(B, multiplier)
  end

  def init(multiplier) do
    {:producer_consumer, multiplier}
  end

  def handle_events(events, _from, multiplier) do
    events = Enum.map(events, & &1 * multiplier)
    {:noreply, events, multiplier}
  end
end

C的部分,則是在收到B的資訊之後,將它印在console 上面:

defmodule C do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(C, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    # Wait for a second.
    Process.sleep(1000)

    # Inspect the events.
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end

設定完成之後,在 console 上面打入:

{:ok, a} = A.start_link(0)  # starting from zero
{:ok, b} = B.start_link(2)  # multiply by 2
{:ok, c} = C.start_link([]) # state does not matter

GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)

這樣pipeline 的部分就設定完成了!

後續問題

雖然使用簡單的 Gen Stage 就可以完成 data pipeline,但如果今天伺服器突然有問題 crash 了,A說他已經收到並且 pass 給 B 了,但是 B 說他並沒有收到,那這一筆資料就永遠流失了;又或者今天伺服器上了新的版本,但 Gen Stage 不知道要怎麼 gracefully shutdown,到底是要先停止 A,B還是C,才能讓資料不會流失呢?

一群 Elixir 的工程師為了解決這樣子的問題,所以運用了 Gen Stage的技術,製作出了 Broadway。

參考資料

Gen Stage Documentation


上一篇
Day 09 - 拿番茄做時鐘
下一篇
Day 11 - 資料的舞台 Elixir Broadway (下)
系列文
半路出家工程師在香港30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言