iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 12
1
Software Development

從零開始土炮MQ系列 第 12

四、Router 路由器(3) - 通用 Router

  • 分享至 

  • xImage
  •  

迭代:通用型的 Router

由於前面的實作方式,都是建立在知道 QueueItem 本身的資料類型或結構。

那有沒有可能,回歸到 QueueItem 本身只單純的儲存資料,避開與任何資料類型的耦合?這樣是否為對存取資料的 consumber 造成困擾?

我們轉個想法,理論上,consumer 會知道 prodcutor 所造成的內容格式,所以在 queue 原封不動的儲存資料內容,唯一需要知道的只有 Lable 就可以了。

而 consumber 只要依據 Label,就可以取得他己知格式內容的資料本身,讓 consumber 自己去處理資料,這樣一來,程式就會與資料格式解耦,也可以避免 Boxing / Unboxing 的問題。

所以我們將 Payload 的格式,變更為 byte[] 的資料。

//	Payload 的資料資料變動
public class QueueItem
{
    public string label {get;set;}
    public byte[] Payload {get;set;}
}

//	Router 的結構變更,移
public class QueueRotuer
{
    public AddQeeue{string lable, ConcurrentQueue<byte[]> queue){...};
    public Enqueue(QueueItem item){...}
}

由於邏輯概念有較大幅度的改變,此次迭代的異動範圍有...

  • 移除 QueueItem 的泛型,並將 Payload 變更為 byte[] 格式。
  • QueueRouter.Enqueue 的傳入參數,由 QueueItem<T>QueueItem
  • 測試案例對應的大幅修改。
//	測試案例
[TestClass]
public class QueueRouterTest
{
    [TestMethod]
    public void QueueRouterQueueManagerTestSingleQueue可輸入的值放入佇列()
    {
        // arrange
        var router = new QueueRouter();
        var intQueue = new ConcurrentQueue<byte[]>();

        var label = typeof(int).ToString();
        router.AddQueue(label, intQueue);

        var intQueueItems = ConvertQueueItem(label, new[] {23, 16});

        // act
        Enqueue2Router(router, intQueueItems);

        // assert
        var expected = intQueueItems.Select(x => x.Payload);
        var actual = intQueue.ToList();

        expected.ToExpectedObject().ShouldMatch(actual.ToArray());
    }

    [TestMethod]
    public void QueueRouter_輸入不同的值_放入對應佇列()
    {
        // arrange
        var router = new QueueRouter();

        var intQueue = new ConcurrentQueue<byte[]>();
        var strQueue = new ConcurrentQueue<byte[]>();

        var intLabel = typeof(int).ToString();
        router.AddQueue(intLabel, intQueue);

        var strLabel = typeof(string).ToString();
        router.AddQueue(strLabel, strQueue);

        var intQueueItems = ConvertQueueItem(intLabel, new[] {23, 16});
        var stringQueueItems = ConvertQueueItem(strLabel, new[] {"Skype", "Google"});

        // act
        Enqueue2Router(router, intQueueItems);
        Enqueue2Router(router, stringQueueItems);

        // assert
        var intExpected = intQueueItems.Select(x => x.Payload);
        var intActual = intQueue.ToList();

        var strExpected = stringQueueItems.ToList().Select(x => x.Payload);
        var strActual = strQueue.ToList();

        intExpected.ToExpectedObject().ShouldMatch(intActual.ToArray());
        strExpected.ToExpectedObject().ShouldMatch(strActual.ToArray());
    }

    private List<QueueItem<byte[]>> ConvertQueueItem(string label, int[] entities)
    {
        var list = new List<QueueItem<byte[]>>();

        foreach (var entity in entities)
        {
            var queueItem = new QueueItem<byte[]>
            {
                Label = label,
                Payload = BitConverter.GetBytes(entity)
            };
            list.Add(queueItem);
        }

        return list;
    }

    private List<QueueItem<byte[]>> ConvertQueueItem(string label, string[] entities)
    {
        var list = new List<QueueItem<byte[]>>();

        foreach (var entity in entities)
        {
            var queueItem = new QueueItem<byte[]>
            {
                Label = label,
                Payload = Encoding.Default.GetBytes(entity)
            };
            list.Add(queueItem);
        }

        return list;
    }

    private static void Enqueue2Router(QueueRouter queueRouter, IEnumerable<QueueItem<byte[]>> expected)
    {
        foreach (var i in expected)
        {
            queueRouter.Enqueue(i);
        }
    }
}

同時,Binding Queue 時,必須註冊 RouterKey,以便 Router 在分派輸入的物件。所有輸入物件均 inhert common packet。

  • 優點:一個 Router 通用所用類型。
  • 缺點:需要額外封裝資料物件。

迭代:自動建立 Queue

