今天介紹 Worker Thread 模式,它是用來處理客戶請求工作的執行緒模式。例如有多個客戶每一個客戶都有一個案子要交給工人來做,這個工人的數量有可能會少於客戶的數量,因此同一個工人有可能會去做不同客戶的事情。此模式主要的目的就是要將客戶要做的事情和要做的工人拆開成不同的執行緒執行。這樣的好處是如果發現客戶的數量變多時就可以增加工人的數量來處理或者說客戶的數量變少時就可以減少工人的數量,使得在使用系統資源時具有可伸縮性,可以按照情況來增減資源。
以下是使用 Worker Thread 模式的 Sample Code 如下:
import java.util.LinkedList;
import java.util.Queue;
public class Channel {
private int workerNum;
private final Queue<Request> queue = new LinkedList<Request>();
public Channel(int workerNum) {
this.workerNum = workerNum;
}
public void startWorkerThread() {
for (int i = 0 ; i < workerNum ; i++) {
Thread thread = new Thread(new WorkerThread(this));
thread.start();
}
}
public synchronized void put(Request request) {
while (queue.size() >= 10) {
try {
wait();
} catch(Exception e) {
throw new RuntimeException(e);
}
}
queue.offer(request);
notifyAll();
}
public synchronized Request take() {
while (queue.peek() == null) {
try {
wait();
} catch(Exception e) {
throw new RuntimeException(e);
}
}
notifyAll();
return queue.remove();
}
}
在這裡 Channel 類別是 Worker Thread 程式的主要核心的地方,它主要有 startWorkerThread 方法用啟動工人(Worker) 的執行緒,要有幾個工人執行緒的數量由主程式來決定。put 方法主要給客戶的執行緒使用的,如果發現接到超過或等於 10 個案子就需要暫停接案子的動作,不然太多案子可能會做不完造成爆炸。另外一個 take 方法就是會給工人執行緒使用,以下是工人執行緒的 Sample Code:
public class WorkerThread implements Runnable {
private Channel channel;
public WorkerThread(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while(true) {
Request request = this.channel.take();
request.execute();
}
}
}
WorkerThread 主要是工人會把工作拿回來,然後呼叫 execute 的方法來執行。執行的邏輯會放在 Request 物件的 execute 方法裡,以下是 Request 的程式:
public class Request {
private String message;
public Request(String message) {
this.message = message;
}
public void execute() {
String currentThreadName = Thread.currentThread().getName();
System.out.println("message:" + this.message + " Worker Thread is:" + currentThreadName);
}
}
Request 主要就是會在建構子裡面接收客戶要執行的情求,然後在 execute 方法裡面處理要如何工作,在這裡就是直接把執行緒名稱和 message 訊息印出來,以下是 ClientThread 的執行緒程式:
public class ClientThread implements Runnable {
private String clientName;
private Channel channel;
public ClientThread(String clientName, Channel channel) {
this.clientName = clientName;
this.channel = channel;
}
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000L);
} catch(Exception e) {
throw new RuntimeException(e);
}
Request request = new Request(clientName);
this.channel.put(request);
}
}
}
ClientThread 用來傳遞執行訊息的內容,然後放入到 channel 的 queue 裡面,等待工人拿出來處理,以下是主程式的內容:
public class Test {
public static void main(String args[]) {
Channel channel = new Channel(2);
channel.startWorkerThread();
Thread clientThread1 = new Thread(new ClientThread("client1", channel));
Thread clientThread2 = new Thread(new ClientThread("client2", channel));
clientThread1.start();
clientThread2.start();
}
}
在主程式裡面會在 Channel 的建構子裡面給 2,主要是會開 2 個 WorkerThread 執行緒,呼叫 startWorkerThread 才會真正的啟動執行緒。之後就會啟動 2 個 ClientThread 客戶執行緒來執行給工作的請求,以下是執行的結果:
message:client2 Worker Thread is:Thread-0
message:client1 Worker Thread is:Thread-1
message:client2 Worker Thread is:Thread-1
message:client1 Worker Thread is:Thread-1
message:client2 Worker Thread is:Thread-0
message:client1 Worker Thread is:Thread-0
message:client2 Worker Thread is:Thread-1
message:client2 Worker Thread is:Thread-0
message:client1 Worker Thread is:Thread-1
從執行結果可以看出 client1 的工作有可能由 Thread-0 或是 Thread-1 來做,client2 的工作也有可能由 Thread-0 或是 Thread-1 來做,所以客戶給的工作是隨機的分配工人來執行。