iT邦幫忙

2024 iThome 鐵人賽

DAY 11
0
自我挑戰組

我的Java自學之路:一個轉職者的30篇技術統整系列 第 11

Java進階:並行程式設計基礎

  • 分享至 

  • xImage
  •  

1. 引言

隨著多核處理器的普及和大數據處理需求的增加,有效利用系統資源以提高程式效能變得越來越重要,Java提供了豐富的並行程式設計工具和 API,使開發者能夠輕鬆地實現高效能的並行應用程式。

2. 並行程式設計的基本概念

在深入探討Java並行程式設計之前,我們需要先了解一些基本概念。

2.1 並行vs並發

雖然「並行」(Parallelism)和「並發」(Concurrency)這兩個詞經常被混用,但實際上有著不同的含義:

  • 並行: 指同時執行多個任務,通常在多核處理器上實現。
  • 並發: 指在同一時間段內處理多個任務,但這些任務可能是交替執行的。

2.2 執行緒(Thread)和處理程序(Process)

  • 處理程序: 是一個獨立的執行環境,擁有自己的記憶體空間。
  • 執行緒: 是處理程序內的執行單元,共享同一處理程序的記憶體空間。

2.3 同步vs非同步

  • 同步(Synchronous): 操作按順序執行,一個操作完成後才能開始下一個。
  • 非同步(Asynchronous): 操作可以重疊執行,不需要等待前一個操作完成。

2.4 競爭條件(Race Condition)

當多個執行緒同時訪問共享資源,並且至少有一個執行緒試圖修改該資源時,可能會發生競爭條件。這可能導致不可預測的結果。

2.5 死鎖(Deadlock)

當兩個或多個執行緒互相等待對方釋放資源時,就會發生死鎖。這會導致程式無法繼續執行。

2.6 執行緒安全(Thread Safety)

一個類或方法如果在多執行緒環境下能夠正確工作,我們就說是執行緒安全的。實現執行緒安全通常需要使用同步機制。

2.7 可見性(Visibility)

在多執行緒環境中,一個執行緒對共享變數的修改可能對其他執行緒不可見。
Java提供了volatile關鍵字和同步機制來解決這個問題。

2.8 原子性(Atomicity)

一個操作是原子的,意味著是不可分割的。
在多執行緒環境中,非原子操作可能導致不一致的結果。

2.9 可擴展性(Scalability)

指系統處理增加的工作負載的能力。
良好的並行設計應該能夠隨著處理器核心數的增加而提高性能。

2.10 Amdahl定律

Amdahl定律描述程式的整體速度提升受限於其串行部分的比例。

Amdahl定律可以用以下公式表示:

速度提升 = 1 / (串行部分 + (並行部分 / 處理器數量))

串行部分:程式中無法並行化的部分所佔的比例
並行部分:程式中可以並行化的部分所佔的比例
處理器數量:用於並行計算的處理器數量

3. Java中的執行緒(Thread)和Runnable介面

在Java中,執行緒是並行程式設計的基本單位。
Java提供兩種主要的方式來創建和使用執行緒:通過繼承Thread類和實現Runnable介面。

3.1 使用Thread類

Java的Thread類是執行緒的核心類,要創建一個新的執行緒,我們可以繼承Thread類並重寫其run()方法。

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("執行緒正在運行: " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start();
    }
}

注意,我們調用的是start()方法而不是run()方法。start()方法會創建一個新的執行緒並調用run()方法。

3.2 實現Runnable介面

另一種創建執行緒的方法是實現Runnable介面,由於 Java 不支援多重繼承,但允許實現多個介面,使用 Runnable 介面可以讓您的類別同時實現其他介面,增加了設計的靈活性。
將執行緒的任務(what to do)與執行緒的控制(how to run)分離,符合單一職責原則,並且多個執行緒可以共享同一個 Runnable 物件,這在某些情況下可以提高效率。

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable正在運行: " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start();
    }
}

3.3 使用Lambda表達式

從Java 8開始,我們可以使用Lambda表達式來更簡潔地創建Runnable對象:

public class LambdaThread {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("Lambda執行緒正在運行: " + Thread.currentThread().getName());
        });
        thread.start();
    }
}

3.4 執行緒的生命週期

Java執行緒有以下幾種狀態:

  1. NEW: 執行緒被創建,但還沒有啟動。
  2. RUNNABLE: 執行緒正在運行或準備運行。
  3. BLOCKED: 執行緒被阻塞,等待監視器鎖。
  4. WAITING: 執行緒無限期等待另一個執行緒執行特定操作。
  5. TIMED_WAITING: 執行緒等待另一個執行緒執行操作,最多等待指定的時間。
  6. TERMINATED: 執行緒已經結束執行。

我們可以使用Thread.getState()方法來獲取執行緒的當前狀態。

3.5 執行緒的基本操作

Java提供了一些方法來控制執行緒的行為:

  • start(): 啟動執行緒。
  • join(): 等待執行緒結束。
  • sleep(long millis): 使執行緒休眠指定的毫秒數。
  • interrupt(): 中斷執行緒。
  • isAlive(): 檢查執行緒是否仍在運行。
public class ThreadOperations {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                System.out.println("執行緒被中斷");
            }
        });

        thread.start();
        System.out.println("執行緒已啟動");

        thread.join(1000);
        System.out.println("等待1秒後");

        if (thread.isAlive()) {
            System.out.println("執行緒仍在運行");
            thread.interrupt();
        }
    }
}

3.6 執行緒優先級

Java允許我們設置執行緒的優先級,範圍從1(最低)到10(最高)。
但是,執行緒調度器的實際行為取決於底層操作系統,因此不應過度依賴優先級。

Thread thread = new Thread(() -> {
    // 執行緒程式碼
});
thread.setPriority(Thread.MAX_PRIORITY); // 設置最高優先級
thread.start();

3.7 守護執行緒

守護執行緒是在背景運行的低優先級執行緒。
當所有非守護執行緒結束時,JVM會自動終止守護執行緒。

Thread daemonThread = new Thread(() -> {
    while (true) {
        System.out.println("守護執行緒正在運行");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            break;
        }
    }
});
daemonThread.setDaemon(true);
daemonThread.start();

4. 同步化機制和鎖(Lock)

在多執行緒環境中,當多個執行緒同時訪問共享資源時,可能會導致數據不一致或其他並發問題。

4.1 synchronized關鍵字

synchronized是Java中最基本的同步化機制,可以用於方法或程式碼塊。

4.1.1 同步方法

public class Counter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

4.1.2 同步程式碼塊

public class Counter {
    private int count = 0;
    private final Object lock = new Object();

    public void increment() {
        synchronized(lock) {
            count++;
        }
    }

    public int getCount() {
        synchronized(lock) {
            return count;
        }
    }
}

4.2 volatile關鍵字

volatile關鍵字用於確保變數的可見性,但不保證原子性。

public class SharedFlag {
    private volatile boolean flag = false;

    public void setFlag(boolean value) {
        flag = value;
    }

    public boolean isFlag() {
        return flag;
    }
}

4.3 java.util.concurrent.locks包

Java 5引入了java.util.concurrent.locks包,提供了更靈活的鎖機制。

4.3.1 ReentrantLock

ReentrantLock提供了與synchronized相似的功能,但具有更多的靈活性。

import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

4.3.2 ReadWriteLock

ReadWriteLock允許多個讀操作同時進行,但寫操作需要獨佔訪問。

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Map<String, String> cache = new HashMap<>();

    public String get(String key) {
        rwLock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void put(String key, String value) {
        rwLock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

4.4 Condition

Condition提供了比Object的wait()和notify()更強大的等待/通知機制。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer<T> {
    private final T[] items;
    private int putIndex, takeIndex, count;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) {
        items = (T[]) new Object[capacity];
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putIndex] = item;
            if (++putIndex == items.length) putIndex = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T item = items[takeIndex];
            if (++takeIndex == items.length) takeIndex = 0;
            --count;
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}

4.5 原子變數

java.util.concurrent.atomic包提供一系列原子變數類,可以在不使用同步的情況下實現執行緒安全。

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        count.incrementAndGet();
    }

    public int getCount() {
        return count.get();
    }
}