到目前為止,我們的使用方式,還是要先建立好 Queue 後,再將 Router 與 Queue 進行 Binding。
我真的很懶,懶到不想要自行建立 queue,或是,被要求不準自己建立 queue,而是必須使用程式自行產生 queue。這時,新的需求出現了~~

需求: 系統要能依據輸入資料類型,自行建立對應的 queue。

從上面的邏輯看起來,好像 AddQueue 的參數都是相同的。那這樣,我們是否能將其整合在一起,改成註冊 qeueu 時,Router 就會自動產生相關的 Queue。

//	概念
public class QueueRouter
{
    public void Registry(string label){...}    
    public void Enqueue(QueueItem item){...}
}

此次迭代的異動範圍有...

  • 移除 AddQueue 的方法,變更為 Registory
  • 因為 QueueRouter 本身不支援取得 Queue 的功能,為了測試,額外增加繼承 QueueRouter 的測試用類別。
  • 針對測試案例對應的修改。
public class QueueRouter
{
    protected Dictionary<string, ConcurrentQueue<byte[]>> _dctQueues
        = new Dictionary<string, ConcurrentQueue<byte[]>>();

    public void Registry(string label)
    {
        if (_dctQueues.ContainsKey(label))
            throw new Exception("己存在相同 label 的 queue");

        _dctQueues[label] = new ConcurrentQueue<byte[]>();
    }

    public void Enqueue(QueueItem<byte[]> item)
    {
        if (!_dctQueues.ContainsKey(item.Label))
            return;

        _dctQueues[item.Label].Enqueue(item.Payload);
    }
}
//	為了測試用,繼承 QueueRouter    
public class QueueRouterInherit : QueueRouter
{
    public ConcurrentQueue<byte[]> GetQueue(string label)
    {
        return _dctQueues.ContainsKey(label) ? _dctQueues[label] : null;
    }
}

[TestClass]
public class QueueRouterTest
{
    [TestMethod]
    public void QueueRouterQueueManagerTestSingleQueue可輸入的值放入佇列()
    {
        // arrange
        var router = new QueueRouterInherit();

        var label = typeof(int).ToString();
        router.Registry(label);
        var intQueue = router.GetQueue(label);

        var intQueueItems = ConvertQueueItem(label, new[] {23, 16});

        // act
        Enqueue2Router(router, intQueueItems);

        // assert
        var expected = intQueueItems.Select(x => x.Payload);
        var actual = intQueue.ToList();

        expected.ToExpectedObject().ShouldMatch(actual.ToArray());
    }

    [TestMethod]
    public void QueueRouter_輸入不同的值_放入對應佇列()
    {
        // arrange
        var router = new QueueRouterInherit();

        var intLabel = typeof(int).ToString();
        router.Registry(intLabel);
        var intQueue = router.GetQueue(intLabel);

        var strLabel = typeof(string).ToString();
        router.Registry(strLabel);
        var strQueue = router.GetQueue(strLabel);

        var intQueueItems = ConvertQueueItem(intLabel, new[] {23, 16});
        var stringQueueItems = ConvertQueueItem(strLabel, new[] {"Skype", "Google"});

        // act
        Enqueue2Router(router, intQueueItems);
        Enqueue2Router(router, stringQueueItems);

        // assert
        var intExpected = intQueueItems.Select(x => x.Payload);
        var intActual = intQueue.ToList();

        var strExpected = stringQueueItems.ToList().Select(x => x.Payload);
        var strActual = strQueue.ToList();

        intExpected.ToExpectedObject().ShouldMatch(intActual.ToArray());
        strExpected.ToExpectedObject().ShouldMatch(strActual.ToArray());
    }

    private List<QueueItem<byte[]>> ConvertQueueItem(string label, int[] entities)
    {
        var list = new List<QueueItem<byte[]>>();

        foreach (var entity in entities)
        {
            var queueItem = new QueueItem<byte[]>
            {
                Label = label,
                Payload = BitConverter.GetBytes(entity)
            };
            list.Add(queueItem);
        }

        return list;
    }

    private List<QueueItem<byte[]>> ConvertQueueItem(string label, string[] entities)
    {
        var list = new List<QueueItem<byte[]>>();

        foreach (var entity in entities)
        {
            var queueItem = new QueueItem<byte[]>
            {
                Label = label,
                Payload = Encoding.Default.GetBytes(entity)
            };
            list.Add(queueItem);
        }

        return list;
    }

    private static void Enqueue2Router(QueueRouter queueRouter, IEnumerable<QueueItem<byte[]>> expected)
    {
        foreach (var i in expected)
        {
            queueRouter.Enqueue(i);
        }
    }
}


上一篇
四、Router 路由器(2) - Router key
下一篇
五、分配機制(1)
系列文
從零開始土炮MQ30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言