iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 16
1
Software Development

從零開始土炮MQ系列 第 16

五、分配機制(4) - 異質性分派 之被動分派

  • 分享至 

  • xImage
  •  

被動式分派

consumer 主動跟 dispatch 請求資料,Dispatch 依據 consumer 傳入的可接受的資料類型資訊,判斷 Queue 內第一筆的資料是否為回傳。若為可接受類型;直接回傳資料內容;反之,則回傳空值。

https://ithelp.ithome.com.tw/upload/images/20191002/20107551l4hHMU9iax.png

這種分派方式,多數情境是應用於 consumer 未與 Dispatch 保持長連線狀態。例如取完資料後,就離線處理。直到作業完成後,才再次跟 Dispatch 請求資料。

Dispatch 在分派資料時,必須依賴 consumer 給與的資訊,才能正確的分派資料。所以,將 IDispatch 進行修改。

在異質性的 Queue 中,採用 QueueItem 的格式儲存資料,所以採用 label做為識別的資訊。

public interface IDispatch<T>
{
    void Binding(ConcurrentQueue<T> queue);

    void Register(IConsumer<T> consumer);

	// 加入請求資料類型的標簽
    T Push(string label);
}

因為 Queue 的資料格式定為 QueueItem<byte[]> ,實作如下。

 public class HeterogeneityDispatch : IDispatch<QueueItem<byte[]>>
 {
     private ConcurrentQueue<QueueItem<byte[]>> _bindingQueue;
     private List<IConsumer<QueueItem<byte[]>>> _consumers = new List<IConsumer<QueueItem<byte[]>>>();

     public void Binding(ConcurrentQueue<QueueItem<byte[]>> queue)
     {
         _bindingQueue = queue;
     }

     public void Register(IConsumer<QueueItem<byte[]>> consumer)
     {
         consumer.Binding(this);

         _consumers.Add(consumer);
     }

     QueueItem<byte[]> IDispatch<QueueItem<byte[]>>.Push(string label)
     {
         _bindingQueue.TryPeek(out var result);

         if (result.Label != label)
             return null;

         _bindingQueue.TryDequeue(out result);

         return result;
     }
 }

同時,我們將同質性實作的 Consumer 稍微調整。

public class Consumer<T> : IConsumer<T>
{
    protected IDispatch<T> _dispatch;
    public void Binding(IDispatch<T> dispatch)
    {
        _dispatch = dispatch;
    }
    public virtual void DoWork()
    {
    }
}

最後,針對兩種情境,進行簡易的單元測試

//	測試程式
[TestClass]
public class HeterogeneityDispatchTest
{
    private class IntConsumer : Consumer<QueueItem<byte[]>>
    {
        public List<int> Queue { get; } = new List<int>();

        public override void DoWork()
        {
            var entity = _dispatch.Push(typeof(int).ToString());

            if (entity == null)
                return;

            var data = BitConverter.ToInt32(entity.Payload);
            Queue.Add(data);
        }
    }

    private class StringConsumer : Consumer<QueueItem<byte[]>>
    {
        public List<string> Queue { get; } = new List<string>();

        public override void DoWork()
        {
            var entity = _dispatch.Push(typeof(string).ToString());

            if (entity == null)
                return;

            var data = Encoding.ASCII.GetString(entity.Payload);
            Queue.Add(data);
        }
    }

    [TestMethod]
    public void HeterogeneityDispatchTest_兩個consumer依順序取值()
    {
        var queue = GetQueue();

        var dispatch = new HeterogeneityDispatch();
        dispatch.Binding(queue);

        var consumer1 = new IntConsumer();
        dispatch.Register(consumer1);
        var consumer2 = new StringConsumer();
        dispatch.Register(consumer2);

        consumer1.DoWork();
        consumer2.DoWork();
        consumer1.DoWork();
        consumer2.DoWork();

        (new[] { 24, 36 }).ToExpectedObject().ShouldEqual(consumer1.Queue.ToArray());

        (new[] { "Flora MQ", "Message Queue" }).ToExpectedObject().ShouldEqual(consumer2.Queue.ToArray());
    }

    private static ConcurrentQueue<QueueItem<byte[]>> GetQueue()
    {
        var queue = new ConcurrentQueue<QueueItem<byte[]>>();
        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(int).ToString(),
            Payload = BitConverter.GetBytes(24)
        });

        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(string).ToString(),
            Payload = Encoding.ASCII.GetBytes("Flora MQ")
        });
        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(int).ToString(),
            Payload = BitConverter.GetBytes(36)
        });
        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(string).ToString(),
            Payload = Encoding.ASCII.GetBytes("Message Queue")
        });
        return queue;
    }

    [TestMethod]
    public void HeterogeneityDispatchTest_當只有一個consume取值時()
    {
        var queue = GetQueue();

        var dispatch = new HeterogeneityDispatch();
        dispatch.Binding(queue);

        var consumer1 = new IntConsumer();
        dispatch.Register(consumer1);
        var consumer2 = new StringConsumer();
        dispatch.Register(consumer2);

        consumer1.DoWork();
        consumer1.DoWork();
        consumer1.DoWork();
        consumer1.DoWork();

        (new[] { 24 }).ToExpectedObject().ShouldEqual(consumer1.Queue.ToArray());

        (new string[] { }).ToExpectedObject().ShouldEqual(consumer2.Queue.ToArray());
    }
}

上一篇
五、分配機制(3) - 異質性分派
下一篇
五、分配機制(5) - 異質性分派 之主動分派
系列文
從零開始土炮MQ30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言