Executor 框架來簡化並優化多執行緒程式的開發,相較於傳統的多執行緒程式設計方法,Executor 框架提供更高層次的抽象,使開發者能夠更專注於業務邏輯,而不是繁瑣的執行緒管理細節。
Executor 框架是 Java 5 引入的一個用於管理非同步任務執行的高層次 API,提供一種將「任務提交」與「任務執行」分離的機制,簡化多執行緒程式的開發。
以下是 Executor 框架的核心組件:
Executor 介面:
這是框架的基礎,定義執行已提交的 Runnable 任務的方法。它的設計理念是將任務的提交與執行解耦,使得任務的執行方式可以靈活變化。
ExecutorService 介面:
這是 Executor 介面的擴展,提供管理執行緒生命週期的方法,包括任務的提交、執行狀態的追蹤,以及服務的關閉等功能。它還引入 Callable 介面,允許任務返回結果。
Executors 工廠類別:
這是一個工具類別,提供許多靜態方法來創建不同類型的 ExecutorService 實例。例如,可以創建固定大小的執行緒池、可快取的執行緒池、單一執行緒的執行器等。
Executor 框架的設計遵循「組合優於繼承」的原則,通過組合不同的介面和類別,提供豐富的功能和靈活的使用方式。這種設計使得開發者可以根據應用程式的需求,選擇最合適的執行器類型,並且能夠輕鬆地在不同類型的執行器之間切換,而無需大幅修改程式碼。
Executor 框架的核心元件包括以下幾個關鍵類別:
ThreadPoolExecutor:
這是 Executor 框架最核心的類別,實現 ExecutorService 介面,維護一個執行緒池和一個任務佇列,能夠有效地管理和重用執行緒。ThreadPoolExecutor 提供多個建構子,允許開發者精確控制執行緒池的行為,如核心執行緒數、最大執行緒數、執行緒存活時間等。
ScheduledThreadPoolExecutor:
這是 ThreadPoolExecutor 的一個特殊化版本,除具備一般執行緒池的功能外,還支援定時或週期性執行任務。它實現 ScheduledExecutorService 介面,可以安排任務在未來的某個時間點執行,或者按照固定的時間間隔重複執行。
FutureTask:
FutureTask 類別實現 RunnableFuture 介面,它既可以被當作 Runnable 使用,又可以追蹤任務的執行狀態和結果。當提交一個 Callable 任務到執行器時,它會被包裝成一個 FutureTask 對象。通過 FutureTask,我們可以取消任務、查詢任務是否完成,以及獲取任務的執行結果。
這些核心元件共同構成 Executor 框架的基礎,為開發者提供強大而靈活的多執行緒程式設計工具。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 負責管理執行緒和調度任務,而 FutureTask 則提供任務執行狀態和結果的管理機制。
透過這些元件,開發者可以輕鬆實現複雜的並行處理邏輯,如批次處理、非同步計算、定時任務等,同時有效控制系統資源的使用,避免因創建過多執行緒而導致的效能問題。
Executor 框架的使用相對簡單,但要充分發揮其威力,需要解一些關鍵概念和方法。以下是 Executor 框架的基本使用步驟:
建立執行器:
使用 Executors 工廠類別或直接實例化 ThreadPoolExecutor 來創建執行器。
// 使用 Executors 工廠類別創建固定大小的執行緒池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 或者直接實例化 ThreadPoolExecutor
ExecutorService customExecutor = new ThreadPoolExecutor(
3, 5, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
提交任務:
可以提交 Runnable 或 Callable 任務到執行器。
// 提交 Runnable 任務
executor.execute(() -> {
System.out.println("執行 Runnable 任務");
});
// 提交 Callable 任務
Future<String> future = executor.submit(() -> {
return "執行 Callable 任務的結果";
});
管理任務執行:
使用 Future 介面來管理任務的執行狀態和結果。
try {
String result = future.get(); // 等待並獲取任務結果
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
關閉執行器:
當不再需要執行器時,應該正確地關閉它以釋放資源。
executor.shutdown(); // 平滑關閉,等待所有任務完成
// 或者
executor.shutdownNow(); // 立即關閉,嘗試中斷正在執行的任務
使用 Executor 框架時,需要注意以下幾點:
選擇合適的執行器類型:根據任務的特性和系統的需求,選擇適當的執行器類型。例如,對於 CPU 密集型任務,可以使用固定大小的執行緒池;對於 I/O 密集型任務,可以使用可快取的執行緒池。
正確處理例外狀況:在任務執行過程中可能發生的例外狀況應該被適當地捕獲和處理,以避免影響整個執行器的運作。
合理設置執行緒池參數:根據系統的資源情況和預期的負載,合理設置執行緒池的大小和其他參數,以達到最佳的效能。
通過這些基本的使用方法,開發者可以輕鬆地將 Executor 框架整合到自己的應用程式中,實現高效的並行處理。
Executor 框架的核心是 ThreadPoolExecutor 類別,它的運作原理涉及執行緒池的管理、任務佇列的處理以及拒絕策略的應用。讓我們深入解這些機制:
執行緒池的管理機制:
ThreadPoolExecutor 維護一個核心執行緒池和一個最大執行緒池。當新任務提交時,如果運行的執行緒數小於核心執行緒數,則創建新執行緒;如果大於核心執行緒數但小於最大執行緒數,則將任務加入佇列;如果佇列已滿,則創建新執行緒直到達到最大執行緒數。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心執行緒數
4, // 最大執行緒數
60L, TimeUnit.SECONDS, // 執行緒空閒時間
new LinkedBlockingQueue<Runnable>(10) // 任務佇列
);
任務佇列的處理:
當核心執行緒都在忙碌時,新提交的任務會被放入任務佇列。ThreadPoolExecutor 支援多種佇列類型,如 ArrayBlockingQueue、LinkedBlockingQueue 和 SynchronousQueue 等。佇列的選擇會影響執行器的行為和效能。
拒絕策略:
當執行器已達到最大執行緒數且任務佇列已滿時,新提交的任務將被拒絕。ThreadPoolExecutor 提供四種預設的拒絕策略:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
執行緒的生命週期:
執行緒池中的執行緒遵循以下生命週期:
任務的執行過程:
當任務被提交到執行器時,它會經歷以下步驟:
Executor 框架除基本的任務執行功能外,還提供一些進階特性,使其能夠應對更複雜的並行處理場景。以下是一些重要的進階特性:
可調整的執行緒池:
ThreadPoolExecutor 提供動態調整執行緒池大小的方法。這允許在運行時根據系統負載調整執行緒池的參數。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
executor.setCorePoolSize(10);
executor.setMaximumPoolSize(20);
定時任務執行:
ScheduledThreadPoolExecutor 支援定時執行和週期性執行任務。這對於實現定時任務、週期性維護等場景非常有用。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> System.out.println("延遲執行"), 5, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(() -> System.out.println("固定頻率執行"), 0, 1, TimeUnit.MINUTES);
任務的取消與中斷:
Executor 框架提供取消和中斷正在執行任務的機制。這對於處理長時間運行的任務或需要提前終止的任務很有幫助。
Future<?> future = executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
// 執行任務邏輯
}
});
future.cancel(true); // 嘗試取消任務
執行緒工廠:
ThreadPoolExecutor 允許自定義 ThreadFactory,這使得我們可以控制如何創建新的執行緒,例如設置執行緒名稱、優先級等。
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("worker-thread-%d")
.build();
ExecutorService executor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), namedThreadFactory);
執行緒池監控:
ThreadPoolExecutor 提供一些方法來監控執行緒池的狀態,如當前執行緒數、已完成任務數等。這對於系統監控和效能調優非常有用。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
System.out.println("活躍執行緒數:" + executor.getActiveCount());
System.out.println("已完成任務數:" + executor.getCompletedTaskCount());
自定義執行緒池:
通過繼承 ThreadPoolExecutor 類別,我們可以創建自定義的執行緒池,實現特定的行為,如任務優先級排序、執行前後的鉤子方法等。
在使用 Executor 框架時,遵循一些最佳實踐可以幫助我們更有效地利用這個強大的工具,同時避免一些常見的陷阱。以下是一些重要的實踐建議:
選擇適當的執行器類型:
根據任務的特性和系統的需求,選擇合適的執行器類型。例如:
正確設定執行緒池大小:
執行緒池大小的設定對系統效能有重大影響。一般而言:
合理使用任務佇列:
選擇合適的任務佇列類型和大小,避免佇列過大導致記憶體問題,或過小導致頻繁拒絕任務。
處理未捕獲的例外狀況:
使用 UncaughtExceptionHandler 來處理執行緒中未捕獲的例外狀況,避免任務靜默失敗。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
executor.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> {
System.err.println("未捕獲的例外狀況:" + ex.getMessage());
});
return t;
}
});
正確關閉執行器:
在應用程式結束時,確保正確關閉執行器,釋放資源。
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
避免執行緒洩漏:
確保所有提交的任務都能正常結束,避免因為任務無法結束導致執行緒池無法關閉。
監控和調整:
定期監控執行器的效能指標,如活躍執行緒數、已完成任務數等,並根據實際情況進行調整。
使用 Future 管理任務:
利用 Future 介面來管理非同步任務的執行狀態和結果,有助於實現更複雜的任務控制邏輯。
考慮使用 ForkJoinPool:
對於可以分解為小任務的大型並行任務,考慮使用 ForkJoinPool 來實現更高效的負載平衡。
適當使用 ThreadLocal:
在需要執行緒安全的情況下,合理使用 ThreadLocal 可以避免不必要的同步開銷。
本篇文章同步刊載: JYI.TW
筆者個人的網站: JUNYI