4.6 同步化的最佳實踐

  1. 盡量減少同步範圍,只在必要的程式碼塊上使用同步。
  2. 避免在同步塊中進行耗時操作,如I/O操作。
  3. 優先使用java.util.concurrent包中的並發工具,而不是低級的synchronized和volatile。
  4. 使用不可變對象可以避免許多並發問題。
  5. 注意避免死鎖,例如始終以固定的順序獲取多個鎖。
  6. 考慮使用執行緒本地變數(ThreadLocal)來避免共享狀態。

5. 執行緒池(Thread Pool)和Executor框架

在並行程式設計中,創建和管理大量執行緒可能會導致效能問題和資源浪費。
執行緒池和Executor框架提供了一種更有效的方式來管理和重用執行緒,從而提高應用程式的效能和可擴展性。

5.1 執行緒池的概念

執行緒池是一種執行緒管理模式,預先創建一定數量的執行緒,並將這些執行緒保存在池中。
當有任務需要執行時,從池中取出一個執行緒來執行任務,任務執行完畢後,執行緒被返回到池中等待下一個任務。

執行緒池的主要優點包括:

  1. 減少執行緒創建和銷毀的開銷
  2. 控制並發執行緒的數量
  3. 提高系統的響應速度
  4. 方便進行執行緒管理

5.2 Executor框架

Java的java.util.concurrent包提供了Executor框架,用於異步執行任務的框架。

Executor框架的核心接口包括:

  1. Executor: 最基本的接口,定義了execute方法用於提交任務。
  2. ExecutorService: 擴展了Executor接口,提供了管理執行緒生命週期的方法。
  3. ScheduledExecutorService: 擴展了ExecutorService,增加了定時執行任務的功能。

5.3 常用的ExecutorService實現

5.3.1 固定大小的執行緒池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService executor = Executors.newFixedThreadPool(5);

5.3.2 單執行緒的執行器

ExecutorService executor = Executors.newSingleThreadExecutor();

5.3.3 可快取的執行緒池

ExecutorService executor = Executors.newCachedThreadPool();

5.3.4 定時任務執行器

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

5.4 使用ExecutorService

5.4.1 提交任務

ExecutorService executor = Executors.newFixedThreadPool(5);

// 提交Runnable任務
executor.execute(() -> System.out.println("執行Runnable任務"));

// 提交Callable任務
Future<String> future = executor.submit(() -> "執行Callable任務");

