iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 14
1
Software Development

從零開始土炮MQ系列 第 14

五、分配機制(2) - 同質性分派

  • 分享至 

  • xImage
  •  

5.2 同質性分派

情境一:當 Queue 中存在多筆相同類型的資料,禁止 Consumer 直接取得資料,中間需經由 Dispatch 去分配資料。

這個是與生產者消費者模式的應用情境相似,只是 consumer 無法直接跟 Queue 取得資源,必須依賴 Dispatch 的分派。也因為如此,主動權由 Consumer 移致 Dispatch。

當有多個 conumer 同時跟 queue 請求資料時,Dispatch 會依據 consumer 請求的順序,依先進先出的原則,逐一將資料回傳給 consumer。

https://ithelp.ithome.com.tw/upload/images/20190930/20107551riybnaOsyA.png
將前面提到的 IDispatch<T>IConsumer<T> 進行迭代。

public interface IDispatch<T>
{
    void Binding(ConcurrentQueue<T> queue);

    void Register(IConsumer<T> consumer);

    T Push();
}

public interface IConsumer<T>
{
    void Binding(IDispatch<T> dispatch);
}

接下來,進行實作的程式。

同時,因為實作內容的 int 同質性的處理,Consumber 的部份,就偷懶直接指定為 int 的類別。

public class HomogeneityDispatch<T> : IDispatch<T>
{
    private ConcurrentQueue<T> _bindingQueue;
    private List<IConsumer<T>> _consumers;

    public void Binding(ConcurrentQueue<T> queue)
    {
        _bindingQueue = queue;
    }

    public void Register(IConsumer<T> consumer)
    {
        consumer.Binding(this);
        _consumers.Add(consumer);
    }

    public T Push()
    {
        if (_bindingQueue.IsEmpty)
            return default(T);

        _bindingQueue.TryDequeue(out T result);

        return result;
    }
}

public class Consumer : IConsumer<int>
{
    protected IDispatch<int> _dispatch;

    public void Binding(IDispatch<int> dispatch)
    {
        _dispatch = dispatch;
    }

    public virtual void DoWork()
    {
    }
}

接著,簡單測試兩種情境。

//	測試案例
private class ConsumerTest : Consumer
{
    public List<int> Queue { get; } = new List<int>();

    public override void DoWork()
    {
        Queue.Add(_dispatch.Push());
    }
}

[TestMethod]
public void ConsumerTest_單consumer取值_資料順序相同()
{
    var expect = new int[] { 24, 12, 16, 22 };
    ConcurrentQueue<int> queue = new ConcurrentQueue<int>(expect);

    var dispatch = new HomogeneityDispatch<int>();
    dispatch.Binding(queue);

    var consumer1 = new ConsumerTest();
    dispatch.Register(consumer1);

    consumer1.DoWork();
    consumer1.DoWork();
    consumer1.DoWork();
    consumer1.DoWork();

    var actual = consumer1.Queue.ToArray();

    expect.ToExpectedObject().ShouldEqual(actual);
}

[TestMethod]
public void ConsumerTest_當2個consumer輸流取值時_取回資料與順序相符()
{
    ConcurrentQueue<int> queue = new ConcurrentQueue<int>(new[] { 1, 2, 3, 4 });

    var dispatch = new HomogeneityDispatch<int>();
    dispatch.Binding(queue);

    var consumer1 = new ConsumerTest();
    var consumer2 = new ConsumerTest();

    dispatch.Register(consumer1);
    dispatch.Register(consumer2);

    consumer1.DoWork();
    consumer2.DoWork();
    consumer1.DoWork();

    var actual = consumer1.Queue.ToArray();

    (new[] { 1, 3 }).ToExpectedObject().ShouldEqual(actual);

    actual = consumer2.Queue.ToArray();
    (new[] { 2 }).ToExpectedObject().ShouldEqual(actual);
}

上一篇
五、分配機制(1)
下一篇
五、分配機制(3) - 異質性分派
系列文
從零開始土炮MQ30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言