iT邦幫忙

2019 iT 邦幫忙鐵人賽

DAY 28
0
Software Development

30 天介紹 Java 的 Thread系列 第 28

Day 28 介紹 Producer Consumer 模式

  • 分享至 

  • xImage
  •  

今天要介紹一種常見的模式 Producer Consumer 模式,它主要是將生產資料和消費資料二個部份分開來處理,在 Producer 和 Consumer 之間會放一個 Queue 主要的目的是當產生資料時會放入 Queue,然後消費資料時會再從 Queue 裡面把資料拿出來。這樣的好處是可以讓程式能夠解耦合,在生產資料時就只負責生產資料,例如可以將從檔案或是從資料庫讀取出來的資料放入到 Queue 裡面,然後再 Consumer 時就不會去管資料來源是檔案還是資料庫只要讀到 Queue 裡面的資料就進行處理像是寫到其它的目的檔案系統裡面。這樣在實作 Code 時也會比較簡單容易,分工上也會變得比較容易拆開。

以下是 Producer Consumer 模式的 Sample code,主要會開二個執行緒一個負責生產資料,另外一個負責消費資料,然後會在生產者和消費者這二個執行緒之間放一個 Channel 用來當作放資料的 Queue,決定是否要繼續生產資料或是要消費資料的邏輯。程式如下:

import java.util.LinkedList;
import java.util.Queue;

public class Channel {
  private final Queue<String> queue = new LinkedList<String>();

  public synchronized void put(String message) {
    while (queue.size() >= 10) {
        try {
            wait();
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }

    queue.offer(message);
    notifyAll();
  }

  public synchronized String take() {
    while (queue.peek() == null) {
      try {
        wait();
      } catch(Exception e) {
        throw new RuntimeException(e);
      }
    }
    notifyAll();
    return queue.remove();
  }
}

Channel 的類別會宣告一個 queue 變數是用來存取資料使用的,put 方法是給 Producer 生產資料使用,當 queue 裡面放入超過 10 個元素時會使用 wait 方法等待執行,目的是避免 Queue 會有爆掉的問題,當資料放入 queue 之後就可以通知像是 Consumer 可以消費資料了。take 方法是給 Consumer 消費資料使用,會先看 queue 裡面是否有資料,如果沒有的話會先呼叫 wait 方法等待,等到有資料之後就會從 queue 裡面取出來。

Producer 執行緒程式如下:

public class ProducerThread implements Runnable {
  private Channel channel;
  private static int id = 0;

  public ProducerThread(Channel channel) {
    this.channel = channel;
  }

  @Override
  public void run() {
    try {
      while (true) {
        Thread.sleep(1000L);
        String message = "message-" + nextId();
        System.out.println("Producer:" + message);
        this.channel.put(message);
      }
    } catch(Exception e) {
      throw new RuntimeException(e);
    }
  }

  private static synchronized int nextId() {
    return id++;
  }
}

Producer 執行緒程式主要會產生 message 的訊息,然後放入到 Queue 裡面去。Consumer 執行緒程式如下:

public class ConsumerThread implements Runnable {
  private Channel channel;

  public ConsumerThread(Channel channel) {
    this.channel = channel;
  }

  @Override
  public void run() {
    try {
      while (true) {
        String message = this.channel.take();
        System.out.println("Consumer:" + message);
      }
    } catch(Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Consumer 執行緒程式會將 Queue 裡面的 message 資料取出來之後,然後印到 Console 上,主程式如下:

public class Test {
  public static void main(String args[]) {
     Channel channel = new Channel();
     Thread producerThread1 = new Thread(new ProducerThread(channel));
     Thread producerThread2 = new Thread(new ProducerThread(channel));

     Thread consumerThread = new Thread(new ConsumerThread(channel));

     producerThread1.start();
     producerThread2.start();
     consumerThread.start();
  }
}

在主程式裡面會開二個 Producer 執行緒程式和一個 Consumer 執行緒程式,程式的執行結果如下:

Producer:message-0
Consumer:message-0
Producer:message-1
Consumer:message-1
Producer:message-2
Consumer:message-2
Producer:message-3
Consumer:message-3
Producer:message-4
Consumer:message-4
Producer:message-5

如果 Producer 生產資料的速度大於 Consumer 消費資料的速度時,就有可能將記憶體塞滿,造成 Out Of Memory 的問題。這也就是會在 Channel 類別的 put 方法,判斷當 queue 的 size 大於等於 10 就進行等待的原因。


上一篇
Day 27 介紹 Guarded Suspension 模式
下一篇
Day 29 介紹 Worker Thread 模式
系列文
30 天介紹 Java 的 Thread30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言