// 獲取Callable任務的結果
try {
    String result = future.get();
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

5.4.2 關閉ExecutorService

executor.shutdown();
// 等待所有任務完成
executor.awaitTermination(60, TimeUnit.SECONDS);

5.5 使用ScheduledExecutorService

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// 延遲5秒後執行
scheduler.schedule(() -> System.out.println("延遲任務"), 5, TimeUnit.SECONDS);

// 每3秒執行一次
scheduler.scheduleAtFixedRate(() -> System.out.println("固定頻率任務"), 0, 3, TimeUnit.SECONDS);

// 每次執行完畢後等待2秒再執行
scheduler.scheduleWithFixedDelay(() -> System.out.println("固定延遲任務"), 0, 2, TimeUnit.SECONDS);

5.6 自定義ThreadPoolExecutor

對於更複雜的場景,我們可以自定義ThreadPoolExecutor:

import java.util.concurrent.*;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, // 核心執行緒數
    10, // 最大執行緒數
    60L, // 空閒執行緒存活時間
    TimeUnit.SECONDS, // 時間單位
    new ArrayBlockingQueue<>(100), // 任務隊列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);

5.7 使用CompletableFuture

Java 8引入的CompletableFuture提供異步寫程式碼能力:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")
    .thenApply(String::toUpperCase);

future.thenAccept(System.out::println);

5.8 實踐建議

  1. 選擇適當的執行緒池大小,考慮CPU核心數和任務類型(I/O密集型或CPU密集型)。
  2. 使用合適的任務隊列大小,避免OOM錯誤。
  3. 正確處理異常,避免任務靜默失敗。
  4. 適時關閉執行器,避免資源洩漏。
  5. 考慮使用自定義的執行緒工廠來命名執行緒,便於調試。
  6. 對於長時間運行的應用,監控執行緒池的狀態。

6. 並行集合(Concurrent Collections)

在多執行緒環境中,普通的Java集合類可能會出現執行緒安全問題。
為了解決這個問題,Java提供專門用於並行環境的集合類,這些類位於java.util.concurrent包中。

6.1 ConcurrentHashMap

ConcurrentHashMap是HashMap的執行緒安全版本。

import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.putIfAbsent("key", 2); // 如果key不存在才插入
map.computeIfAbsent("newKey", k -> k.length()); // 如果key不存在,計算並插入新值

ConcurrentHashMap的主要特點:

  • 支援高並發讀取和更新
  • 不允許null鍵或值
  • 迭代器是弱一致性的,不會拋出ConcurrentModificationException

6.2 CopyOnWriteArrayList

CopyOnWriteArrayList是ArrayList的執行緒安全變體,適用於讀多寫少的場景。

import java.util.concurrent.CopyOnWriteArrayList;

CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.addIfAbsent("item2");

CopyOnWriteArrayList的特點:

  • 讀操作不需要加鎖,性能很高
  • 寫操作會複製整個底層數組,適合讀多寫少的場景
  • 迭代器支援遍歷,但不支援修改操作

6.3 ConcurrentLinkedQueue

ConcurrentLinkedQueue是基於鏈接節點的無界執行緒安全隊列。

import java.util.concurrent.ConcurrentLinkedQueue;

ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item1");
String item = queue.poll();

ConcurrentLinkedQueue的特點:

  • 適用於高並發環境
  • 非阻塞算法,offer和poll操作都是O(1)複雜度

6.4 BlockingQueue接口

BlockingQueue接口定義了阻塞隊列的行為,Java提供了多種實現:

6.4.1 ArrayBlockingQueue

固定大小的阻塞隊列,基於數組實現。

import java.util.concurrent.ArrayBlockingQueue;

ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("item"); // 如果隊列滿了,會阻塞
String item = queue.take(); // 如果隊列空了,會阻塞

6.4.2 LinkedBlockingQueue

基於鏈表的可選有界阻塞隊列。

import java.util.concurrent.LinkedBlockingQueue;

LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100); // 有界
LinkedBlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>(); // 無界

6.4.3 PriorityBlockingQueue

帶優先級的無界阻塞隊列。

import java.util.concurrent.PriorityBlockingQueue;

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(3);
queue.put(1);
queue.put(2);
System.out.println(queue.take()); // 輸出1

6.5 ConcurrentSkipListMap和ConcurrentSkipListSet

這兩個類分別是TreeMap和TreeSet的並發版本,基於跳表數據結構實現。

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
map.put("key", 1);

ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(1);

這些類的特點:

  • 保持元素的排序
  • 支援高效的並發訪問
  • 適用於需要排序的並發場景

6.6 使用並行集合的實踐

  1. 選擇合適的集合類: 根據你的使用場景(讀多寫少、需要排序等)選擇合適的並行集合。
  2. 避免過度同步: 並行集合已經是執行緒安全的,不需要額外的同步。
  3. 注意迭代器的一致性: 並行集合的迭代器通常是弱一致性的,要注意可能看不到最新的修改。
  4. 合理設置初始容量: 對於有界集合,合理設置初始容量可以提高性能。
  5. 使用批量操作: 許多並行集合提供了批量添加或刪除操作,這些操作通常更高效。
  6. 注意內存使用: 某些並行集合(如CopyOnWriteArrayList)在修改時可能會消耗大量內存。

7. Fork/Join框架

