還有一種情況,是 consumer 會長時間與 Dispatch 保持連線,當 Queue 存在資料時,Dispatch 判斷資料的類型,主動通知相關的 consumer 。若假沒有對應的 consumer 可以處理資料,就進行額外的處理。
額外的處理方式很多種,完全取決於需求的方式。例如...
在分派實作的部份,採用直接將無用的資料丟棄的作法。並使用delegate
的方式,讓 Dispatch 可以主動通知 consumer 進行處理。當然,也可以使用 Action<T>
或 Event
進行實作,如果有更好的作法,也歡迎告知。
public class Consumer<T> : IConsumer<T>
{
// 略過原本程式碼
public virtual string Key { get; }
// 增加通知用的 method
public virtual void Receive(T data)
{
throw new NotImplementedException();
}
}
/// <summary>
/// 異質性
/// </summary>
/// <seealso cref="EMQ.IDispatch{EMQ.QueueItem{System.Byte[]}}" />
public class HeterogeneityDispatch : IDispatch<QueueItem<byte[]>>
{
private ConcurrentQueue<QueueItem<byte[]>> _bindingQueue;
private readonly Dictionary<string, IConsumer<QueueItem<byte[]>>> _consumers = new Dictionary<string, IConsumer<QueueItem<byte[]>>>();
public void Binding(ConcurrentQueue<QueueItem<byte[]>> queue)
{
_bindingQueue = queue;
}
public void Register(IConsumer<QueueItem<byte[]>> consumer)
{
consumer.Binding(this);
_consumers[consumer.Key] = consumer;
}
QueueItem<byte[]> IDispatch<QueueItem<byte[]>>.Push(string label)
{
_bindingQueue.TryPeek(out var result);
if (result.Label != label)
return null;
_bindingQueue.TryDequeue(out result);
return result;
}
public void Notice()
{
while (true)
{
if (_bindingQueue.IsEmpty)
{
SpinWait.SpinUntil(() => !_bindingQueue.IsEmpty, 10000);
continue;
}
_bindingQueue.TryDequeue(out var result);
if (!_consumers.Keys.Contains(result.Label))
{
continue;
}
_consumers[result.Label].Receive(result);
}
}
}
// 測試程式
[TestClass]
public class HeterogeneityDispatchTest
{
private class IntConsumer : Consumer<QueueItem<byte[]>>
{
public override string Key { get; } = typeof(int).ToString();
public List<int> Queue { get; } = new List<int>();
public override void DoWork()
{
var entity = _dispatch.Push(typeof(int).ToString());
if (entity == null)
return;
var data = BitConverter.ToInt32(entity.Payload);
Queue.Add(data);
}
public override void Receive(QueueItem<byte[]> entity)
{
var data = BitConverter.ToInt32(entity.Payload);
Queue.Add(data);
}
}
private class StringConsumer : Consumer<QueueItem<byte[]>>
{
public override string Key { get; } = typeof(string).ToString();
public List<string> Queue { get; } = new List<string>();
public override void DoWork()
{
var entity = _dispatch.Push(typeof(string).ToString());
if (entity == null)
return;
var data = Encoding.ASCII.GetString(entity.Payload);
Queue.Add(data);
}
public override void Receive(QueueItem<byte[]> entity)
{
var data = Encoding.ASCII.GetString(entity.Payload);
Queue.Add(data);
}
}
private static ConcurrentQueue<QueueItem<byte[]>> GetQueue()
{
var queue = new ConcurrentQueue<QueueItem<byte[]>>();
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(int).ToString(),
Payload = BitConverter.GetBytes(24)
});
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(string).ToString(),
Payload = Encoding.ASCII.GetBytes("Flora MQ")
});
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(int).ToString(),
Payload = BitConverter.GetBytes(36)
});
queue.Enqueue(new QueueItem<byte[]>()
{
Label = typeof(string).ToString(),
Payload = Encoding.ASCII.GetBytes("Message Queue")
});
return queue;
}
[TestMethod]
public void HeterogeneityDispatchTest_有2個consumer_主動通知()
{
var queue = GetQueue();
var dispatch = new HeterogeneityDispatch();
dispatch.Binding(queue);
var consumer1 = new IntConsumer();
dispatch.Register(consumer1);
var consumer2 = new StringConsumer();
dispatch.Register(consumer2);
Task.Factory.StartNew(() => dispatch.Notice());
Thread.Sleep(200);
(new[] { 24, 36 }).ToExpectedObject().ShouldEqual(consumer1.Queue.ToArray());
(new[] { "Flora MQ", "Message Queue" }).ToExpectedObject().ShouldEqual(consumer2.Queue.ToArray());
}
}
在實作中,有許多地方是未處理或實作的。例如:
register
一組 consumer。Async
的機制。這些部份,都是後面可以持續改進的部份。