<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot 如何實(shí)現(xiàn)并發(fā)定時(shí)任務(wù)?

          共 8548字,需瀏覽 18分鐘

           ·

          2020-07-28 13:10


          一、在JAVA開發(fā)領(lǐng)域,目前可以通過以下幾種方式進(jìn)行定時(shí)任務(wù)


          1、單機(jī)部署模式


          • Timer:

            jdk中自帶的一個(gè)定時(shí)調(diào)度類,可以簡(jiǎn)單的實(shí)現(xiàn)按某一頻度進(jìn)行任務(wù)執(zhí)行。

            提供的功能比較單一,無法實(shí)現(xiàn)復(fù)雜的調(diào)度任務(wù)。


          • ScheduledExecutorService:

            也是jdk自帶的一個(gè)基于線程池設(shè)計(jì)的定時(shí)任務(wù)類。

            其每個(gè)調(diào)度任務(wù)都會(huì)分配到線程池中的一個(gè)線程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。


          • Spring Task:

            Spring提供的一個(gè)任務(wù)調(diào)度工具,支持注解和配置文件形式,支持Cron表達(dá)式,使用簡(jiǎn)單但功能強(qiáng)大。


          • Quartz:

            一款功能強(qiáng)大的任務(wù)調(diào)度器,可以實(shí)現(xiàn)較為復(fù)雜的調(diào)度功能,如每月一號(hào)執(zhí)行、每天凌晨執(zhí)行、每周五執(zhí)行等等,還支持分布式調(diào)度,就是配置稍顯復(fù)雜。


          2、分布式集群模式(不多介紹,簡(jiǎn)單提一下)


          問題:


          I、如何解決定時(shí)任務(wù)的多次執(zhí)行?


          II、如何解決任務(wù)的單點(diǎn)問題,實(shí)現(xiàn)任務(wù)的故障轉(zhuǎn)移?


          問題I的簡(jiǎn)單思考:


          1、固定執(zhí)行定時(shí)任務(wù)的機(jī)器(可以有效避免多次執(zhí)行的情況 ,缺點(diǎn)就是單點(diǎn)故障問題)。
          2、借助Redis的過期機(jī)制和分布式鎖。
          3、借助mysql的鎖機(jī)制等。


          成熟的解決方案:


          1、Quartz:可以去看看這篇文章[Quartz分布式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。

          2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)當(dāng)當(dāng)開發(fā)的彈性分布式任務(wù)調(diào)度系統(tǒng),采用zookeeper實(shí)現(xiàn)分布式協(xié)調(diào),實(shí)現(xiàn)任務(wù)高可用以及分片。


          3、xxl-job:(https://github.com/xuxueli/xxl-job)是大眾點(diǎn)評(píng)員發(fā)布的分布式任務(wù)調(diào)度平臺(tái),是一個(gè)輕量級(jí)分布式任務(wù)調(diào)度框架。

          4、saturn:(https://github.com/vipshop/Saturn) 是唯品會(huì)提供一個(gè)分布式、容錯(cuò)和高可用的作業(yè)調(diào)度服務(wù)框架。

          二、SpringTask實(shí)現(xiàn)定時(shí)任務(wù)(這里是基于springboot)


          1、簡(jiǎn)單的定時(shí)任務(wù)實(shí)現(xiàn)使用方式:使用@EnableScheduling注解開啟對(duì)定時(shí)任務(wù)的支持。


          使用@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等一些定時(shí)策略來實(shí)現(xiàn)定時(shí)任務(wù)。


          使用缺點(diǎn):1、多個(gè)定時(shí)任務(wù)使用的是同一個(gè)調(diào)度線程,所以任務(wù)是阻塞執(zhí)行的,執(zhí)行效率不高。


          2、其次如果出現(xiàn)任務(wù)阻塞,導(dǎo)致一些場(chǎng)景的定時(shí)計(jì)算沒有實(shí)際意義,比如每天12點(diǎn)的一個(gè)計(jì)算任務(wù)被阻塞到1點(diǎn)去執(zhí)行,會(huì)導(dǎo)致結(jié)果并非我們想要的。

          使用優(yōu)點(diǎn):


          1、配置簡(jiǎn)單
          2、適用于單個(gè)后臺(tái)線程執(zhí)行周期任務(wù),并且保證順序一致執(zhí)行的場(chǎng)景

          源碼分析:


          //默認(rèn)使用的調(diào)度器
          if(this.taskScheduler == null) {
          ????this.localExecutor = Executors.newSingleThreadScheduledExecutor();
          ????this.taskScheduler = new?ConcurrentTaskScheduler(this.localExecutor);
          }
          //可以看到SingleThreadScheduledExecutor指定的核心線程為1,說白了就是單線程執(zhí)行
          public?static?ScheduledExecutorService newSingleThreadScheduledExecutor()?{
          ????return?new?DelegatedScheduledExecutorService
          ????????(new?ScheduledThreadPoolExecutor(1));
          }
          //利用了DelayedWorkQueue延時(shí)隊(duì)列作為任務(wù)的存放隊(duì)列,這樣便可以實(shí)現(xiàn)任務(wù)延遲執(zhí)行或者定時(shí)執(zhí)行
          public?ScheduledThreadPoolExecutor(int?corePoolSize)?{
          ????super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          ??????????new?DelayedWorkQueue());
          }


          2、實(shí)現(xiàn)并發(fā)的定時(shí)任務(wù)使用方式:


          • 方式一:由1中我們知道之所以定時(shí)任務(wù)是阻塞執(zhí)行,是配置的線程池決定的,那就好辦了,換一個(gè)不就行了!直接上代碼:


          @Configuration
          ??public?class?ScheduledConfig?implements?SchedulingConfigurer?{

          ????@Autowired
          ??????private?TaskScheduler myThreadPoolTaskScheduler;
          ??
          ??????@Override
          ??????public?void?configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar)?{
          ??????????//簡(jiǎn)單粗暴的方式直接指定
          ??????//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
          ??????????//也可以自定義的線程池,方便線程的使用與維護(hù),這里不多說了
          ??????scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
          ??????}
          ??}

          ??@Bean(name = "myThreadPoolTaskScheduler")
          ??public?TaskScheduler getMyThreadPoolTaskScheduler()?{
          ??????ThreadPoolTaskScheduler taskScheduler = new?ThreadPoolTaskScheduler();
          ??????taskScheduler.setPoolSize(10);
          ??????taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
          ??????taskScheduler.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ??????//調(diào)度器shutdown被調(diào)用時(shí)等待當(dāng)前被調(diào)度的任務(wù)完成
          ??????taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
          ??????//等待時(shí)長(zhǎng)
          ??????taskScheduler.setAwaitTerminationSeconds(60);
          ??????return?taskScheduler;
          ??}


          • 方式二:方式一的本質(zhì)改變了任務(wù)調(diào)度器默認(rèn)使用的線程池,接下來這種是不改變調(diào)度器的默認(rèn)線程池,而是把當(dāng)前任務(wù)交給一個(gè)異步線程池去執(zhí)行。廢話太多,直接上代碼:


          @Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
          ??@Async("myThreadPoolTaskExecutor")
          ??//@Async
          ??public?void?scheduledTest02(){
          ??????System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
          ??}

          ??//自定義線程池
          ??@Bean(name = "myThreadPoolTaskExecutor")
          ??public?TaskExecutor getMyThreadPoolTaskExecutor()?{
          ??????ThreadPoolTaskExecutor taskExecutor = new?ThreadPoolTaskExecutor();
          ??????taskExecutor.setCorePoolSize(20);
          ??????taskExecutor.setMaxPoolSize(200);
          ??????taskExecutor.setQueueCapacity(25);
          ??????taskExecutor.setKeepAliveSeconds(200);
          ??????taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
          ??????// 線程池對(duì)拒絕任務(wù)(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認(rèn)為后者
          ??????taskExecutor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ??????//調(diào)度器shutdown被調(diào)用時(shí)等待當(dāng)前被調(diào)度的任務(wù)完成
          ??????taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
          ??????//等待時(shí)長(zhǎng)
          ??????taskExecutor.setAwaitTerminationSeconds(60);
          ??????taskExecutor.initialize();
          ??????return?taskExecutor;
          ??}


            • 首先使用@EnableAsync 啟用異步任務(wù)

            • 然后在定時(shí)任務(wù)的方法加上@Async即可,默認(rèn)使用的線程池為SimpleAsyncTaskExecutor(該線程池默認(rèn)來一個(gè)任務(wù)創(chuàng)建一個(gè)線程,就會(huì)不斷創(chuàng)建大量線程,極有可能壓爆服務(wù)器內(nèi)存。

              當(dāng)然它有自己的限流機(jī)制,這里就不多說了,有興趣的自己翻翻源碼~)

            • 項(xiàng)目中為了更好的控制線程的使用,我們可以自定義我們自己的線程池,使用方式@Async("myThreadPool")


          • 線程池的使用心得(后續(xù)有專門文章來探討)


            • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,對(duì)應(yīng)與spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基礎(chǔ)上增加了新的特性,在spring環(huán)境下更容易使用和控制。

            • 使用自定義的線程池能夠避免一些默認(rèn)線程池造成的內(nèi)存溢出、阻塞等等問題,更貼合自己的服務(wù)特性

            • 使用自定義的線程池便于對(duì)項(xiàng)目中線程的管理、維護(hù)以及監(jiān)控。

            • 即便在非spring環(huán)境下也不要使用java默認(rèn)提供的那幾種線程池,坑很多,阿里代碼規(guī)約不說了嗎,得相信大廠!


          三、動(dòng)態(tài)定時(shí)任務(wù)的實(shí)現(xiàn)問題:


          • 使用@Scheduled注解來完成設(shè)置定時(shí)任務(wù),但是有時(shí)候我們往往需要對(duì)周期性的時(shí)間的設(shè)置會(huì)做一些改變,或者要?jiǎng)討B(tài)的啟停一個(gè)定時(shí)任務(wù),那么這個(gè)時(shí)候使用此注解就不太方便了,原因在于這個(gè)注解中配置的cron表達(dá)式必須是常量,那么當(dāng)我們修改定時(shí)參數(shù)的時(shí)候,就需要停止服務(wù),重新部署。


          解決辦法:


          • 方式一:


            實(shí)現(xiàn)SchedulingConfigurer接口,重寫configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不過需要注意的是,此種方式修改了配置值后,需要在下一次調(diào)度結(jié)束后,才會(huì)更新調(diào)度器,并不會(huì)在修改配置值時(shí)實(shí)時(shí)更新,實(shí)時(shí)更新需要在修改配置值時(shí)額外增加相關(guān)邏輯處理。


          @Configuration
          ??public?class?ScheduledConfig?implements?SchedulingConfigurer?{

          ??@Autowired
          ??private?TaskScheduler myThreadPoolTaskScheduler;

          ??@Override
          ??public?void?configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar)?{
          ??????//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
          ??????scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
          ??????//可以實(shí)現(xiàn)動(dòng)態(tài)調(diào)整定時(shí)任務(wù)的執(zhí)行頻率
          ????scheduledTaskRegistrar.addTriggerTask(
          ??????????????//1.添加任務(wù)內(nèi)容(Runnable)
          ??????????????() -> System.out.println("cccccccccccccccc--->"?+ Thread.currentThread().getId()),
          ??????????????//2.設(shè)置執(zhí)行周期(Trigger)
          ??????????????triggerContext -> {
          ??????????????????//2.1 從數(shù)據(jù)庫(kù)動(dòng)態(tài)獲取執(zhí)行周期
          ??????????????????String cron = "0/2 * * * * ? ";
          ??????????????????//2.2 合法性校驗(yàn).
          ??// if (StringUtils.isEmpty(cron)) {
          ??// // Omitted Code ..
          ??// }
          ??????????????????????//2.3 返回執(zhí)行周期(Date)
          ??????????????????????return?new?CronTrigger(cron).nextExecutionTime(triggerContext);
          ??????????????????}
          ??????????);
          ??}
          ??}


          • 方式二:


            使用threadPoolTaskScheduler類可實(shí)現(xiàn)動(dòng)態(tài)添加刪除功能,當(dāng)然也可實(shí)現(xiàn)執(zhí)行頻率的調(diào)整 ?首先,我們要認(rèn)識(shí)下這個(gè)調(diào)度類,它其實(shí)是對(duì)java中ScheduledThreadPoolExecutor的一個(gè)封裝改進(jìn)后的產(chǎn)物,主要改進(jìn)有以下幾點(diǎn):


            ? 1、提供默認(rèn)配置,因?yàn)槭荢cheduledThreadPoolExecutor,所以只有poolSize這一個(gè)默認(rèn)參數(shù)。


            ? 2、支持自定義任務(wù),通過傳入Trigger參數(shù)。


            ? 3、對(duì)任務(wù)出錯(cuò)處理進(jìn)行優(yōu)化,如果是重復(fù)性的任務(wù),不拋出異常,通過日志記錄下來,不影響下次運(yùn)行,如果是只執(zhí)行一次的任務(wù),將異常往上拋。


            順便說下ThreadPoolTaskExecutor相對(duì)于ThreadPoolExecutor的改進(jìn)點(diǎn):


            ? 1、提供默認(rèn)配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他沒有默認(rèn)配置


            ? 2、實(shí)現(xiàn)AsyncListenableTaskExecutor接口,支持對(duì)FutureTask添加success和fail的回調(diào),任務(wù)成功或失敗的時(shí)候回執(zhí)行對(duì)應(yīng)回調(diào)方法。


            ? 3、因?yàn)槭莝pring的工具類,所以拋出的RejectedExecutionException也會(huì)被轉(zhuǎn)換為spring框架的TaskRejectedException異常(這個(gè)無所謂)


            ? 4、提供默認(rèn)ThreadFactory實(shí)現(xiàn),直接通過參數(shù)重載配置


            扯了這么多,還是直接上代碼:


          @Component
          ??public?class?DynamicTimedTask?{
          ??
          ??????private?static?final?Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
          ??
          ??????//利用創(chuàng)建好的調(diào)度類統(tǒng)一管理
          ??????//@Autowired
          ??????//@Qualifier("myThreadPoolTaskScheduler")
          ??????//private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
          ??
          ??
          ??????//接受任務(wù)的返回結(jié)果
          ??????private?ScheduledFuture future;
          ??
          ??????@Autowired
          ??????private?ThreadPoolTaskScheduler threadPoolTaskScheduler;
          ??????
          ????//實(shí)例化一個(gè)線程池任務(wù)調(diào)度類,可以使用自定義的ThreadPoolTaskScheduler
          ??????@Bean
          ??????public?ThreadPoolTaskScheduler threadPoolTaskScheduler()?{
          ??????????ThreadPoolTaskScheduler executor = new?ThreadPoolTaskScheduler();
          ??????????return?new?ThreadPoolTaskScheduler();
          ??????}
          ??
          ??
          ??????/**
          ???????* 啟動(dòng)定時(shí)任務(wù)
          ???????* @return
          ???????*/

          ??????public?boolean?startCron()?{
          ??????????boolean?flag = false;
          ??????//從數(shù)據(jù)庫(kù)動(dòng)態(tài)獲取執(zhí)行周期
          ??????????String cron = "0/2 * * * * ? ";
          ??????????future = threadPoolTaskScheduler.schedule(new?CheckModelFile(),cron);
          ??????????if?(future!=null){
          ??????????????flag = true;
          ??????????????logger.info("定時(shí)check訓(xùn)練模型文件,任務(wù)啟動(dòng)成功!!!");
          ??????????}else?{
          ??????????????logger.info("定時(shí)check訓(xùn)練模型文件,任務(wù)啟動(dòng)失敗!!!");
          ??????????}
          ??????????return?flag;
          ??????}
          ??
          ??????/**
          ???????* 停止定時(shí)任務(wù)
          ???????* @return
          ???????*/

          ??????public?boolean?stopCron()?{
          ??????????boolean?flag = false;
          ??????????if?(future != null) {
          ??????????????boolean?cancel = future.cancel(true);
          ??????????????if?(cancel){
          ??????????????????flag = true;
          ??????????????????logger.info("定時(shí)check訓(xùn)練模型文件,任務(wù)停止成功!!!");
          ??????????????}else?{
          ??????????????????logger.info("定時(shí)check訓(xùn)練模型文件,任務(wù)停止失敗!!!");
          ??????????????}
          ??????????}else?{
          ??????????????flag = true;
          ??????????????logger.info("定時(shí)check訓(xùn)練模型文件,任務(wù)已經(jīng)停止!!!");
          ??????????}
          ??????????return?flag;
          ??????}
          ??
          ??
          ??????class?CheckModelFile?implements?Runnable{
          ??
          ??????????@Override
          ??????????public?void?run()?{
          ????????//編寫你自己的業(yè)務(wù)邏輯
          ????????System.out.print("模型文件檢查完畢!!!")
          ??????????}
          ??????}

          ??}


          四、總結(jié)


          到此基于springtask下的定時(shí)任務(wù)的簡(jiǎn)單使用算是差不多了,其中不免有些錯(cuò)誤的地方,或者理解有偏頗的地方歡迎大家提出來!


          出處:cnblogs.com/baixianlong/p/10659045.html




          瀏覽 74
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  男生操女生视频网站 | 欧美精品在线观看网站 | 国产乱伦麻豆 | 亚洲69视频 | 中国十大黄色操逼网站 |