iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 11
0
自我挑戰組

半路出家工程師在香港系列 第 11

Day 11 - 資料的舞台 Elixir Broadway (下)

  • 分享至 

  • xImage
  •  

上一篇:Day 10 - 資料的舞太 Elixir Broadway (上)說到工程師們運用了 Gen Stage 的技術,做出了 Broadway ,但他跟 Gen Stage 到底有什麼不同呢?

最主要的分別:

  • Broadway 幫你處理了 Graceful Shutdown,在伺服器收到關機訊號時,要怎麼依序關閉 Producer, Producer and Consumer, Consumer 已經定義在Broadway裡面
  • Broadway 可以讓你自己定義你要開幾個 Producer,開幾個 Processor (Producer and Consumer),以及加入了Batcher(讓你批次處理資料)
  • 加入了 Rate-Limit,讓你的 Pipeline 不會被資料淹沒
  • 運用 telemetry 讓你可以觀察每個 Process 的狀態,更好做優化。

設定 Broadway

那設定上會不會很困難呢?其實非常的簡單,你只要寫好一個設定的 function,然後指定好誰當 Producer,誰當Consumer ,Pipeline 就可以啟動了!

先讓我們寫好設定檔案:

defmodule BroadwayDemo do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(BroadwayDemo,
      name: BroadwayDemo,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  ...callbacks...
end

上面的 producer的部分就是產出資料的地方,指定好是哪一個 Module 負責,並且寫上要開幾個 process(上面
concurrency:1 表示開一個 process),這樣就設定好Producer了;另外Processors的部分可以視為Consumer,這邊可以看到他指定要啟動兩個 process。

如果你想要讓資料可以批次處理,那在設定檔上面再加上 batcher的部分:

 defmodule BroadwayDemo do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(BroadwayDemo,
      name: BroadwayDemo,
      producer: [
        module: {Counter, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
       batchers: [
        sqs: [concurrency: 2, batch_size: 10],
        s3: [concurrency: 1, batch_size: 10]
      ]
    )
  end

  ...callbacks...
end

在 batcher 上面我們可以看到我們有兩個 batcher,一個是sqs,另一個是s3,而每個批次以十個為一單位。

設定 callback

在設定寫好之後,就要來看資料進來之後該怎麼做啦!如果你只有processor的話,只需要在 module 裡面加入
handle_message/3,這樣一來 processor 就知道該怎麼處理進來的資料,例如:

def handle_message(_, %Message{data: data} = message, _) do
    IO.inspect(message)
    message
end

如果有 batcher的話,就需要加入 handle_batch/4,並且在 handle_message裡面跟elixir講說哪一筆資料要去哪一個 batcher:

  def handle_message(_, %Message{data: data} = message, _) when is_odd(data) do
    message
    |> Message.update_data(&process_data/1) #在 handle_message中也可以在對資料進行處理
    |> Message.put_batcher(:sqs)
  end

  def handle_message(_, %Message{data: data} = message, _) when is_even(data) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:s3)
  end

  defp process_data(data) do
    # 處理資料!
  end

  @impl true
  def handle_batch(:sqs, messages, _batch_info, _context) do
    # 將資料送到 SQS,進行後續的處理
  end

  def handle_batch(:s3, messages, _batch_info, _context) do
    # 將資料存入S3
  end

實際應用

在我的公司,我做的專案裡面,我跟我同事運用了 Broadway,每五分鐘引入每個加密幣的資訊,存入S3,將最新的一筆資訊加入各個不同時間區間的價格走勢圖;也在這段時間內重新計算各個加密幣的排名(根據市值)

另外為什麼這個套件要叫做 Broadway呢?因為這個套件裡面有很多 Producer (監製),很多 stage (舞台)(原本是開多少個 process的key,後來避免混淆改成 concurrency,個人覺得比較可惜XD),所以工程師把它命名成 Broadway XD


上一篇
Day 10 - 資料的舞台 Elixir Broadway (上)
下一篇
Day 12 - 想先前進,先看看自己目前的情況
系列文
半路出家工程師在香港30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言