iT邦幫忙

2024 iThome 鐵人賽

DAY 21
0
自我挑戰組

我的Java自學之路:一個轉職者的30篇技術統整系列 第 21

多執行緒與並行程式設計:Executor框架的使用及運作原理

  • 分享至 

  • xImage
  •  

1. 引言

Executor 框架來簡化並優化多執行緒程式的開發,相較於傳統的多執行緒程式設計方法,Executor 框架提供更高層次的抽象,使開發者能夠更專注於業務邏輯,而不是繁瑣的執行緒管理細節。

2. Executor框架概述

Executor 框架是 Java 5 引入的一個用於管理非同步任務執行的高層次 API,提供一種將「任務提交」與「任務執行」分離的機制,簡化多執行緒程式的開發。
以下是 Executor 框架的核心組件:

  1. Executor 介面:
    這是框架的基礎,定義執行已提交的 Runnable 任務的方法。它的設計理念是將任務的提交與執行解耦,使得任務的執行方式可以靈活變化。

  2. ExecutorService 介面:
    這是 Executor 介面的擴展,提供管理執行緒生命週期的方法,包括任務的提交、執行狀態的追蹤,以及服務的關閉等功能。它還引入 Callable 介面,允許任務返回結果。

  3. Executors 工廠類別:
    這是一個工具類別,提供許多靜態方法來創建不同類型的 ExecutorService 實例。例如,可以創建固定大小的執行緒池、可快取的執行緒池、單一執行緒的執行器等。

Executor 框架的設計遵循「組合優於繼承」的原則,通過組合不同的介面和類別,提供豐富的功能和靈活的使用方式。這種設計使得開發者可以根據應用程式的需求,選擇最合適的執行器類型,並且能夠輕鬆地在不同類型的執行器之間切換,而無需大幅修改程式碼。

3. Executor框架的核心元件

Executor 框架的核心元件包括以下幾個關鍵類別:

  1. ThreadPoolExecutor:
    這是 Executor 框架最核心的類別,實現 ExecutorService 介面,維護一個執行緒池和一個任務佇列,能夠有效地管理和重用執行緒。ThreadPoolExecutor 提供多個建構子,允許開發者精確控制執行緒池的行為,如核心執行緒數、最大執行緒數、執行緒存活時間等。

  2. ScheduledThreadPoolExecutor:
    這是 ThreadPoolExecutor 的一個特殊化版本,除具備一般執行緒池的功能外,還支援定時或週期性執行任務。它實現 ScheduledExecutorService 介面,可以安排任務在未來的某個時間點執行,或者按照固定的時間間隔重複執行。

  3. FutureTask:
    FutureTask 類別實現 RunnableFuture 介面,它既可以被當作 Runnable 使用,又可以追蹤任務的執行狀態和結果。當提交一個 Callable 任務到執行器時,它會被包裝成一個 FutureTask 對象。通過 FutureTask,我們可以取消任務、查詢任務是否完成,以及獲取任務的執行結果。

這些核心元件共同構成 Executor 框架的基礎,為開發者提供強大而靈活的多執行緒程式設計工具。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 負責管理執行緒和調度任務,而 FutureTask 則提供任務執行狀態和結果的管理機制。

透過這些元件,開發者可以輕鬆實現複雜的並行處理邏輯,如批次處理、非同步計算、定時任務等,同時有效控制系統資源的使用,避免因創建過多執行緒而導致的效能問題。

4. Executor框架的使用方法