Fork/Join框架是Java 7引入的一個用於並行執行任務的強大工具,特別適合用於"分而治之"的問題,即可以將大任務分解為多個小任務並行執行,然後將結果合併的問題。

7.1 Fork/Join框架的基本概念

Fork/Join框架的核心思想是:

  • Fork: 將大任務分割成更小的子任務
  • Join: 等待所有子任務完成,並合併其結果

使用了工作竊取(work-stealing)算法,空閒的執行緒可以竊取其他執行緒隊列中的任務來執行,從而提高效率。

7.2 RecursiveTask和RecursiveAction

Fork/Join框架主要通過兩個抽象類來實現:

  • RecursiveTask: 用於有返回值的任務
  • RecursiveAction: 用於沒有返回值的任務

7.2.1 使用RecursiveTask

以下是一個使用RecursiveTask計算斐波那契數列(Successione di Fibonacci)的例子:

import java.util.concurrent.RecursiveTask;

public class FibonacciTask extends RecursiveTask<Integer> {
    private final int n;

    public FibonacciTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        FibonacciTask f1 = new FibonacciTask(n - 1);
        f1.fork();
        FibonacciTask f2 = new FibonacciTask(n - 2);
        return f2.compute() + f1.join();
    }
}

7.2.2 使用RecursiveAction

以下是一個使用RecursiveAction進行數組排序的例子:

import java.util.Arrays;
import java.util.concurrent.RecursiveAction;

public class MergeSortTask extends RecursiveAction {
    private final int[] array;
    private final int start;
    private final int end;

    public MergeSortTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= 1000) {
            Arrays.sort(array, start, end);
        } else {
            int mid = (start + end) / 2;
            MergeSortTask left = new MergeSortTask(array, start, mid);
            MergeSortTask right = new MergeSortTask(array, mid, end);
            invokeAll(left, right);
            merge(start, mid, end);
        }
    }

    private void merge(int start, int mid, int end) {
        // 合併兩個已排序的子數組
        // 實現省略...
    }
}

7.3 ForkJoinPool

ForkJoinPool是執行Fork/Join任務的執行器服務。
從Java 8開始,我們可以使用ForkJoinPool.commonPool()來獲取公共的ForkJoinPool實例。

import java.util.concurrent.ForkJoinPool;

public class Main {
    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        int result = pool.invoke(new FibonacciTask(20));
        System.out.println("Fibonacci(20) = " + result);

        int[] array = new int[10000];
        // 初始化數組...
        pool.invoke(new MergeSortTask(array, 0, array.length));
    }
}

7.4 使用Fork/Join框架的最佳實踐

  1. 適當的任務粒度: 任務不應該太小(避免過多的開銷)或太大(影響負載平衡)。
  2. 避免同步: Fork/Join框架設計用於獨立任務,避免在任務間使用同步。
  3. 最小化任務間的依賴: 任務應該盡可能獨立,以充分利用並行性。
  4. 利用Java 8的並行流: 對於簡單的並行操作,可以考慮使用並行流。
import java.util.Arrays;

public class ParallelStreamExample {
    public static void main(String[] args) {
        int[] array = new int[1000000];
        // 初始化數組...
        
        long sum = Arrays.stream(array).parallel().sum();
        System.out.println("Sum: " + sum);
    }
}
  1. 注意異常處理: Fork/Join任務中的未捕獲異常可能導致整個計算失敗。
  2. 監控和調優: 使用ForkJoinPool的方法來監控執行狀況,必要時進行調優。

7.5 Fork/Join框架的適用場景

Fork/Join框架特別適合以下類型的問題:

  • 可以遞歸分解的問題(如快速排序、歸併排序)
  • 大規模數據處理(如並行搜索、並行聚合)
  • 圖像處理和計算機圖形學中的某些問題

本篇文章同步刊載: JYI.TW
筆者個人的網站: JUNYI


上一篇
Java進階:Optional類別的使用與最佳實踐
下一篇
Java進階:執行緒安全與同步機制
系列文
我的Java自學之路:一個轉職者的30篇技術統整30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言