執行緒池是一種執行緒使用模式。它是一種預先創建並維護多個執行緒的技術,這些執行緒可以用來執行多個任務。當有新任務需要執行時,會從池中取出一個空閒的執行緒來執行任務,任務執行完畢後,該執行緒會被放回池中等待下一個任務。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
System.out.println("執行任務: " + Thread.currentThread().getName());
});
}
executor.shutdown();
}
}
使用執行緒池主要有以下幾個原因:
執行緒池相較於傳統的執行緒使用方式有以下優勢:
// 使用執行緒池
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 100; i++) {
executor.execute(new Task());
}
// 不使用執行緒池
for (int i = 0; i < 100; i++) {
new Thread(new Task()).start();
}
資源管理:
提高可管理性:
提供更多功能:
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "任務完成";
});
System.out.println(future.get()); // 等待並獲取結果
通過使用執行緒池,我們可以更有效地管理並發任務,提高應用程式的性能和可靠性。
Java的Executor框架是一個強大的執行緒池實現,定義在java.util.concurrent包中,主要包含以下幾個核心介面和類。
Executor是最基本的執行器介面,只定義了一個方法:
public interface Executor {
void execute(Runnable command);
}
這個介面將任務的提交與執行解耦,使得我們可以輕鬆地改變任務執行的方式而不影響任務的提交。
使用示例:
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello from Executor!"));
ExecutorService擴展了Executor介面,提供了更多的功能,包括管理執行器的生命週期和異步任務的處理。
主要方法包括:
submit()
:提交一個Callable或Runnable任務並返回FutureinvokeAll()
:執行給定的任務集合,返回一個Future列表invokeAny()
:執行給定的任務集合,返回其中一個成功完成的結果shutdown()
:關閉執行器,但允許已提交的任務繼續執行shutdownNow()
:立即關閉執行器,停止所有正在執行的任務使用示例:
ExecutorService service = Executors.newFixedThreadPool(2);
Future<String> future = service.submit(() -> "任務結果");
System.out.println(future.get());
service.shutdown();
Executors是一個工廠類,提供了創建各種預定義執行器的靜態方法。
主要方法包括:
newFixedThreadPool(int nThreads)
:創建固定大小的執行緒池newCachedThreadPool()
:創建一個可根據需要創建新執行緒的執行緒池newSingleThreadExecutor()
:創建只有一個執行緒的執行器newScheduledThreadPool(int corePoolSize)
:創建一個可以執行定時任務的執行緒池使用示例:
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
ExecutorService cachedPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
// 使用固定大小的執行緒池
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName()));
}
// 使用可調度的執行緒池
scheduledPool.scheduleAtFixedRate(() -> System.out.println("Scheduled task"), 0, 1, TimeUnit.SECONDS);
// 記得在使用完畢後關閉執行器
fixedPool.shutdown();
cachedPool.shutdown();
singleThreadExecutor.shutdown();
scheduledPool.shutdown();
Java的Executor框架提供幾種預定義的執行器服務,每種都有其特定的用途和特性。
FixedThreadPool是一個固定大小的執行緒池,特點是:
使用示例:
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
});
}
fixedPool.shutdown();
適用場景:
CachedThreadPool是一個可根據需要創建新執行緒的執行緒池。它的特點是:
使用示例:
ExecutorService cachedPool = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
final int taskId = i;
cachedPool.execute(() -> {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
});
}
cachedPool.shutdown();
適用場景:
SingleThreadExecutor是只有一個執行緒的執行器,特點是:
使用示例:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int taskId = i;
singleThreadExecutor.execute(() -> {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
});
}
singleThreadExecutor.shutdown();
適用場景:
ScheduledThreadPoolExecutor是一個可以執行定時任務的執行緒池,特點是:
使用示例:
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
// 延遲3秒後執行
scheduledPool.schedule(() -> System.out.println("Delayed task"), 3, TimeUnit.SECONDS);
// 延遲1秒後開始執行,每2秒執行一次
scheduledPool.scheduleAtFixedRate(() -> System.out.println("Periodic task"), 1, 2, TimeUnit.SECONDS);
// 讓程式運行一段時間
Thread.sleep(10000);
scheduledPool.shutdown();
適用場景:
在選擇使用哪種執行器服務時,需要根據具體的應用場景和需求來決定。例如,如果需要限制並發執行緒數量,可以使用FixedThreadPool;如果需要執行大量短期異步任務,可以使用CachedThreadPool;如果需要保證任務順序執行,可以使用SingleThreadExecutor;如果需要執行定時任務,可以使用ScheduledThreadPoolExecutor。
在下一章節中,我們將探討如何向這些執行器提交任務,以及如何處理任務的執行結果。
在使用執行器服務時,我們需要知道如何正確地提交任務並處理執行結果。本章節將介紹幾種常用的任務提交方法。
execute()方法是最基本的任務提交方法,它來自Executor介面。這個方法接受一個Runnable對象,並且沒有返回值。
使用示例:
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
});
executor.shutdown();
特點:
submit()方法是ExecutorService介面提供的方法,它可以提交Runnable或Callable任務,並返回一個Future對象。
使用示例:
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交Runnable任務
Future<?> future1 = executor.submit(() -> {
System.out.println("Runnable task executed");
});
// 提交Callable任務
Future<String> future2 = executor.submit(() -> {
return "Callable task result";
});
System.out.println(future2.get()); // 輸出: Callable task result
executor.shutdown();
特點:
這兩個方法允許同時提交多個任務。
invokeAll()執行所有任務,並返回所有任務的Future列表:
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(
() -> "Task 1 result",
() -> "Task 2 result",
() -> "Task 3 result"
);
List<Future<String>> futures = executor.invokeAll(tasks);
for (Future<String> future : futures) {
System.out.println(future.get());
}
executor.shutdown();
invokeAny()執行所有任務,但只返回其中一個成功完成的任務的結果:
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(
() -> {
Thread.sleep(2000);
return "Task 1 result";
},
() -> "Task 2 result",
() -> {
Thread.sleep(1000);
return "Task 3 result";
}
);
String result = executor.invokeAny(tasks);
System.out.println(result); // 可能輸出: Task 2 result
executor.shutdown();
特點:
當使用submit()或invokeAll()方法時,我們得到的是Future對象,通過以下方式處理結果和異常:
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
if (Math.random() < 0.5) {
throw new Exception("Task failed");
}
return "Task succeeded";
});
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException e) {
System.out.println("Task was interrupted");
} catch (ExecutionException e) {
System.out.println("Task threw an exception: " + e.getCause().getMessage());
}
executor.shutdown();
注意事項:
通過正確地提交任務和處理結果,我們可以充分利用執行器服務的功能,實現高效的並發程式設計。在下一章節中,我們將探討如何管理執行器的生命週期。
正確管理執行器的生命週期對於應用程式的穩定性和資源管理至關重要。
執行器在創建時就已經啟動,無需額外的啟動步驟。例如:
ExecutorService executor = Executors.newFixedThreadPool(5);
// 執行器已經準備好接受任務
ExecutorService提供兩種關閉執行器的方法:
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> {
// 執行一些任務
});
executor.shutdown();
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> {
// 執行一些任務
});
List<Runnable> notExecutedTasks = executor.shutdownNow();
在關閉執行器後,我們可能需要等待所有任務完成。
ExecutorService提供了幾種方法來實現這一點:
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> {
// 執行一些長時間運行的任務
});
executor.shutdown();
try {
if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println("執行器已終止");
} else {
System.out.println("執行器在超時前未終止");
}
} catch (InterruptedException e) {
System.out.println("等待被中斷");
}
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.shutdown();
while (!executor.isTerminated()) {
// 等待所有任務完成
}
System.out.println("所有任務已完成");
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // 禁止新任務提交
try {
// 等待現有任務完成
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 取消當前執行的任務
// 等待任務響應中斷
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("執行器未能終止");
}
} catch (InterruptedException ie) {
// (重新)取消如果當前執行緒也被中斷
pool.shutdownNow();
// 保留中斷狀態
Thread.currentThread().interrupt();
}
}
在某些情況下,我們可能需要處理那些在執行器關閉時尚未完成的任務:
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交一些任務...
List<Runnable> notExecutedTasks = executor.shutdownNow();
for (Runnable task : notExecutedTasks) {
// 處理未執行的任務,例如重新安排或記錄
System.out.println("未執行的任務: " + task);
}
正確管理執行器的生命週期可以確保:
雖然Java提供了幾種預定義的執行器服務,但在某些情況下,我們可能需要更細緻的控制來滿足特定的應用需求,我們可以使用ThreadPoolExecutor類來自定義執行緒池。
ThreadPoolExecutor是Java執行緒池的核心實現類,允許我們根據具體需求調整執行緒池的行為。
基本構造函數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心執行緒數
maximumPoolSize:最大執行緒數
keepAliveTime:執行緒空閒時間
unit:keepAliveTime參數的時間單位
workQueue:工作隊列
threadFactory:執行緒工廠
handler:拒絕策略
以下是一個自定義執行緒池的示例:
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Custom-Thread-" + count++);
}
};
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// 提交任務
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 關閉執行器
executor.shutdown();
}
}
ThreadPoolExecutor提供了許多方法來動態調整和監控執行緒池:
// 動態調整核心執行緒數
executor.setCorePoolSize(newSize);
// 動態調整最大執行緒數
executor.setMaximumPoolSize(newSize);
// 獲取當前執行緒數
int threadCount = executor.getPoolSize();
// 獲取活躍執行緒數
int activeCount = executor.getActiveCount();
// 獲取已完成任務數
long completedTaskCount = executor.getCompletedTaskCount();
通過自定義執行緒池,我們可以更精確地控制執行緒的創建、銷毀以及任務的處理方式,從而優化應用程式的性能和資源使用。在實際應用中,應根據具體的負載特性和系統資源來調整這些參數,以達到最佳效果。
Fork/Join框架是Java 7引入的一個用於並行執行任務的框架,是ExecutorService接口的一個實現。
這個框架被設計用來有效地使用多處理器系統,特別適合於使用分治法來解決問題。
分治法是一種解決複雜問題的方法,它的基本思想是:
ForkJoinPool是Fork/Join框架的核心類,它實現了工作竊取算法(work-stealing algorithm)。
基本用法:
ForkJoinPool pool = new ForkJoinPool();
Result result = pool.invoke(new MyTask());
或者使用公共的ForkJoinPool:
Result result = ForkJoinPool.commonPool().invoke(new MyTask());
Fork/Join框架提供了兩個主要的任務類:
這兩個類都繼承自ForkJoinTask。
使用RecursiveTask的例子 - 計算斐波那契數列:
import java.util.concurrent.RecursiveTask;
public class FibonacciTask extends RecursiveTask<Integer> {
final int n;
FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1)
return n;
FibonacciTask f1 = new FibonacciTask(n - 1);
f1.fork();
FibonacciTask f2 = new FibonacciTask(n - 2);
return f2.compute() + f1.join();
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
System.out.println(pool.invoke(new FibonacciTask(10)));
}
}
使用並行流的例子:
import java.util.Arrays;
public class ParallelStreamExample {
public static void main(String[] args) {
long[] numbers = new long[1000000];
Arrays.fill(numbers, 1);
long sum = Arrays.stream(numbers).parallel().sum();
System.out.println("Sum: " + sum);
}
}
通過使用Fork/Join框架,我們可以更容易地實現複雜的並行計算,特別是對於那些可以被分解成小任務的大規模問題。
任務特性:
系統資源:
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(processors);
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 執行任務
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
executor.submit(() -> {
try {
// 任務邏輯
} catch (Exception e) {
// 處理異常
}
});
int corePoolSize = 2;
int maxPoolSize = 4;
long keepAliveTime = 60L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, handler);
while (!Thread.currentThread().isInterrupted()) {
// 任務邏輯
}
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
遵循這些最佳實踐並注意潛在的問題,可以幫助我們更有效地使用Java執行緒池和執行器框架,提高應用程序的性能和可靠性。在實際應用中,應該根據具體情況進行調整和優化。
本篇文章同步刊載: JYI.TW
筆者個人的網站: JUNYI