Executor 框架的使用相對簡單,但要充分發揮其威力,需要解一些關鍵概念和方法。以下是 Executor 框架的基本使用步驟:

  1. 建立執行器:
    使用 Executors 工廠類別或直接實例化 ThreadPoolExecutor 來創建執行器。

    // 使用 Executors 工廠類別創建固定大小的執行緒池
    ExecutorService executor = Executors.newFixedThreadPool(5);
    
    // 或者直接實例化 ThreadPoolExecutor
    ExecutorService customExecutor = new ThreadPoolExecutor(
        3, 5, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>()
    );
    
  2. 提交任務:
    可以提交 Runnable 或 Callable 任務到執行器。

    // 提交 Runnable 任務
    executor.execute(() -> {
        System.out.println("執行 Runnable 任務");
    });
    
    // 提交 Callable 任務
    Future<String> future = executor.submit(() -> {
        return "執行 Callable 任務的結果";
    });
    
  3. 管理任務執行:
    使用 Future 介面來管理任務的執行狀態和結果。

    try {
        String result = future.get(); // 等待並獲取任務結果
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    
  4. 關閉執行器:
    當不再需要執行器時,應該正確地關閉它以釋放資源。

    executor.shutdown(); // 平滑關閉,等待所有任務完成
    // 或者
    executor.shutdownNow(); // 立即關閉,嘗試中斷正在執行的任務
    

使用 Executor 框架時,需要注意以下幾點:

  • 選擇合適的執行器類型:根據任務的特性和系統的需求,選擇適當的執行器類型。例如,對於 CPU 密集型任務,可以使用固定大小的執行緒池;對於 I/O 密集型任務,可以使用可快取的執行緒池。

  • 正確處理例外狀況:在任務執行過程中可能發生的例外狀況應該被適當地捕獲和處理,以避免影響整個執行器的運作。

  • 合理設置執行緒池參數:根據系統的資源情況和預期的負載,合理設置執行緒池的大小和其他參數,以達到最佳的效能。

通過這些基本的使用方法,開發者可以輕鬆地將 Executor 框架整合到自己的應用程式中,實現高效的並行處理。

5. Executor框架的運作原理

Executor 框架的核心是 ThreadPoolExecutor 類別,它的運作原理涉及執行緒池的管理、任務佇列的處理以及拒絕策略的應用。讓我們深入解這些機制:

  1. 執行緒池的管理機制:
    ThreadPoolExecutor 維護一個核心執行緒池和一個最大執行緒池。當新任務提交時,如果運行的執行緒數小於核心執行緒數,則創建新執行緒;如果大於核心執行緒數但小於最大執行緒數,則將任務加入佇列;如果佇列已滿,則創建新執行緒直到達到最大執行緒數。

    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        2,  // 核心執行緒數
        4,  // 最大執行緒數
        60L, TimeUnit.SECONDS,  // 執行緒空閒時間
        new LinkedBlockingQueue<Runnable>(10)  // 任務佇列
    );
    
  2. 任務佇列的處理:
    當核心執行緒都在忙碌時,新提交的任務會被放入任務佇列。ThreadPoolExecutor 支援多種佇列類型,如 ArrayBlockingQueue、LinkedBlockingQueue 和 SynchronousQueue 等。佇列的選擇會影響執行器的行為和效能。

  3. 拒絕策略:
    當執行器已達到最大執行緒數且任務佇列已滿時,新提交的任務將被拒絕。ThreadPoolExecutor 提供四種預設的拒絕策略:

    • AbortPolicy:拋出 RejectedExecutionException 異常(預設策略)
    • CallerRunsPolicy:在呼叫者的執行緒中執行任務
    • DiscardPolicy:直接丟棄任務
    • DiscardOldestPolicy:丟棄佇列中最老的任務,然後重試執行當前任務
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    
  4. 執行緒的生命週期:
    執行緒池中的執行緒遵循以下生命週期:

    • 當核心執行緒數小於設定值時,新建執行緒
    • 執行任務
    • 任務執行完成後,檢查是否有新任務,如果有則繼續執行
    • 如果沒有新任務,等待設定的空閒時間
    • 如果在空閒時間內沒有新任務且當前執行緒數超過核心執行緒數,則終止執行緒
  5. 任務的執行過程:
    當任務被提交到執行器時,它會經歷以下步驟:

    1. 檢查核心執行緒池是否已滿,如果未滿則創建新執行緒執行任務
    2. 如果核心執行緒池已滿,則將任務加入佇列
    3. 如果佇列已滿,則嘗試創建新執行緒(不超過最大執行緒數)
    4. 如果無法創建新執行緒,則應用拒絕策略

6. Executor框架的進階特性

