隨著多核處理器的普及和大數據處理需求的增加,有效利用系統資源以提高程式效能變得越來越重要,Java提供了豐富的並行程式設計工具和 API,使開發者能夠輕鬆地實現高效能的並行應用程式。
在深入探討Java並行程式設計之前,我們需要先了解一些基本概念。
雖然「並行」(Parallelism)和「並發」(Concurrency)這兩個詞經常被混用,但實際上有著不同的含義:
當多個執行緒同時訪問共享資源,並且至少有一個執行緒試圖修改該資源時,可能會發生競爭條件。這可能導致不可預測的結果。
當兩個或多個執行緒互相等待對方釋放資源時,就會發生死鎖。這會導致程式無法繼續執行。
一個類或方法如果在多執行緒環境下能夠正確工作,我們就說是執行緒安全的。實現執行緒安全通常需要使用同步機制。
在多執行緒環境中,一個執行緒對共享變數的修改可能對其他執行緒不可見。
Java提供了volatile關鍵字和同步機制來解決這個問題。
一個操作是原子的,意味著是不可分割的。
在多執行緒環境中,非原子操作可能導致不一致的結果。
指系統處理增加的工作負載的能力。
良好的並行設計應該能夠隨著處理器核心數的增加而提高性能。
Amdahl定律描述程式的整體速度提升受限於其串行部分的比例。
Amdahl定律可以用以下公式表示:
速度提升 = 1 / (串行部分 + (並行部分 / 處理器數量))
串行部分:程式中無法並行化的部分所佔的比例
並行部分:程式中可以並行化的部分所佔的比例
處理器數量:用於並行計算的處理器數量
在Java中,執行緒是並行程式設計的基本單位。
Java提供兩種主要的方式來創建和使用執行緒:通過繼承Thread類和實現Runnable介面。
Java的Thread類是執行緒的核心類,要創建一個新的執行緒,我們可以繼承Thread類並重寫其run()方法。
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("執行緒正在運行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}
注意,我們調用的是start()方法而不是run()方法。start()方法會創建一個新的執行緒並調用run()方法。
另一種創建執行緒的方法是實現Runnable介面,由於 Java 不支援多重繼承,但允許實現多個介面,使用 Runnable 介面可以讓您的類別同時實現其他介面,增加了設計的靈活性。
將執行緒的任務(what to do)與執行緒的控制(how to run)分離,符合單一職責原則,並且多個執行緒可以共享同一個 Runnable 物件,這在某些情況下可以提高效率。
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable正在運行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
從Java 8開始,我們可以使用Lambda表達式來更簡潔地創建Runnable對象:
public class LambdaThread {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("Lambda執行緒正在運行: " + Thread.currentThread().getName());
});
thread.start();
}
}
Java執行緒有以下幾種狀態:
我們可以使用Thread.getState()方法來獲取執行緒的當前狀態。
Java提供了一些方法來控制執行緒的行為:
public class ThreadOperations {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("執行緒被中斷");
}
});
thread.start();
System.out.println("執行緒已啟動");
thread.join(1000);
System.out.println("等待1秒後");
if (thread.isAlive()) {
System.out.println("執行緒仍在運行");
thread.interrupt();
}
}
}
Java允許我們設置執行緒的優先級,範圍從1(最低)到10(最高)。
但是,執行緒調度器的實際行為取決於底層操作系統,因此不應過度依賴優先級。
Thread thread = new Thread(() -> {
// 執行緒程式碼
});
thread.setPriority(Thread.MAX_PRIORITY); // 設置最高優先級
thread.start();
守護執行緒是在背景運行的低優先級執行緒。
當所有非守護執行緒結束時,JVM會自動終止守護執行緒。
Thread daemonThread = new Thread(() -> {
while (true) {
System.out.println("守護執行緒正在運行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
});
daemonThread.setDaemon(true);
daemonThread.start();
在多執行緒環境中,當多個執行緒同時訪問共享資源時,可能會導致數據不一致或其他並發問題。
synchronized是Java中最基本的同步化機制,可以用於方法或程式碼塊。
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
public class Counter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized(lock) {
count++;
}
}
public int getCount() {
synchronized(lock) {
return count;
}
}
}
volatile關鍵字用於確保變數的可見性,但不保證原子性。
public class SharedFlag {
private volatile boolean flag = false;
public void setFlag(boolean value) {
flag = value;
}
public boolean isFlag() {
return flag;
}
}
Java 5引入了java.util.concurrent.locks包,提供了更靈活的鎖機制。
ReentrantLock提供了與synchronized相似的功能,但具有更多的靈活性。
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
ReadWriteLock允許多個讀操作同時進行,但寫操作需要獨佔訪問。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Cache {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Map<String, String> cache = new HashMap<>();
public String get(String key) {
rwLock.readLock().lock();
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock();
}
}
public void put(String key, String value) {
rwLock.writeLock().lock();
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
}
Condition提供了比Object的wait()和notify()更強大的等待/通知機制。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer<T> {
private final T[] items;
private int putIndex, takeIndex, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int capacity) {
items = (T[]) new Object[capacity];
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putIndex] = item;
if (++putIndex == items.length) putIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T item = items[takeIndex];
if (++takeIndex == items.length) takeIndex = 0;
--count;
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
java.util.concurrent.atomic包提供一系列原子變數類,可以在不使用同步的情況下實現執行緒安全。
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
在並行程式設計中,創建和管理大量執行緒可能會導致效能問題和資源浪費。
執行緒池和Executor框架提供了一種更有效的方式來管理和重用執行緒,從而提高應用程式的效能和可擴展性。
執行緒池是一種執行緒管理模式,預先創建一定數量的執行緒,並將這些執行緒保存在池中。
當有任務需要執行時,從池中取出一個執行緒來執行任務,任務執行完畢後,執行緒被返回到池中等待下一個任務。
執行緒池的主要優點包括:
Java的java.util.concurrent包提供了Executor框架,用於異步執行任務的框架。
Executor框架的核心接口包括:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService executor = Executors.newFixedThreadPool(5);
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交Runnable任務
executor.execute(() -> System.out.println("執行Runnable任務"));
// 提交Callable任務
Future<String> future = executor.submit(() -> "執行Callable任務");
// 獲取Callable任務的結果
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
// 等待所有任務完成
executor.awaitTermination(60, TimeUnit.SECONDS);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 延遲5秒後執行
scheduler.schedule(() -> System.out.println("延遲任務"), 5, TimeUnit.SECONDS);
// 每3秒執行一次
scheduler.scheduleAtFixedRate(() -> System.out.println("固定頻率任務"), 0, 3, TimeUnit.SECONDS);
// 每次執行完畢後等待2秒再執行
scheduler.scheduleWithFixedDelay(() -> System.out.println("固定延遲任務"), 0, 2, TimeUnit.SECONDS);
對於更複雜的場景,我們可以自定義ThreadPoolExecutor:
import java.util.concurrent.*;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心執行緒數
10, // 最大執行緒數
60L, // 空閒執行緒存活時間
TimeUnit.SECONDS, // 時間單位
new ArrayBlockingQueue<>(100), // 任務隊列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
Java 8引入的CompletableFuture提供異步寫程式碼能力:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
future.thenAccept(System.out::println);
在多執行緒環境中,普通的Java集合類可能會出現執行緒安全問題。
為了解決這個問題,Java提供專門用於並行環境的集合類,這些類位於java.util.concurrent包中。
ConcurrentHashMap是HashMap的執行緒安全版本。
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.putIfAbsent("key", 2); // 如果key不存在才插入
map.computeIfAbsent("newKey", k -> k.length()); // 如果key不存在,計算並插入新值
ConcurrentHashMap的主要特點:
CopyOnWriteArrayList是ArrayList的執行緒安全變體,適用於讀多寫少的場景。
import java.util.concurrent.CopyOnWriteArrayList;
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.addIfAbsent("item2");
CopyOnWriteArrayList的特點:
ConcurrentLinkedQueue是基於鏈接節點的無界執行緒安全隊列。
import java.util.concurrent.ConcurrentLinkedQueue;
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item1");
String item = queue.poll();
ConcurrentLinkedQueue的特點:
BlockingQueue接口定義了阻塞隊列的行為,Java提供了多種實現:
固定大小的阻塞隊列,基於數組實現。
import java.util.concurrent.ArrayBlockingQueue;
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("item"); // 如果隊列滿了,會阻塞
String item = queue.take(); // 如果隊列空了,會阻塞
基於鏈表的可選有界阻塞隊列。
import java.util.concurrent.LinkedBlockingQueue;
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100); // 有界
LinkedBlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>(); // 無界
帶優先級的無界阻塞隊列。
import java.util.concurrent.PriorityBlockingQueue;
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(3);
queue.put(1);
queue.put(2);
System.out.println(queue.take()); // 輸出1
這兩個類分別是TreeMap和TreeSet的並發版本,基於跳表數據結構實現。
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
map.put("key", 1);
ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(1);
這些類的特點:
Fork/Join框架是Java 7引入的一個用於並行執行任務的強大工具,特別適合用於"分而治之"的問題,即可以將大任務分解為多個小任務並行執行,然後將結果合併的問題。
Fork/Join框架的核心思想是:
使用了工作竊取(work-stealing)算法,空閒的執行緒可以竊取其他執行緒隊列中的任務來執行,從而提高效率。
Fork/Join框架主要通過兩個抽象類來實現:
以下是一個使用RecursiveTask計算斐波那契數列(Successione di Fibonacci)的例子:
import java.util.concurrent.RecursiveTask;
public class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FibonacciTask f1 = new FibonacciTask(n - 1);
f1.fork();
FibonacciTask f2 = new FibonacciTask(n - 2);
return f2.compute() + f1.join();
}
}
以下是一個使用RecursiveAction進行數組排序的例子:
import java.util.Arrays;
import java.util.concurrent.RecursiveAction;
public class MergeSortTask extends RecursiveAction {
private final int[] array;
private final int start;
private final int end;
public MergeSortTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= 1000) {
Arrays.sort(array, start, end);
} else {
int mid = (start + end) / 2;
MergeSortTask left = new MergeSortTask(array, start, mid);
MergeSortTask right = new MergeSortTask(array, mid, end);
invokeAll(left, right);
merge(start, mid, end);
}
}
private void merge(int start, int mid, int end) {
// 合併兩個已排序的子數組
// 實現省略...
}
}
ForkJoinPool是執行Fork/Join任務的執行器服務。
從Java 8開始,我們可以使用ForkJoinPool.commonPool()來獲取公共的ForkJoinPool實例。
import java.util.concurrent.ForkJoinPool;
public class Main {
public static void main(String[] args) {
ForkJoinPool pool = ForkJoinPool.commonPool();
int result = pool.invoke(new FibonacciTask(20));
System.out.println("Fibonacci(20) = " + result);
int[] array = new int[10000];
// 初始化數組...
pool.invoke(new MergeSortTask(array, 0, array.length));
}
}
import java.util.Arrays;
public class ParallelStreamExample {
public static void main(String[] args) {
int[] array = new int[1000000];
// 初始化數組...
long sum = Arrays.stream(array).parallel().sum();
System.out.println("Sum: " + sum);
}
}
Fork/Join框架特別適合以下類型的問題:
本篇文章同步刊載: JYI.TW
筆者個人的網站: JUNYI