<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          都在建議你不要直接使用 @Async 注解,為什么?

          共 9262字,需瀏覽 19分鐘

           ·

          2021-11-18 10:39

          來源:cnblogs.com/wlandwl/p/async.html

          本文講述@Async注解,在Spring體系中的應(yīng)用。本文僅說明@Async注解的應(yīng)用規(guī)則,對(duì)于原理,調(diào)用邏輯,源碼分析,暫不介紹。對(duì)于異步方法調(diào)用,從Spring3開始提供了@Async注解,該注解可以被標(biāo)注在方法上,以便異步地調(diào)用該方法。調(diào)用者將在調(diào)用時(shí)立即返回,方法的實(shí)際執(zhí)行將提交給Spring TaskExecutor的任務(wù)中,由指定的線程池中的線程執(zhí)行。

          在項(xiàng)目應(yīng)用中,@Async調(diào)用線程池,推薦使用自定義線程池的模式。自定義線程池常用方案:重新實(shí)現(xiàn)接口AsyncConfigurer。

          簡(jiǎn)介

          應(yīng)用場(chǎng)景

          同步: 同步就是整個(gè)處理過程順序執(zhí)行,當(dāng)各個(gè)過程都執(zhí)行完畢,并返回結(jié)果。

          異步: 異步調(diào)用則是只是發(fā)送了調(diào)用的指令,調(diào)用者無需等待被調(diào)用的方法完全執(zhí)行完畢;而是繼續(xù)執(zhí)行下面的流程。

          例如, 在某個(gè)調(diào)用中,需要順序調(diào)用 A, B, C三個(gè)過程方法;如他們都是同步調(diào)用,則需要將他們都順序執(zhí)行完畢之后,方算作過程執(zhí)行完畢;如B為一個(gè)異步的調(diào)用方法,則在執(zhí)行完A之后,調(diào)用B,并不等待B完成,而是執(zhí)行開始調(diào)用C,待C執(zhí)行完畢之后,就意味著這個(gè)過程執(zhí)行完畢了。

          在Java中,一般在處理類似的場(chǎng)景之時(shí),都是基于創(chuàng)建獨(dú)立的線程去完成相應(yīng)的異步調(diào)用邏輯,通過主線程和不同的業(yè)務(wù)子線程之間的執(zhí)行流程,從而在啟動(dòng)獨(dú)立的線程之后,主線程繼續(xù)執(zhí)行而不會(huì)產(chǎn)生停滯等待的情況。

          Spring 已經(jīng)實(shí)現(xiàn)的線程池

          1. SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,默認(rèn)每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。
          2. SyncTaskExecutor:這個(gè)類沒有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地方。
          3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類。
          4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時(shí)被quartz和非quartz使用,才需要使用此類。
          5. ThreadPoolTaskExecutor :最常使用,推薦。其實(shí)質(zhì)是對(duì)java.util.concurrent.ThreadPoolExecutor的包裝。

          異步的方法有:

          1. 最簡(jiǎn)單的異步調(diào)用,返回值為void
          2. 帶參數(shù)的異步調(diào)用,異步方法可以傳入?yún)?shù)
          3. 存在返回值,常調(diào)用返回Future

          Spring中啟用@Async

          //?基于Java配置的啟用方式:
          @Configuration
          @EnableAsync
          public?class?SpringAsyncConfig?{?...?}

          // Spring boot啟用:
          @EnableAsync
          @EnableTransactionManagement
          public?class?SettlementApplication?{
          ????public?static?void?main(String[]?args)?{
          ????????SpringApplication.run(SettlementApplication.class,?args);
          ????}
          }

          @Async應(yīng)用默認(rèn)線程池

          Spring應(yīng)用默認(rèn)的線程池,指在@Async注解在使用時(shí),不指定線程池的名稱。查看源碼,@Async的默認(rèn)線程池為SimpleAsyncTaskExecutor。

          無返回值調(diào)用

          基于@Async無返回值調(diào)用,直接在使用類,使用方法(建議在使用方法)上,加上注解。若需要拋出異常,需手動(dòng)new一個(gè)異常拋出。

          /**
          ?????*?帶參數(shù)的異步調(diào)用?異步方法可以傳入?yún)?shù)
          ?????*??對(duì)于返回值是void,異常會(huì)被AsyncUncaughtExceptionHandler處理掉
          ?????*?@param?s
          ?????*/

          ????@Async
          ????public?void?asyncInvokeWithException(String?s)?{
          ????????log.info("asyncInvokeWithParameter,?parementer={}",?s);
          ????????throw?new?IllegalArgumentException(s);
          ????}

          有返回值Future調(diào)用

          /**
          ?????*?異常調(diào)用返回Future
          ?????*??對(duì)于返回值是Future,不會(huì)被AsyncUncaughtExceptionHandler處理,需要我們?cè)诜椒ㄖ胁东@異常并處理
          ?????*??或者在調(diào)用方在調(diào)用Futrue.get時(shí)捕獲異常進(jìn)行處理
          ?????*
          ?????*?@param?i
          ?????*?@return
          ?????*/

          ????@Async
          ????public?Future?asyncInvokeReturnFuture(int?i)?{
          ????????log.info("asyncInvokeReturnFuture,?parementer={}",?i);
          ????????Future?future;
          ????????try?{
          ????????????Thread.sleep(1000?*?1);
          ????????????future?=?new?AsyncResult("success:"?+?i);
          ????????????throw?new?IllegalArgumentException("a");
          ????????}?catch?(InterruptedException?e)?{
          ????????????future?=?new?AsyncResult("error");
          ????????}?catch(IllegalArgumentException?e){
          ????????????future?=?new?AsyncResult("error-IllegalArgumentException");
          ????????}
          ????????return?future;
          ????}

          有返回值CompletableFuture調(diào)用

          CompletableFuture 并不使用@Async注解,可達(dá)到調(diào)用系統(tǒng)線程池處理業(yè)務(wù)的功能。

          JDK5新增了Future接口,用于描述一個(gè)異步計(jì)算的結(jié)果。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對(duì)于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會(huì)耗費(fèi)無謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果。

          • CompletionStage代表異步計(jì)算過程中的某一個(gè)階段,一個(gè)階段完成以后可能會(huì)觸發(fā)另外一個(gè)階段
          • 一個(gè)階段的計(jì)算執(zhí)行可以是一個(gè)Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
          • 一個(gè)階段的執(zhí)行可能是被單個(gè)階段的完成觸發(fā),也可能是由多個(gè)階段一起觸發(fā)

          在Java8中,CompletableFuture提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,并且提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法。

          • 它可能代表一個(gè)明確完成的Future,也有可能代表一個(gè)完成階段( CompletionStage ),它支持在計(jì)算完成以后觸發(fā)一些函數(shù)或執(zhí)行某些動(dòng)作。
          • 它實(shí)現(xiàn)了Future和CompletionStage接口
          /**
          ?????*?數(shù)據(jù)查詢線程池
          ?????*/

          ????private?static?final?ThreadPoolExecutor?SELECT_POOL_EXECUTOR?=?new?ThreadPoolExecutor(10,?20,?5000,
          ????????????TimeUnit.MILLISECONDS,?new?LinkedBlockingQueue<>(1024),?new?ThreadFactoryBuilder().setNameFormat("selectThreadPoolExecutor-%d").build());

          //?tradeMapper.countTradeLog(tradeSearchBean)方法表示,獲取數(shù)量,返回值為int
          ?//?獲取總條數(shù)
          ????????CompletableFuture?countFuture?=?CompletableFuture
          ????????????????.supplyAsync(()?->?tradeMapper.countTradeLog(tradeSearchBean),?SELECT_POOL_EXECUTOR);
          //?同步阻塞
          ????CompletableFuture.allOf(countFuture).join();
          //?獲取結(jié)果
          ?int?count?=?countFuture.get();

          默認(rèn)線程池的弊端

          在線程池應(yīng)用中,參考阿里巴巴java開發(fā)規(guī)范:線程池不允許使用Executors去創(chuàng)建,不允許使用系統(tǒng)默認(rèn)的線程池,推薦通過ThreadPoolExecutor的方式,這樣的處理方式讓開發(fā)的工程師更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。Executors各個(gè)方法的弊端:

          • newFixedThreadPool和newSingleThreadExecutor:主要問題是堆積的請(qǐng)求處理隊(duì)列可能會(huì)耗費(fèi)非常大的內(nèi)存,甚至OOM。
          • newCachedThreadPool和newScheduledThreadPool:要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會(huì)創(chuàng)建數(shù)量非常多的線程,甚至OOM。

          @Async默認(rèn)異步配置使用的是SimpleAsyncTaskExecutor,該線程池默認(rèn)來一個(gè)任務(wù)創(chuàng)建一個(gè)線程,若系統(tǒng)中不斷的創(chuàng)建線程,最終會(huì)導(dǎo)致系統(tǒng)占用內(nèi)存過高,引發(fā)OutOfMemoryError錯(cuò)誤。

          針對(duì)線程創(chuàng)建問題,SimpleAsyncTaskExecutor提供了限流機(jī)制,通過concurrencyLimit屬性來控制開關(guān),當(dāng)concurrencyLimit>=0時(shí)開啟限流機(jī)制,默認(rèn)關(guān)閉限流機(jī)制即concurrencyLimit=-1,當(dāng)關(guān)閉情況下,會(huì)不斷創(chuàng)建新的線程來處理任務(wù)。基于默認(rèn)配置,SimpleAsyncTaskExecutor并不是嚴(yán)格意義的線程池,達(dá)不到線程復(fù)用的功能。

          @Async應(yīng)用自定義線程池

          自定義線程池,可對(duì)系統(tǒng)中線程池更加細(xì)粒度的控制,方便調(diào)整線程池大小配置,線程執(zhí)行異常控制和處理。在設(shè)置系統(tǒng)自定義線程池代替默認(rèn)線程池時(shí),雖可通過多種模式設(shè)置,但替換默認(rèn)線程池最終產(chǎn)生的線程池有且只能設(shè)置一個(gè)(不能設(shè)置多個(gè)類繼承AsyncConfigurer)。自定義線程池有如下模式:

          • 重新實(shí)現(xiàn)接口AsyncConfigurer
          • 繼承AsyncConfigurerSupport
          • 配置由自定義的TaskExecutor替代內(nèi)置的任務(wù)執(zhí)行器

          通過查看Spring源碼關(guān)于@Async的默認(rèn)調(diào)用規(guī)則,會(huì)優(yōu)先查詢?cè)创a中實(shí)現(xiàn)AsyncConfigurer這個(gè)接口的類,實(shí)現(xiàn)這個(gè)接口的類為AsyncConfigurerSupport。但默認(rèn)配置的線程池和異步處理方法均為空,所以,無論是繼承或者重新實(shí)現(xiàn)接口,都需指定一個(gè)線程池。且重新實(shí)現(xiàn) public Executor getAsyncExecutor()方法。

          實(shí)現(xiàn)接口AsyncConfigurer

          @Configuration
          ?public?class?AsyncConfiguration?implements?AsyncConfigurer?{
          ?????@Bean("kingAsyncExecutor")
          ?????public?ThreadPoolTaskExecutor?executor()?{
          ?????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
          ?????????int?corePoolSize?=?10;
          ?????????executor.setCorePoolSize(corePoolSize);
          ?????????int?maxPoolSize?=?50;
          ?????????executor.setMaxPoolSize(maxPoolSize);
          ?????????int?queueCapacity?=?10;
          ?????????executor.setQueueCapacity(queueCapacity);
          ?????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ?????????String?threadNamePrefix?=?"kingDeeAsyncExecutor-";
          ?????????executor.setThreadNamePrefix(threadNamePrefix);
          ?????????executor.setWaitForTasksToCompleteOnShutdown(true);
          ?????????//?使用自定義的跨線程的請(qǐng)求級(jí)別線程工廠類19?????????int?awaitTerminationSeconds?=?5;
          ?????????executor.setAwaitTerminationSeconds(awaitTerminationSeconds);
          ?????????executor.initialize();
          ?????????return?executor;
          ?????}
          ?
          ?????@Override
          ?????public?Executor?getAsyncExecutor()?{
          ?????????return?executor();
          ?????}
          ?
          ?????@Override
          ?????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
          ?????????return?(ex,?method,?params)?->?ErrorLogger.getInstance().log(String.format("執(zhí)行異步任務(wù)'%s'",?method),?ex);
          ?????}
          ?}

          繼承AsyncConfigurerSupport

          @Configuration??
          @EnableAsync??
          class?SpringAsyncConfigurer?extends?AsyncConfigurerSupport?{??
          ??
          ????@Bean??
          ????public?ThreadPoolTaskExecutor?asyncExecutor()?{??
          ????????ThreadPoolTaskExecutor?threadPool?=?new?ThreadPoolTaskExecutor();??
          ????????threadPool.setCorePoolSize(3);??
          ????????threadPool.setMaxPoolSize(3);??
          ????????threadPool.setWaitForTasksToCompleteOnShutdown(true);??
          ????????threadPool.setAwaitTerminationSeconds(60?*?15);??
          ????????return?threadPool;??
          ????}??
          ??
          ????@Override??
          ????public?Executor?getAsyncExecutor()?{??
          ????????return?asyncExecutor;??
          }??

          ??@Override??
          ????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
          ????return?(ex,?method,?params)?->?ErrorLogger.getInstance().log(String.format("執(zhí)行異步任務(wù)'%s'",?method),?ex);
          }
          }

          配置自定義的TaskExecutor

          由于AsyncConfigurer的默認(rèn)線程池在源碼中為空,Spring通過beanFactory.getBean(TaskExecutor.class)先查看是否有線程池,未配置時(shí),通過beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查詢是否存在默認(rèn)名稱為TaskExecutor的線程池。

          所以可以在項(xiàng)目中,定義名稱為TaskExecutor的bean生成一個(gè)默認(rèn)線程池。也可不指定線程池的名稱,申明一個(gè)線程池,本身底層是基于TaskExecutor.class便可。

          比如:

          Executor.class:ThreadPoolExecutorAdapter->ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor

          這樣的模式,最終底層為Executor.class,在替換默認(rèn)的線程池時(shí),需設(shè)置默認(rèn)的線程池名稱為TaskExecutor

          TaskExecutor.class:ThreadPoolTaskExecutor->SchedulingTaskExecutor->AsyncTaskExecutor->TaskExecutor

          這樣的模式,最終底層為TaskExecutor.class,在替換默認(rèn)的線程池時(shí),可不指定線程池名稱。

          @EnableAsync
          ?@Configuration
          ?public?class?TaskPoolConfig?{
          ?????@Bean(name?=?AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
          ?????public?Executor?taskExecutor()?{
          ?????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
          ??????????//核心線程池大小
          ?????????executor.setCorePoolSize(10);
          ?????????//最大線程數(shù)
          ?????????executor.setMaxPoolSize(20);
          ?????????//隊(duì)列容量
          ?????????executor.setQueueCapacity(200);
          ?????????//活躍時(shí)間
          ?????????executor.setKeepAliveSeconds(60);
          ?????????//線程名字前綴
          ?????????executor.setThreadNamePrefix("taskExecutor-");
          ?????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ?????????return?executor;
          ?????}
          ????@Bean(name?=?"new_task")
          ?????public?Executor?taskExecutor()?{
          ?????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
          ??????????//核心線程池大小
          ?????????executor.setCorePoolSize(10);
          ?????????//最大線程數(shù)
          ?????????executor.setMaxPoolSize(20);
          ?????????//隊(duì)列容量
          ?????????executor.setQueueCapacity(200);
          ?????????//活躍時(shí)間
          ?????????executor.setKeepAliveSeconds(60);
          ?????????//線程名字前綴
          ?????????executor.setThreadNamePrefix("taskExecutor-");
          ?????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ?????????return?executor;
          ?????}
          ?}

          多個(gè)線程池

          @Async注解,使用系統(tǒng)默認(rèn)或者自定義的線程池(代替默認(rèn)線程池)。可在項(xiàng)目中設(shè)置多個(gè)線程池,在異步調(diào)用時(shí),指明需要調(diào)用的線程池名稱,如@Async("new_task")

          @Async部分重要源碼解析

          源碼-獲取線程池方法

          源碼-設(shè)置默認(rèn)線程池defaultExecutor,默認(rèn)是空的,當(dāng)重新實(shí)現(xiàn)接口AsyncConfigurer的getAsyncExecutor()時(shí),可以設(shè)置默認(rèn)的線程池。

          源碼-都沒有找到項(xiàng)目中設(shè)置的默認(rèn)線程池時(shí),采用spring 默認(rèn)的線程池

          程序汪資料鏈接

          程序汪接的7個(gè)私活都在這里,經(jīng)驗(yàn)整理

          Java項(xiàng)目分享 最新整理全集,找項(xiàng)目不累啦 06版

          堪稱神級(jí)的Spring Boot手冊(cè),從基礎(chǔ)入門到實(shí)戰(zhàn)進(jìn)階

          臥槽!字節(jié)跳動(dòng)《算法中文手冊(cè)》火了,完整版 PDF 開放下載!

          臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!

          字節(jié)跳動(dòng)總結(jié)的設(shè)計(jì)模式 PDF 火了,完整版開放下載!


          歡迎添加程序汪個(gè)人微信 itwang009? 進(jìn)粉絲群或圍觀朋友圈

          瀏覽 29
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品久久噜噜午夜论理电影 | 久久久精品国产 | 欧美亚洲国产一区导航 | 91人人干 | 人妻少妇内射无码一区二区 |