最近我開發了 MessageWorkerPool
專案。其主要概念是提供一個平台框架,使使用者能夠快速且輕鬆地在 Worker
內實作邏輯。該設計高度靈活,允許基於我創建的 Worker 通訊協議,以多種程式語言實作 Worker
。目前,我已提供使用 C#、Rust 和 Python 編寫的 Worker 範例。
這個函式庫在多進程環境中處理任務表現優異。此外,它還支援優雅關閉 (graceful shutdown),確保在隨時 consumer worker 能順利終止處理程序。
當你需要強大的隔離性,以防止某個任務影響其他任務時,應該選擇 ProcessPool,特別是針對關鍵操作或容易崩潰的任務。雖然 ThreadPool 較為輕量(因為執行緒共用記憶體並且具有較低的上下文切換開銷),但 ProcessPool 能夠提供更靈活的解決方案,允許使用不同的程式語言來實作 Worker。
要安裝 MessageWorkerPool
套件,請使用以下 NuGet 指令:
PM > Install-Package MessageWorkerPool
若要手動安裝此函式庫,可克隆儲存庫並建置專案:
git clone https://github.com/isdaniel/MessageWorkerPool.git
cd MessageWorkerPool
dotnet build
這是部署 RabbitMQ 和相關服務的快速開始指南,使用提供的 docker-compose.yml 檔案和 .env 中的環境變數。
docker-compose --env-file .\env\.env up --build -d
以下是創建並配置與 RabbitMQ 互動的 workerpool 的範例程式碼。以下是其功能的解析:workerpool 將根據您的 RabbitMqSetting 設定從 RabbitMQ 伺服器獲取訊息,並通過 Process.StandardInput 將訊息傳遞給用戶創建的真實 worker node
public class Program
{
public static async Task Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole(options => {
options.FormatterName = ConsoleFormatterNames.Simple;
});
logging.Services.Configure<SimpleConsoleFormatterOptions>(options => {
options.IncludeScopes = true;
options.TimestampFormat = " yyyy-MM-dd HH:mm:ss ";
});
}).AddRabbitMqWorkerPool(new RabbitMqSetting
{
UserName = Environment.GetEnvironmentVariable("USERNAME") ?? "guest",
Password = Environment.GetEnvironmentVariable("PASSWORD") ?? "guest",
HostName = Environment.GetEnvironmentVariable("RABBITMQ_HOSTNAME"),
Port = ushort.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_PORT"), out ushort p) ? p : (ushort) 5672,
PrefetchTaskCount = 3
}, new WorkerPoolSetting() { WorkerUnitCount = 9, CommandLine = "dotnet", Arguments = @"./ProcessBin/WorkerProcessSample.dll", QueueName = Environment.GetEnvironmentVariable("QUEUENAME"), }
);
}
worker node 與任務進程之間的協議使用 MessagePack 二進制格式來進行更快且更小的資料傳輸,標準輸入將發送信號來控制 worker process。
一開始 workerPool 將通過標準輸入傳遞 NamedPipe 名稱,因此 worker node 需要接收該名稱並建立 worker process 和 workerPool 之間的 NamedPipe。
目前,workerPool將通過標準輸入向 worker process 發送操作信號或指令。
__quit__
): 代表 workerPool 發送關閉或關機信號給 worker node,worker process 應盡快執行優雅關機。msgpack 協議支持的資料類型如下類別與 byte[] 格式。
對應的 byte[] 資料是:
[132,161,48,179,78,101,119,32,79,117,116,80,117,116,32,77,101,115,115,97,103,101,33,161,49,204,200,161,50,129,164,116,101,115,116,167,116,101,115,116,118,97,108,161,51,169,116,101,115,116,81,117,101,117,101]
要將提供的偽 JSON 結構表示為 MsgPack
格式(byte[]),我們可以分解過程如下:
Edit
{
"0": "New OutPut Message!",
"1": 200,
"2": {
"test": "testval"
},
"3": "testQueue"
}
更多資訊,您可以使用 msgpack-converter 來解碼和編碼。
/// <summary>
/// 封裝來自 MQ 服務的訊息
/// </summary>
[MessagePackObject]
public class MessageOutputTask
{
/// <summary>
/// 來自進程的輸出訊息
/// </summary>
[Key("0")]
public string Message { get; set; }
[Key("1")]
public MessageStatus Status { get; set; }
/// <summary>
/// 我們希望儲存的回應資訊以便繼續執行訊息。
/// </summary>
[Key("2")]
[MessagePackFormatter(typeof(PrimitiveObjectResolver))]
public IDictionary<string, object> Headers { get; set; }
/// <summary>
/// 預設使用 BasicProperties.Reply To 隊列名稱,任務處理器可以覆寫回應隊列名稱。
/// </summary>
/// <value>預設使用 BasicProperties.Reply</value>
[Key("3")]
public string ReplyQueueName { get; set; }
}
我將在此介紹 MessageStatus 的含義。
我們可以通過不同的程式語言來編寫自己的 worker node (我已經在此 github 提供了 Python, .NET, rust example code)。
類似於操作系統中的進程,發生上下文切換(中斷等)。
客戶端可以通過 Header 發送一個 TimeoutMilliseconds
值:在取消之前等待的時間(毫秒)。如果任務執行超過該值,worker process 可以使用該值來設置中斷,例如 CancellationToken。
例如,MessageOutputTask
的 JSON 可以如下所示,status=201
代表此訊息將重新入隊以便下次處理,並且訊息將攜帶 Headers
資訊再次重新入隊。
{
"Message": "This is Mock Json Data",
"Status": 201,
"Headers": {
"CreateTimestamp": "2025-01-01T14:35:00Z",
"PreviousProcessingTimestamp": "2025-01-01T14:40:00Z",
"Source": "OrderProcessingService",
"PreviousExecutedRows": "123",
"RequeueTimes": "3"
}
}
此專案還包括 integration、unit test 和 github action pipeline。雖然 API 文件(專案仍在 beta 階段),但我計劃在未來逐步添加。如果您對此專案有任何想法或建議,請隨時創建問題或發送 PR。