當 consumer 主動跟 dispatch 請求資料,Dispatch 依據 consumer 傳入的可接受的資料類型資訊,判斷 Queue 內第一筆的資料是否為回傳。若為可接受類型;直接回傳資料內容;反之,則回傳空值。
這種分派方式,多數情境是應用於 consumer 未與 Dispatch 保持長連線狀態。例如取完資料後,就離線處理。直到作業完成後,才再次跟 Dispatch 請求資料。
Dispatch 在分派資料時,必須依賴 consumer 給與的資訊,才能正確的分派資料。所以,將 IDispatch
進行修改。
在異質性的 Queue 中,採用 QueueItem
的格式儲存資料,所以採用 label
做為識別的資訊。
public interface IDispatch<T>
{
void Binding(ConcurrentQueue<T> queue);
void Register(IConsumer<T> consumer);
// 加入請求資料類型的標簽
T Push(string label);
}
因為 Queue 的資料格式定為 QueueItem<byte[]>
,實作如下。
public class HeterogeneityDispatch : IDispatch<QueueItem<byte[]>>
{
private ConcurrentQueue<QueueItem<byte[]>> _bindingQueue;
private List<IConsumer<QueueItem<byte[]>>> _consumers = new List<IConsumer<QueueItem<byte[]>>>();
public void Binding(ConcurrentQueue<QueueItem<byte[]>> queue)
{
_bindingQueue = queue;
}
public void Register(IConsumer<QueueItem<byte[]>> consumer)
{
consumer.Binding(this);
_consumers.Add(consumer);
}
QueueItem<byte[]> IDispatch<QueueItem<byte[]>>.Push(string label)
{
_bindingQueue.TryPeek(out var result);
if (result.Label != label)
return null;
_bindingQueue.TryDequeue(out result);
return result;
}
}
同時,我們將同質性實作的 Consumer
稍微調整。
public class Consumer<T> : IConsumer<T>
{
protected IDispatch<T> _dispatch;
public void Binding(IDispatch<T> dispatch)
{
_dispatch = dispatch;
}
public virtual void DoWork()
{
}
}
最後,針對兩種情境,進行簡易的單元測試
// 測試程式
[TestClass]
public class HeterogeneityDispatchTest
{
private class IntConsumer : Consumer<QueueItem<byte[]>>
{
public List<int> Queue { get; } = new List<int>();
public override void DoWork()
{
var entity = _dispatch.Push(typeof(int).ToString());
if (entity == null)
return;
var data = BitConverter.ToInt32(entity.Payload);
Queue.Add(data);
}
}
private class StringConsumer : Consumer<QueueItem<byte[]>>
{
public List<string> Queue { get; } = new List<string>();
public override void DoWork()
{
var entity = _dispatch.Push(typeof(string).ToString());
if (entity == null)
return;
var data = Encoding.ASCII.GetString(entity.Payload);
Queue.Add(data);
}
}
[TestMethod]
public void HeterogeneityDispatchTest_兩個consumer依順序取值()
{
var queue = GetQueue();
var dispatch = new HeterogeneityDispatch();
dispatch.Binding(queue);
var consumer1 = new IntConsumer();
dispatch.Register(consumer1);
var consumer2 = new StringConsumer();
dispatch.Register(consumer2);
consumer1.DoWork();
consumer2.DoWork();
consumer1.DoWork();
consumer2.DoWork();
(new[] { 24, 36 }).ToExpectedObject().ShouldEqual(consumer1.Queue.ToArray());
(new[] { "Flora MQ", "Message Queue" }).ToExpectedObject().ShouldEqual(consumer2.Queue.ToArray());
}
private static ConcurrentQueue<QueueItem<byte[]>> GetQueue()
{
var queue = new ConcurrentQueue<QueueItem<byte[]>>();
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(int).ToString(),
Payload = BitConverter.GetBytes(24)
});
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(string).ToString(),
Payload = Encoding.ASCII.GetBytes("Flora MQ")
});
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(int).ToString(),
Payload = BitConverter.GetBytes(36)
});
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(string).ToString(),
Payload = Encoding.ASCII.GetBytes("Message Queue")
});
return queue;
}
[TestMethod]
public void HeterogeneityDispatchTest_當只有一個consume取值時()
{
var queue = GetQueue();
var dispatch = new HeterogeneityDispatch();
dispatch.Binding(queue);
var consumer1 = new IntConsumer();
dispatch.Register(consumer1);
var consumer2 = new StringConsumer();
dispatch.Register(consumer2);
consumer1.DoWork();
consumer1.DoWork();
consumer1.DoWork();
consumer1.DoWork();
(new[] { 24 }).ToExpectedObject().ShouldEqual(consumer1.Queue.ToArray());
(new string[] { }).ToExpectedObject().ShouldEqual(consumer2.Queue.ToArray());
}
}