今天要介紹一種常見的模式 Producer Consumer 模式,它主要是將生產資料和消費資料二個部份分開來處理,在 Producer 和 Consumer 之間會放一個 Queue 主要的目的是當產生資料時會放入 Queue,然後消費資料時會再從 Queue 裡面把資料拿出來。這樣的好處是可以讓程式能夠解耦合,在生產資料時就只負責生產資料,例如可以將從檔案或是從資料庫讀取出來的資料放入到 Queue 裡面,然後再 Consumer 時就不會去管資料來源是檔案還是資料庫只要讀到 Queue 裡面的資料就進行處理像是寫到其它的目的檔案系統裡面。這樣在實作 Code 時也會比較簡單容易,分工上也會變得比較容易拆開。
以下是 Producer Consumer 模式的 Sample code,主要會開二個執行緒一個負責生產資料,另外一個負責消費資料,然後會在生產者和消費者這二個執行緒之間放一個 Channel 用來當作放資料的 Queue,決定是否要繼續生產資料或是要消費資料的邏輯。程式如下:
import java.util.LinkedList;
import java.util.Queue;
public class Channel {
private final Queue<String> queue = new LinkedList<String>();
public synchronized void put(String message) {
while (queue.size() >= 10) {
try {
wait();
} catch(Exception e) {
throw new RuntimeException(e);
}
}
queue.offer(message);
notifyAll();
}
public synchronized String take() {
while (queue.peek() == null) {
try {
wait();
} catch(Exception e) {
throw new RuntimeException(e);
}
}
notifyAll();
return queue.remove();
}
}
Channel 的類別會宣告一個 queue 變數是用來存取資料使用的,put 方法是給 Producer 生產資料使用,當 queue 裡面放入超過 10 個元素時會使用 wait 方法等待執行,目的是避免 Queue 會有爆掉的問題,當資料放入 queue 之後就可以通知像是 Consumer 可以消費資料了。take 方法是給 Consumer 消費資料使用,會先看 queue 裡面是否有資料,如果沒有的話會先呼叫 wait 方法等待,等到有資料之後就會從 queue 裡面取出來。
Producer 執行緒程式如下:
public class ProducerThread implements Runnable {
private Channel channel;
private static int id = 0;
public ProducerThread(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(1000L);
String message = "message-" + nextId();
System.out.println("Producer:" + message);
this.channel.put(message);
}
} catch(Exception e) {
throw new RuntimeException(e);
}
}
private static synchronized int nextId() {
return id++;
}
}
Producer 執行緒程式主要會產生 message 的訊息,然後放入到 Queue 裡面去。Consumer 執行緒程式如下:
public class ConsumerThread implements Runnable {
private Channel channel;
public ConsumerThread(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
try {
while (true) {
String message = this.channel.take();
System.out.println("Consumer:" + message);
}
} catch(Exception e) {
throw new RuntimeException(e);
}
}
}
Consumer 執行緒程式會將 Queue 裡面的 message 資料取出來之後,然後印到 Console 上,主程式如下:
public class Test {
public static void main(String args[]) {
Channel channel = new Channel();
Thread producerThread1 = new Thread(new ProducerThread(channel));
Thread producerThread2 = new Thread(new ProducerThread(channel));
Thread consumerThread = new Thread(new ConsumerThread(channel));
producerThread1.start();
producerThread2.start();
consumerThread.start();
}
}
在主程式裡面會開二個 Producer 執行緒程式和一個 Consumer 執行緒程式,程式的執行結果如下:
Producer:message-0
Consumer:message-0
Producer:message-1
Consumer:message-1
Producer:message-2
Consumer:message-2
Producer:message-3
Consumer:message-3
Producer:message-4
Consumer:message-4
Producer:message-5
如果 Producer 生產資料的速度大於 Consumer 消費資料的速度時,就有可能將記憶體塞滿,造成 Out Of Memory 的問題。這也就是會在 Channel 類別的 put 方法,判斷當 queue 的 size 大於等於 10 就進行等待的原因。