Executor 框架除基本的任務執行功能外,還提供一些進階特性,使其能夠應對更複雜的並行處理場景。以下是一些重要的進階特性:

  1. 可調整的執行緒池:
    ThreadPoolExecutor 提供動態調整執行緒池大小的方法。這允許在運行時根據系統負載調整執行緒池的參數。

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    executor.setCorePoolSize(10);
    executor.setMaximumPoolSize(20);
    
  2. 定時任務執行:
    ScheduledThreadPoolExecutor 支援定時執行和週期性執行任務。這對於實現定時任務、週期性維護等場景非常有用。

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.schedule(() -> System.out.println("延遲執行"), 5, TimeUnit.SECONDS);
    scheduler.scheduleAtFixedRate(() -> System.out.println("固定頻率執行"), 0, 1, TimeUnit.MINUTES);
    
  3. 任務的取消與中斷:
    Executor 框架提供取消和中斷正在執行任務的機制。這對於處理長時間運行的任務或需要提前終止的任務很有幫助。

    Future<?> future = executor.submit(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            // 執行任務邏輯
        }
    });
    future.cancel(true); // 嘗試取消任務
    
  4. 執行緒工廠:
    ThreadPoolExecutor 允許自定義 ThreadFactory,這使得我們可以控制如何創建新的執行緒,例如設置執行緒名稱、優先級等。

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("worker-thread-%d")
        .build();
    ExecutorService executor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(), namedThreadFactory);
    
  5. 執行緒池監控:
    ThreadPoolExecutor 提供一些方法來監控執行緒池的狀態,如當前執行緒數、已完成任務數等。這對於系統監控和效能調優非常有用。

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    System.out.println("活躍執行緒數:" + executor.getActiveCount());
    System.out.println("已完成任務數:" + executor.getCompletedTaskCount());
    
  6. 自定義執行緒池:
    通過繼承 ThreadPoolExecutor 類別,我們可以創建自定義的執行緒池,實現特定的行為,如任務優先級排序、執行前後的鉤子方法等。

7. Executor框架的實踐

在使用 Executor 框架時,遵循一些最佳實踐可以幫助我們更有效地利用這個強大的工具,同時避免一些常見的陷阱。以下是一些重要的實踐建議:

  1. 選擇適當的執行器類型:
    根據任務的特性和系統的需求,選擇合適的執行器類型。例如:

    • 對於數量固定的短期任務,使用 FixedThreadPool
    • 對於需要長期運行的任務,使用 CachedThreadPool
    • 對於需要定時或週期執行的任務,使用 ScheduledThreadPool
    • 對於單線程順序執行的任務,使用 SingleThreadExecutor
  2. 正確設定執行緒池大小:
    執行緒池大小的設定對系統效能有重大影響。一般而言:

    • 對於 CPU 密集型任務,執行緒池大小通常設為 CPU 核心數 + 1
    • 對於 I/O 密集型任務,執行緒池大小可以設得更大,如 CPU 核心數 * 2
  3. 合理使用任務佇列:
    選擇合適的任務佇列類型和大小,避免佇列過大導致記憶體問題,或過小導致頻繁拒絕任務。

  4. 處理未捕獲的例外狀況:
    使用 UncaughtExceptionHandler 來處理執行緒中未捕獲的例外狀況,避免任務靜默失敗。

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    executor.setThreadFactory(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                System.err.println("未捕獲的例外狀況:" + ex.getMessage());
            });
            return t;
        }
    });
    
  5. 正確關閉執行器:
    在應用程式結束時,確保正確關閉執行器,釋放資源。

    executor.shutdown();
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException ie) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
    
  6. 避免執行緒洩漏:
    確保所有提交的任務都能正常結束,避免因為任務無法結束導致執行緒池無法關閉。

  7. 監控和調整:
    定期監控執行器的效能指標,如活躍執行緒數、已完成任務數等,並根據實際情況進行調整。

  8. 使用 Future 管理任務:
    利用 Future 介面來管理非同步任務的執行狀態和結果,有助於實現更複雜的任務控制邏輯。

  9. 考慮使用 ForkJoinPool:
    對於可以分解為小任務的大型並行任務,考慮使用 ForkJoinPool 來實現更高效的負載平衡。

  10. 適當使用 ThreadLocal:
    在需要執行緒安全的情況下,合理使用 ThreadLocal 可以避免不必要的同步開銷。

本篇文章同步刊載: JYI.TW
筆者個人的網站: JUNYI


上一篇
Java虛擬機器:JVM位元組碼與即時編譯
下一篇
多執行緒與並行程式設計:Executor框架中的執行緒池管理及最佳化
系列文
我的Java自學之路:一個轉職者的30篇技術統整30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言