上一篇:Day 10 - 資料的舞太 Elixir Broadway (上)說到工程師們運用了 Gen Stage 的技術,做出了 Broadway ,但他跟 Gen Stage 到底有什麼不同呢?
最主要的分別:
那設定上會不會很困難呢?其實非常的簡單,你只要寫好一個設定的 function,然後指定好誰當 Producer,誰當Consumer ,Pipeline 就可以啟動了!
先讓我們寫好設定檔案:
defmodule BroadwayDemo do
use Broadway
def start_link(_opts) do
Broadway.start_link(BroadwayDemo,
name: BroadwayDemo,
producer: [
module: {Counter, []},
concurrency: 1
],
processors: [
default: [concurrency: 2]
]
)
end
...callbacks...
end
上面的 producer的部分就是產出資料的地方,指定好是哪一個 Module 負責,並且寫上要開幾個 process(上面
concurrency:1 表示開一個 process),這樣就設定好Producer了;另外Processors的部分可以視為Consumer,這邊可以看到他指定要啟動兩個 process。
如果你想要讓資料可以批次處理,那在設定檔上面再加上 batcher的部分:
defmodule BroadwayDemo do
use Broadway
def start_link(_opts) do
Broadway.start_link(BroadwayDemo,
name: BroadwayDemo,
producer: [
module: {Counter, []},
concurrency: 1
],
processors: [
default: [concurrency: 2]
],
batchers: [
sqs: [concurrency: 2, batch_size: 10],
s3: [concurrency: 1, batch_size: 10]
]
)
end
...callbacks...
end
在 batcher 上面我們可以看到我們有兩個 batcher,一個是sqs,另一個是s3,而每個批次以十個為一單位。
在設定寫好之後,就要來看資料進來之後該怎麼做啦!如果你只有processor的話,只需要在 module 裡面加入
handle_message/3,這樣一來 processor 就知道該怎麼處理進來的資料,例如:
def handle_message(_, %Message{data: data} = message, _) do
IO.inspect(message)
message
end
如果有 batcher的話,就需要加入 handle_batch/4,並且在 handle_message裡面跟elixir講說哪一筆資料要去哪一個 batcher:
def handle_message(_, %Message{data: data} = message, _) when is_odd(data) do
message
|> Message.update_data(&process_data/1) #在 handle_message中也可以在對資料進行處理
|> Message.put_batcher(:sqs)
end
def handle_message(_, %Message{data: data} = message, _) when is_even(data) do
message
|> Message.update_data(&process_data/1)
|> Message.put_batcher(:s3)
end
defp process_data(data) do
# 處理資料!
end
@impl true
def handle_batch(:sqs, messages, _batch_info, _context) do
# 將資料送到 SQS,進行後續的處理
end
def handle_batch(:s3, messages, _batch_info, _context) do
# 將資料存入S3
end
在我的公司,我做的專案裡面,我跟我同事運用了 Broadway,每五分鐘引入每個加密幣的資訊,存入S3,將最新的一筆資訊加入各個不同時間區間的價格走勢圖;也在這段時間內重新計算各個加密幣的排名(根據市值)
另外為什麼這個套件要叫做 Broadway呢?因為這個套件裡面有很多 Producer (監製),很多 stage (舞台)(原本是開多少個 process的key,後來避免混淆改成 concurrency,個人覺得比較可惜XD),所以工程師把它命名成 Broadway XD