iT邦幫忙

0

day26優化 ThreadPool(使用 ExecutorService + bounded queue)

  • 分享至 

  • xImage
  •  

今天我接觸到了一個比較進階的主題:
用 ThreadPoolExecutor + Bounded Queue 來優化 Port Scanner。
以往我寫的程式都是一個一個 Port 去掃,或是用很多 Thread 去同時掃。問題是:
如果 Thread 開太多,電腦容易爆掉(資源被吃光)。
如果 Thread 太少,掃描會很慢。
所以今天學到的技巧是: 先設定一個 固定數量的工人 (Thread Pool),然後用一個「排隊區 (Bounded Queue)」來放還沒掃的工作。
這樣可以同時兼顧:
不會一次丟出太多 Thread
也不會讓電腦崩潰。

今天練習這段程式碼:
package day1.day1;
import java.io.;
import java.net.
;
import java.util.Scanner;
import java.util.concurrent.*;

public class Day26Demo {
private static String checkPort(String host , int port , int timeoutMs){
try (Socket socket = new Socket()){
socket.connect(new InetSocketAddress(host,port),timeoutMs);
return "OPEN";
}catch (IOException e){
return "CLOSED";
}
}
public static void main(String[] args)throws Exception{
Scanner sc = new Scanner(System.in);
System.out.print("Host (e.g. localhost): ) : ");
String host = sc.nextLine().trim();
System.out.print("START port :");
int start= Integer.parseInt(sc.nextLine().trim());
System.out.print("END port :");
int end= Integer.parseInt(sc.nextLine().trim());
System.out.print("TIMEOUT ms(e.g. 200):");
int timeout= Integer.parseInt(sc.nextLine().trim());

    final int total = Math.max(0 ,end - start+1);
    if (total == 0){System.out.println("No ports to scan.");sc.close(); return;}

    int poolSize = 10;
    int queueCapacity = 50;
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueCapacity);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize,poolSize,0L, TimeUnit.MILLISECONDS,queue,new ThreadPoolExecutor.CallerRunsPolicy());

    String outfile= "simplepool_scan.csv";
    CountDownLatch latch = new CountDownLatch(total);

    long startTime = System.currentTimeMillis();

    try(BufferedWriter bw = new BufferedWriter(new FileWriter(outfile))){
        bw.write("timestamp,host,port,status");
        bw.newLine();
        bw.flush();
        for(int p =start ;p<=end;p++){
            final int port=p;
            executor.execute(()->{
                String status = checkPort(host,port,timeout);
                String ts =java.time.LocalDateTime.now().format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                String line =String.format("%s,%s,%d,%s", ts, host, port, status);

                synchronized(bw){
                    try{bw.write(line);
                        bw.newLine();
                    }catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                long done =total - latch.getCount()+1;
                System.out.printf("\rScanned port %d (%d/%d)", port, (int)(total - latch.getCount() + 1), total);
                latch.countDown();
            });

        }
        latch.await();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        bw.flush();
        }finally {
        if (!executor.isShutdown())
            executor.shutdownNow();
        sc.close();
        long elapsed= (System.currentTimeMillis() - startTime)-1000;
        System.out.println("\nScan finished in " + elapsed + "s. Output: " + outfile);
    }


}

}

https://ithelp.ithome.com.tw/upload/images/20251017/20179429U1xcctVV4H.png
int queueCapacity = 50;
任務等待隊列(bounded queue)的大小上限。
當執行緒都忙而隊列未滿時,新任務會放到這個隊列排隊。

BlockingQueue queue = new ArrayBlockingQueue<>(queueCapacity);
使用 ArrayBlockingQueue 作為有界佇列(固定容量、先進先出)。
有界佇列能保護記憶體,不會讓任務無限堆積。

int queueCapacity = 50;
任務等待隊列(bounded queue)的大小上限。
當執行緒都忙而隊列未滿時,新任務會放到這個隊列排隊。
BlockingQueue queue = new ArrayBlockingQueue<>(queueCapacity);
使用 ArrayBlockingQueue 作為有界佇列(固定容量、先進先出)。
有界佇列能保護記憶體,不會讓任務無限堆積。

CallerRunsPolicy:
CallerRunsPolicy:當 pool 與 queue 都滿時,新的任務不會被丟掉;相反地,提交任務的那個 thread(通常是你主程式)會改為自己執行任務,這樣可以自動「慢下來」,達到節流效果。

CallerRunsPolicy 的行為:當你呼叫 exec.execute(task),但「執行緒池已滿」且「佇列也已滿」,executor 不會丟掉任務也不會拋例外;它會讓提交任務的那一條執行緒(也就是你的主執行緒 / submitter)去直接執行這個任務。
實務效果(好處):
產生一種「自然的節流 / back-pressure」:提交任務的程式會被拖慢(因為它變成了執行者之一),因此不會無限制地把任務塞進系統。
避免 OOM 或系統被塞爆。
注意:
如果你的提交程式是在主執行緒(通常是你呼叫 for 提交 tasks 的那個 thread),使用 CallerRunsPolicy 會讓提交變慢(因為主程式被阻塞去執行掃 port),但這通常是希望的節流行為。
如果 submitter 不是想被阻塞的,那就不適合用這個策略。

一開始練習這段程式碼真的覺得很複雜,因為同時出現了好多新的東西(ThreadPoolExecutor、BlockingQueue、CountDownLatch)。
我知道「多執行緒」不能亂開,要有規劃。
ThreadPool 是一種 有效率又安全的解法,可以平衡速度和資源。
也體會到「同步寫檔案」的重要性,不然結果會亂掉。
雖然程式碼我還不能全部自己寫出來,但我覺得已經開始有一點感覺了。


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言