iT邦幫忙

2021 iThome 鐵人賽

DAY 22
0
Software Development

Wow ! There is no doubt about Learn Spring framework in a month.系列 第 22

[Day - 22] - 今晚我想來個Spring Async非同步的感覺

Abrstract

叮咚!今晚我想來點可不可的.....,不對,是Async,這是在Spring 2.2.4 所提倡非同步註解模式(@Async),在這之前呢,大部分使用者都是採用Thread,Runnable及Callable這三種方法進去建立執行緒達到非同步效果,這樣的寫法到產出許多笨重且難以維護的程式碼,此時Spring @Async進行以提倡新的寫法,可將所有非同步方法包裝在一個服務(@Service)內,透過每個方法回傳非同步物件(AsyncResult)提供於客戶端進行獲取值使用,每個方法都透過Future介面實現,在java.util.concurrent包中。Future介面是Java執行緒Future模式的實現,可以來進行非同步計算,透過此種方式方便注入個式領域服務(Domain service)中進行觸發對應的執行緒,可達到更簡便的方式進行非同步工作效益,現在運送員小編將快速將這個套件功能送到你腦袋中。

Principle Introduction

每個異步處理程序(@Async)相當於創建一隻新的子執行緒(Thread),並自己獨立執行一項任務,父執行緒會循序等待每個子執行緒執行完任務,並獲取相關物件值,再次進行相關邏輯運算後,才會回傳最終結果,以下範例讀者可進行參考,可進一步了解相關運作原理。

步驟一、配置執行緒池設定檔

spring.task.pool.corePoolSize = 5
spring.task.pool.maxPoolSize = 50
spring.task.pool.keepAliveSeconds= 60
spring.task.pool.queueCapacity = 10000
spring.task.pool.threadNamePrefix = Wesiting-Thread-

步驟二、將配置組態模型,並將值注入模型中

@Configuration
@ConfigurationProperties(prefix = "spring.task.pool")
public class TaskThreadPoolConfig {
    private int corePoolSize = 5;

    private int maxPoolSize = 50;

    private int keepAliveSeconds = 60;

    private int queueCapacity =10000;

    private String threadNamePrefix = "Weisting-AsyncTask-";

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public int getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public String getThreadNamePrefix() {
        return threadNamePrefix;
    }

    public void setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }
}

步驟三、配置非同步服務,將每個方法配置Future介面,並回傳AsyncResult封裝物件。

Service
public class AsyncCaller {
    Logger logger = LoggerFactory.getLogger(AsyncCaller.class);
    @Async
    public Future<SeaFood> asyncSeaFoodWithReturnTypeFromTaiwan(String id) {
        System.out.println("Execute taiwan's async method asynchronously - "
                + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
            SeaFood seaFood = SEA_FOOD_CACHE_TAIWAN.asMap().getOrDefault(id,null);
            return new AsyncResult<SeaFood>(seaFood);
        } catch (InterruptedException e) {
            logger.error("InterruptedException : {}",e.toString() );
        }

        return null;
    }


    @Async
    public Future<SeaFood> asyncSeaFoodWithReturnTypeFromChina(String id) {
        System.out.println("Execute china's async method asynchronously - "
                + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
            SeaFood seaFood = SEA_FOOD_CACHE_CHINESE.asMap().getOrDefault(id,null);
            return new AsyncResult<SeaFood>(seaFood);
        } catch (InterruptedException e) {
            logger.error("InterruptedException : {}",e.toString() );
        }

        return null;
    }
}

步驟四、注入非同步服務,並觸發後,並等待任務完成後進行獲取相關物件值,以便進行回傳。此段寫法與各位開發者早期Future-Callable寫法相同,可進行參考。

    @Autowired
    AsyncCaller asyncCaller;

    public SeaFood testAsyncAnnotationForMethodsWithReturnSeaFood(LocationEnum locationEnum,String id)
            throws InterruptedException, ExecutionException {
        System.out.println("Invoking an asynchronous method. "
                + Thread.currentThread().getName());

        Future<SeaFood> future = null;
        if (locationEnum == LocationEnum.TAIWAN)
            future = asyncCaller.asyncSeaFoodWithReturnTypeFromTaiwan(id);
        else
            future = asyncCaller.asyncSeaFoodWithReturnTypeFromChina(id);

        while (true) {
            if (future.isDone()) {
               return future.get();
            }
            System.out.println("Continue doing something else. Waiting... ");
            Thread.sleep(1000);
        }
    }

根據觸發Find Taiwan product API後,會跟上續結果一樣等待五秒後才接收到回傳結果。

image

//Log
Invoking an asynchronous method. http-nio-8080-exec-1
Continue doing something else. Waiting... 
Execute taiwan's async method asynchronously - Wesiting-Thread-1
Continue doing something else. Waiting... 
Continue doing something else. Waiting... 
Continue doing something else. Waiting... 
Continue doing something else. Waiting... 
Continue doing something else. Waiting... 

透過以上範例與原理,小編在這邊相當推薦給大家做使用,非常簡便又快速,是種不錯的好方法。

其運作方式皆是透過我們先前所敘的CGLIB切面式編程框架進行代理,由下圖每個Bean類型我們可得知,此監測GUI為運送員小編抽空開發,最後一天會開源給大家做參考引用,先行提供給大家參考。
image

Structure

如圖一所示,在預設的情況下會自動透過代理程序創建一個AsyncAnnotationBeanPostProcessor處理器,無論是何種處理器都會間接時間BeanFactoryAware此接口,並在實例化的方法(setBeanFactory)中,除了設置BeanFactory之外,也創建一個Async註解模式的處理器(AsyncAnnotationAdvisor),透過圖二我們可進一步的瞭解其構造。

image
圖一、AsyncAnnotationBeanPostProcessor啟動關係流程圖

由下圖可得知,在AsyncAnnotationAdvisor進行建構子初始化時,會建立執行緒與異常攔截器(AnnotationAsyncExecutionInterceptor),及根據@Async註解位置建立切入點(AnnotationMatchingPointcut)觸發元件,可以看到非同步觸發攔截器(AnnotationAsyncExecutionInterceptor)間接實現了方法攔截器(MethodInterceptor)接口,而方法攔截器(MethodInterceptor)是AOP切入點處理器,處理器中最終被觸發的是invoke方法,即可進行觸發開發者所配置類別中的Future方法,並回傳AsyncResult結果物件值,即可達到使用者預期效果。

image
圖二、Async核心運作流程圖

Sample Source

spring-sample-async

Reference url

深入理解Spring系列之十五:@Async实现原理

Spring Boot(5) @Async非同步執行緒池詳解


上一篇
[Day - 21] - 規律的一天從Spring Scheduled 開始
下一篇
[Day - 23] - Spring Reactor之進入忍者龜的Flux
系列文
Wow ! There is no doubt about Learn Spring framework in a month.30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言