iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 17
1
Software Development

從零開始土炮MQ系列 第 17

五、分配機制(5) - 異質性分派 之主動分派

  • 分享至 

  • xImage
  •  

主動式分派

還有一種情況,是 consumer 會長時間與 Dispatch 保持連線,當 Queue 存在資料時,Dispatch 判斷資料的類型,主動通知相關的 consumer 。若假沒有對應的 consumer 可以處理資料,就進行額外的處理。

額外的處理方式很多種,完全取決於需求的方式。例如...

  • 直接將無用的資料丟棄。放棄資料,確保 Queue 的通暢。
  • 保持持續等待,直到對應的 consumer 連入。保留完整的資料,但可能導致 Queue 內的資料爆量。

在分派實作的部份,採用直接將無用的資料丟棄的作法。並使用delegate的方式,讓 Dispatch 可以主動通知 consumer 進行處理。當然,也可以使用 Action<T>Event 進行實作,如果有更好的作法,也歡迎告知。

https://ithelp.ithome.com.tw/upload/images/20191005/20107551jkFG3paT9s.png

   public class Consumer<T> : IConsumer<T>
    {
        // 略過原本程式碼

        public virtual string Key { get; }

		//	增加通知用的 method
        public virtual void Receive(T data)
        {
            throw new NotImplementedException();
        }
    }
    
    /// <summary>
    /// 異質性
    /// </summary>
    /// <seealso cref="EMQ.IDispatch{EMQ.QueueItem{System.Byte[]}}" />
    public class HeterogeneityDispatch : IDispatch<QueueItem<byte[]>>
    {
        private ConcurrentQueue<QueueItem<byte[]>> _bindingQueue;
        private readonly Dictionary<string, IConsumer<QueueItem<byte[]>>> _consumers = new Dictionary<string, IConsumer<QueueItem<byte[]>>>();

        public void Binding(ConcurrentQueue<QueueItem<byte[]>> queue)
        {
            _bindingQueue = queue;
        }

        public void Register(IConsumer<QueueItem<byte[]>> consumer)
        {
            consumer.Binding(this);

            _consumers[consumer.Key] = consumer;
        }

        QueueItem<byte[]> IDispatch<QueueItem<byte[]>>.Push(string label)
        {
            _bindingQueue.TryPeek(out var result);

            if (result.Label != label)
                return null;

            _bindingQueue.TryDequeue(out result);

            return result;
        }

        public void Notice()
        {
            while (true)
            {
                if (_bindingQueue.IsEmpty)
                {
                    SpinWait.SpinUntil(() => !_bindingQueue.IsEmpty, 10000);
                    continue;
                }

                _bindingQueue.TryDequeue(out var result);

                if (!_consumers.Keys.Contains(result.Label))
                {
                    continue;
                }

                _consumers[result.Label].Receive(result);
            }
        }
    }


//	測試程式
[TestClass]
public class HeterogeneityDispatchTest
{
    private class IntConsumer : Consumer<QueueItem<byte[]>>
    {
        public override string Key { get; } = typeof(int).ToString();

        public List<int> Queue { get; } = new List<int>();

        public override void DoWork()
        {
            var entity = _dispatch.Push(typeof(int).ToString());

            if (entity == null)
                return;

            var data = BitConverter.ToInt32(entity.Payload);
            Queue.Add(data);
        }

        public override void Receive(QueueItem<byte[]> entity)
        {
            var data = BitConverter.ToInt32(entity.Payload);
            Queue.Add(data);
        }
    }

    private class StringConsumer : Consumer<QueueItem<byte[]>>
    {
        public override string Key { get; } = typeof(string).ToString();
        public List<string> Queue { get; } = new List<string>();

        public override void DoWork()
        {
            var entity = _dispatch.Push(typeof(string).ToString());

            if (entity == null)
                return;

            var data = Encoding.ASCII.GetString(entity.Payload);
            Queue.Add(data);
        }

        public override void Receive(QueueItem<byte[]> entity)
        {
            var data = Encoding.ASCII.GetString(entity.Payload);
            Queue.Add(data);
        }
    }

    private static ConcurrentQueue<QueueItem<byte[]>> GetQueue()
    {
        var queue = new ConcurrentQueue<QueueItem<byte[]>>();
        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(int).ToString(),
            Payload = BitConverter.GetBytes(24)
        });

        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(string).ToString(),
            Payload = Encoding.ASCII.GetBytes("Flora MQ")
        });
        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(int).ToString(),
            Payload = BitConverter.GetBytes(36)
        });
        queue.Enqueue(new QueueItem<byte[]>()
        {
            Label = typeof(string).ToString(),
            Payload = Encoding.ASCII.GetBytes("Message Queue")
        });
        return queue;
    }

    [TestMethod]
    public void HeterogeneityDispatchTest_有2個consumer_主動通知()
    {
        var queue = GetQueue();

        var dispatch = new HeterogeneityDispatch();
        dispatch.Binding(queue);

        var consumer1 = new IntConsumer();
        dispatch.Register(consumer1);
        var consumer2 = new StringConsumer();
        dispatch.Register(consumer2);

        Task.Factory.StartNew(() => dispatch.Notice());

        Thread.Sleep(200);

        (new[] { 24, 36 }).ToExpectedObject().ShouldEqual(consumer1.Queue.ToArray());

        (new[] { "Flora MQ", "Message Queue" }).ToExpectedObject().ShouldEqual(consumer2.Queue.ToArray());
    }
}

在實作中,有許多地方是未處理或實作的。例如:

  • 每種類型的 consumer 只能 register 一組 consumer。
  • 在通知 consumer 處理資料,可以使用 Async 的機制。

這些部份,都是後面可以持續改進的部份。


上一篇
五、分配機制(4) - 異質性分派 之被動分派
下一篇
五、分配機制(6) - 優先性分派
系列文
從零開始土炮MQ30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言