<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          1.6萬字+28張圖盤點11種延遲任務的實現(xiàn)方式

          共 33053字,需瀏覽 67分鐘

           ·

          2024-07-01 23:26

          大家好,我是三友~~

          延遲任務在我們?nèi)粘I钪斜容^常見,比如訂單支付超時取消訂單功能,又比如自動確定收貨的功能等等。

          所以本篇文章就來從實現(xiàn)到原理來盤點延遲任務的11種實現(xiàn)方式,這些方式并沒有絕對的好壞之分,只是適用場景的不大相同。

          DelayQueue

          DelayQueue是JDK提供的api,是一個延遲隊列

          DelayQueue泛型參數(shù)得實現(xiàn)Delayed接口,Delayed繼承了Comparable接口。

          getDelay方法返回這個任務還剩多久時間可以執(zhí)行,小于0的時候說明可以這個延遲任務到了執(zhí)行的時間了。

          compareTo這個是對任務排序的,保證最先到延遲時間的任務排到隊列的頭。

          來個demo

          @Getter
          public class SanYouTask implements Delayed {

              private final String taskContent;

              private final Long triggerTime;

              public SanYouTask(String taskContent, Long delayTime) {
                  this.taskContent = taskContent;
                  this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
              }

              @Override
              public long getDelay(TimeUnit unit) {
                  return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
              }

              @Override
              public int compareTo(Delayed o) {
                  return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
              }

          }

          SanYouTask實現(xiàn)了Delayed接口,構造參數(shù)

          • taskContent:延遲任務的具體的內(nèi)容
          • delayTime:延遲時間,秒為單位

          測試

          @Slf4j
          public class DelayQueueDemo {

              public static void main(String[] args) {
                  DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();

                  new Thread(() -> {
                      while (true) {
                          try {
                              SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
                              log.info("獲取到延遲任務:{}", sanYouTask.getTaskContent());
                          } catch (Exception e) {
                          }
                      }
                  }).start();

                  log.info("提交延遲任務");
                  sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記5s"5L));
                  sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記3s"3L));
                  sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記8s"8L));
              }
          }

          開啟一個線程從DelayQueue中獲取任務,然后提交了三個任務,延遲時間分為別5s,3s,8s。

          測試結果:

          成功實現(xiàn)了延遲任務。

          實現(xiàn)原理

          offer方法在提交任務的時候,會通過根據(jù)compareTo的實現(xiàn)對任務進行排序,將最先需要被執(zhí)行的任務放到隊列頭。

          take方法獲取任務的時候,會拿到隊列頭部的元素,也就是隊列中最早需要被執(zhí)行的任務,通過getDelay返回值判斷任務是否需要被立刻執(zhí)行,如果需要的話,就返回任務,如果不需要就會等待這個任務到延遲時間的剩余時間,當時間到了就會將任務返回。

          Timer

          Timer也是JDK提供的api

          先來demo

          @Slf4j
          public class TimerDemo {

              public static void main(String[] args) {
                  Timer timer = new Timer();
                  
                  log.info("提交延遲任務");
                  timer.schedule(new TimerTask() {
                      @Override
                      public void run() {
                          log.info("執(zhí)行延遲任務");
                      }
                  }, 5000);
              }

          }

          通過schedule提交一個延遲時間為5s的延遲任務

          實現(xiàn)原理

          提交的任務是一個TimerTask

          public abstract class TimerTask implements Runnable {
              //忽略其它屬性
              
              long nextExecutionTime;
          }

          TimerTask內(nèi)部有一個nextExecutionTime屬性,代表下一次任務執(zhí)行的時間,在提交任務的時候會計算出nextExecutionTime值。

          Timer內(nèi)部有一個TaskQueue對象,用來保存TimerTask任務的,會根據(jù)nextExecutionTime來排序,保證能夠快速獲取到最早需要被執(zhí)行的延遲任務。

          在Timer內(nèi)部還有一個執(zhí)行任務的線程TimerThread,這個線程就跟DelayQueue demo中開啟的線程作用是一樣的,用來執(zhí)行到了延遲時間的任務。

          所以總的來看,Timer有點像整體封裝了DelayQueue demo中的所有東西,讓用起來簡單點。

          雖然Timer用起來比較簡單,但是在阿里規(guī)范中是不推薦使用的,主要是有以下幾點原因:

          • Timer使用單線程來處理任務,長時間運行的任務會導致其他任務的延時處理
          • Timer沒有對運行時異常進行處理,一旦某個任務觸發(fā)運行時異常,會導致整個Timer崩潰,不安全

          ScheduledThreadPoolExecutor

          由于Timer在使用上有一定的問題,所以在JDK1.5版本的時候提供了ScheduledThreadPoolExecutor,這個跟Timer的作用差不多,并且他們的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解決了單線程和異常崩潰等問題。

          來個demo

          @Slf4j
          public class ScheduledThreadPoolExecutorDemo {

              public static void main(String[] args) {
                  ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2new ThreadPoolExecutor.CallerRunsPolicy());

                  log.info("提交延遲任務");
                  executor.schedule(() -> log.info("執(zhí)行延遲任務"), 5, TimeUnit.SECONDS);
              }

          }

          結果

          實現(xiàn)原理

          ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是繼承了線程池,所以可以有很多個線程來執(zhí)行任務。

          ScheduledThreadPoolExecutor在構造的時候會傳入一個DelayedWorkQueue阻塞隊列,所以線程池內(nèi)部的阻塞隊列是DelayedWorkQueue。

          在提交延遲任務的時候,任務會被封裝一個任務會被封裝成ScheduledFutureTask對象,然后放到DelayedWorkQueue阻塞隊列中。

          ScheduledFutureTask

          ScheduledFutureTask實現(xiàn)了前面提到的Delayed接口,所以其實可以猜到DelayedWorkQueue會根據(jù)ScheduledFutureTask對于Delayed接口的實現(xiàn)來排序,所以線程能夠獲取到最早到延遲時間的任務。

          當線程從DelayedWorkQueue中獲取到需要執(zhí)行的任務之后就會執(zhí)行任務。

          RocketMQ

          RocketMQ是阿里開源的一款消息中間件,實現(xiàn)了延遲消息的功能,如果有對RocketMQ不熟悉的小伙伴可以看一下我之前寫的RocketMQ保姆級教程RocketMQ消息短暫而又精彩的一生 這兩篇文章。

          RocketMQ延遲消息的延遲時間默認有18個等級。

          當發(fā)送消息的時候只需要指定延遲等級即可。如果這18個等級的延遲時間不符和你的要求,可以修改RocketMQ服務端的配置文件。

          來個demo

          依賴

          <dependency>
              <groupId>org.apache.rocketmq</groupId>
              <artifactId>rocketmq-spring-boot-starter</artifactId>
              <version>2.2.1</version>
            
          <!--web依賴-->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
              <version>2.2.5.RELEASE</version>
          </dependency>

          配置文件

          rocketmq:
            name-server: 192.168.200.144:9876 #服務器ip:nameServer端口
            producer:
              group: sanyouProducer

          controller類,通過DefaultMQProducer發(fā)送延遲消息到sanyouDelayTaskTopic這個topic,延遲等級為2,也就是延遲時間為5s的意思。

          @RestController
          @Slf4j
          public class RocketMQDelayTaskController {

              @Resource
              private DefaultMQProducer producer;

              @GetMapping("/rocketmq/add")
              public void addTask(@RequestParam("task") String task) throws Exception {
                  Message msg = new Message("sanyouDelayTaskTopic""TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
                  msg.setDelayTimeLevel(2);
                  // 發(fā)送消息并得到消息的發(fā)送結果,然后打印
                  log.info("提交延遲任務");
                  producer.send(msg);
              }

          }

          創(chuàng)建一個消費者,監(jiān)聽sanyouDelayTaskTopic的消息。

          @Component
          @RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouDelayTaskTopic")
          @Slf4j
          public class SanYouDelayTaskTopicListener implements RocketMQListener<String{

              @Override
              public void onMessage(String msg) {
                  log.info("獲取到延遲任務:{}", msg);
              }

          }

          啟動應用,瀏覽器輸入以下鏈接添加任務

          http://localhost:8080/rocketmq/add?task=sanyou

          測試結果:

          實現(xiàn)原理

          生產(chǎn)者發(fā)送延遲消息之后,RocketMQ服務端在接收到消息之后,會去根據(jù)延遲級別是否大于0來判斷是否是延遲消息

          • 如果不大于0,說明不是延遲消息,那就會將消息保存到指定的topic中
          • 如果大于0,說明是延遲消息,此時RocketMQ會進行一波偷梁換柱的操作,將消息的topic改成SCHEDULE_TOPIC_XXXX中,XXXX不是占位符,然后存儲。

          在BocketMQ內(nèi)部有一個延遲任務,相當于是一個定時任務,這個任務就會獲取SCHEDULE_TOPIC_XXXX中的消息,判斷消息是否到了延遲時間,如果到了,那么就會將消息的topic存儲到原來真正的topic(拿我們的例子來說就是sanyouDelayTaskTopic)中,之后消費者就可以從真正的topic中獲取到消息了。

          定時任務

          RocketMQ這種實現(xiàn)方式相比于前面提到的三種更加可靠,因為前面提到的三種任務內(nèi)容都是存在內(nèi)存的,服務器重啟任務就丟了,如果要實現(xiàn)任務不丟還得自己實現(xiàn)邏輯,但是RocketMQ消息有持久化機制,能夠保證任務不丟失。

          RabbitMQ

          RabbitMQ也是一款消息中間件,通過RabbitMQ的死信隊列也可以是先延遲任務的功能。

          demo

          引入RabbitMQ的依賴

          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
              <version>2.2.5.RELEASE</version>
          </dependency>

          配置文件

          spring:
            rabbitmq:
              host: 192.168.200.144 #服務器ip
              port: 5672
              virtual-host: /

          RabbitMQ死信隊列的配置類,后面說原理的時候會介紹干啥的

          @Configuration
          public class RabbitMQConfiguration {
              
              @Bean
              public DirectExchange sanyouDirectExchangee() {
                  return new DirectExchange("sanyouDirectExchangee");
              }

              @Bean
              public Queue sanyouQueue() {
                  return QueueBuilder
                          //指定隊列名稱,并持久化
                          .durable("sanyouQueue")
                          //設置隊列的超時時間為5秒,也就是延遲任務的時間
                          .ttl(5000)
                          //指定死信交換機
                          .deadLetterExchange("sanyouDelayTaskExchangee")
                          .build();
              }

              @Bean
              public Binding sanyouQueueBinding() {
                  return BindingBuilder.bind(sanyouQueue()).to(sanyouDirectExchangee()).with("");
              }

              @Bean
              public DirectExchange sanyouDelayTaskExchange() {
                  return new DirectExchange("sanyouDelayTaskExchangee");
              }

              @Bean
              public Queue sanyouDelayTaskQueue() {
                  return QueueBuilder
                          //指定隊列名稱,并持久化
                          .durable("sanyouDelayTaskQueue")
                          .build();
              }

              @Bean
              public Binding sanyouDelayTaskQueueBinding() {
                  return BindingBuilder.bind(sanyouDelayTaskQueue()).to(sanyouDelayTaskExchange()).with("");
              }

          }

          RabbitMQDelayTaskController用來發(fā)送消息,這里沒指定延遲時間,是因為在聲明隊列的時候指定了延遲時間為5s

          @RestController
          @Slf4j
          public class RabbitMQDelayTaskController {

              @Resource
              private RabbitTemplate rabbitTemplate;

              @GetMapping("/rabbitmq/add")
              public void addTask(@RequestParam("task") String task) throws Exception {
                  // 消息ID,需要封裝到CorrelationData中
                  CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
                  log.info("提交延遲任務");
                  // 發(fā)送消息
                  rabbitTemplate.convertAndSend("sanyouDirectExchangee""", task, correlationData);
              }

          }

          啟動應用,瀏覽器輸入以下鏈接添加任務

          http://localhost:8080/rabbitmq/add?task=sanyou

          測試結果,成功實現(xiàn)5s的延遲任務

          實現(xiàn)原理

          整個工作流程如下:

          • 消息發(fā)送的時候會將消息發(fā)送到sanyouDirectExchange這個交換機上
          • 由于sanyouDirectExchange綁定了sanyouQueue,所以消息會被路由到sanyouQueue這個隊列上
          • 由于sanyouQueue沒有消費者消費消息,并且又設置了5s的過期時間,所以當消息過期之后,消息就被放到綁定的sanyouDelayTaskExchange死信交換機中
          • 消息到達sanyouDelayTaskExchange交換機后,由于跟sanyouDelayTaskQueue進行了綁定,所以消息就被路由到sanyouDelayTaskQueue中,消費者就能從sanyouDelayTaskQueue中拿到消息了

          上面說的隊列與交換機的綁定關系,就是上面的配置類所干的事。

          其實從這個單從消息流轉的角度可以看出,RabbitMQ跟RocketMQ實現(xiàn)有相似之處。

          消息最開始都并沒有放到最終消費者消費的隊列中,而都是放到一個中間隊列中,等消息到了過期時間或者說是延遲時間,消息就會被放到最終的隊列供消費者消息。

          只不過RabbitMQ需要你顯示的手動指定消息所在的中間隊列,而RocketMQ是在內(nèi)部已經(jīng)做好了這塊邏輯。

          除了基于RabbitMQ的死信隊列來做,RabbitMQ官方還提供了延時插件,也可以實現(xiàn)延遲消息的功能,這個插件的大致原理也跟上面說的一樣,延時消息會被先保存在一個中間的地方,叫做Mnesia,然后有一個定時任務去查詢最近需要被投遞的消息,將其投遞到目標隊列中。

          監(jiān)聽Redis過期key

          在Redis中,有個發(fā)布訂閱的機制

          生產(chǎn)者在消息發(fā)送時需要到指定發(fā)送到哪個channel上,消費者訂閱這個channel就能獲取到消息。圖中channel理解成MQ中的topic。

          并且在Redis中,有很多默認的channel,只不過向這些channel發(fā)送消息的生產(chǎn)者不是我們寫的代碼,而是Redis本身。這里面就有這么一個channel叫做__keyevent@<db>__:expired,db是指Redis數(shù)據(jù)庫的序號。

          當某個Redis的key過期之后,Redis內(nèi)部會發(fā)布一個事件到__keyevent@<db>__:expired這個channel上,只要監(jiān)聽這個事件,那么就可以獲取到過期的key。

          所以基于監(jiān)聽Redis過期key實現(xiàn)延遲任務的原理如下:

          • 將延遲任務作為key,過期時間設置為延遲時間
          • 監(jiān)聽__keyevent@<db>__:expired這個channel,那么一旦延遲任務到了過期時間(延遲時間),那么就可以獲取到這個任務

          來個demo

          Spring已經(jīng)實現(xiàn)了監(jiān)聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表通配符的意思,監(jiān)聽所有的數(shù)據(jù)庫。

          所以demo寫起來就很簡單了,只需4步即可

          依賴

          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-data-redis</artifactId>
              <version>2.2.5.RELEASE</version>
          </dependency>

          配置文件

          spring:
            redis:
              host: 192.168.200.144
              port: 6379

          配置類

          @Configuration
          public class RedisConfiguration {

              @Bean
              public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
                  RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
                  redisMessageListenerContainer.setConnectionFactory(connectionFactory);
                  return redisMessageListenerContainer;
              }

              @Bean
              public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
                  return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
              }

          }

          KeyExpirationEventMessageListener實現(xiàn)了對__keyevent@*__:expiredchannel的監(jiān)聽

          當KeyExpirationEventMessageListener收到Redis發(fā)布的過期Key的消息的時候,會發(fā)布RedisKeyExpiredEvent事件

          所以我們只需要監(jiān)聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。

          對RedisKeyExpiredEvent事件的監(jiān)聽實現(xiàn)MyRedisKeyExpiredEventListener

          @Component
          public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent{

              @Override
              public void onApplicationEvent(RedisKeyExpiredEvent event) {
                  byte[] body = event.getSource();
                  System.out.println("獲取到延遲消息:" + new String(body));
              }

          }

          代碼寫好,啟動應用

          之后我直接通過Redis命令設置消息,就沒通過代碼發(fā)送消息了,消息的key為sanyou,值為task,值不重要,過期時間為5s

          set sanyou task 

          expire sanyou 5

          成功獲取到延遲任務

          雖然這種方式可以實現(xiàn)延遲任務,但是這種方式比較多

          任務存在延遲

          Redis過期事件的發(fā)布不是指key到了過期時間就發(fā)布,而是key到了過期時間被清除之后才會發(fā)布事件。

          而Redis過期key的兩種清除策略,就是面試八股文常背的兩種:

          • 惰性清除。當這個key過期之后,訪問時,這個Key才會被清除
          • 定時清除。后臺會定期檢查一部分key,如果有key過期了,就會被清除

          所以即使key到了過期時間,Redis也不一定會發(fā)送key過期事件,這就到導致雖然延遲任務到了延遲時間也可能獲取不到延遲任務。

          丟消息太頻繁

          Redis實現(xiàn)的發(fā)布訂閱模式,消息是沒有持久化機制,當消息發(fā)布到某個channel之后,如果沒有客戶端訂閱這個channel,那么這個消息就丟了,并不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。

          所以說,假設服務重啟期間,某個生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個channel,由于服務重啟,沒有監(jiān)聽這個channel,那么這個消息自然就丟了。

          消息消費只有廣播模式

          Redis的發(fā)布訂閱模式消息消費只有廣播模式一種。

          所謂的廣播模式就是多個消費者訂閱同一個channel,那么每個消費者都能消費到發(fā)布到這個channel的所有消息。

          如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個消費者都可以同時收到sanyou這條消息。

          所以,如果通過監(jiān)聽channel來獲取延遲任務,那么一旦服務實例有多個的話,還得保證消息不能重復處理,額外地增加了代碼開發(fā)量。

          接收到所有key的某個事件

          這個不屬于Redis發(fā)布訂閱模式的問題,而是Redis本身事件通知的問題。

          當監(jiān)聽了__keyevent@<db>__:expired的channel,那么所有的Redis的key只要發(fā)生了過期事件都會被通知給消費者,不管這個key是不是消費者想接收到的。

          所以如果你只想消費某一類消息的key,那么還得自行加一些標記,比如消息的key加個前綴,消費的時候判斷一下帶前綴的key就是需要消費的任務。

          Redisson的RDelayedQueue

          Redisson他是Redis的兒子(Redis son),基于Redis實現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實現(xiàn),但是除了實現(xiàn)Redis分布式鎖之外,它還實現(xiàn)了延遲隊列的功能。

          先來個demo

          引入pom

          <dependency>
              <groupId>org.redisson</groupId>
              <artifactId>redisson</artifactId>
              <version>3.13.1</version>
          </dependency>

          封裝了一個RedissonDelayQueue類

          @Component
          @Slf4j
          public class RedissonDelayQueue {

              private RedissonClient redissonClient;

              private RDelayedQueue<String> delayQueue;
              private RBlockingQueue<String> blockingQueue;

              @PostConstruct
              public void init() {
                  initDelayQueue();
                  startDelayQueueConsumer();
              }

              private void initDelayQueue() {
                  Config config = new Config();
                  SingleServerConfig serverConfig = config.useSingleServer();
                  serverConfig.setAddress("redis://localhost:6379");
                  redissonClient = Redisson.create(config);

                  blockingQueue = redissonClient.getBlockingQueue("SANYOU");
                  delayQueue = redissonClient.getDelayedQueue(blockingQueue);
              }

              private void startDelayQueueConsumer() {
                  new Thread(() -> {
                      while (true) {
                          try {
                              String task = blockingQueue.take();
                              log.info("接收到延遲任務:{}", task);
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                  }, "SANYOU-Consumer").start();
              }

              public void offerTask(String task, long seconds) {
                  log.info("添加延遲任務:{} 延遲時間:{}s", task, seconds);
                  delayQueue.offer(task, seconds, TimeUnit.SECONDS);
              }

          }

          這個類在創(chuàng)建的時候會去初始化延遲隊列,創(chuàng)建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。

          當延遲隊列創(chuàng)建之后,會開啟一個延遲任務的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務。

          添加任務的時候是通過RDelayedQueue的offer方法添加的。

          controller類,通過接口添加任務,延遲時間為5s

          @RestController
          public class RedissonDelayQueueController {

              @Resource
              private RedissonDelayQueue redissonDelayQueue;

              @GetMapping("/add")
              public void addTask(@RequestParam("task") String task) {
                  redissonDelayQueue.offerTask(task, 5);
              }

          }

          啟動項目,在瀏覽器輸入如下連接,添加任務

          http://localhost:8080/add?task=sanyou

          靜靜等待5s,成功獲取到任務。

          實現(xiàn)原理

          如下是Redisson延遲隊列的實現(xiàn)原理

          SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時候會拼上前綴。

          • redisson_delay_queue_timeout:SANYOU,sorted set數(shù)據(jù)類型,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執(zhí)行的任務,這個概念很重要
          • redisson_delay_queue:SANYOU,list數(shù)據(jù)類型,也是存放所有的任務,但是研究下來發(fā)現(xiàn)好像沒什么用。。
          • SANYOU,list數(shù)據(jù)類型,被稱為目標隊列,這個里面存放的任務都是已經(jīng)到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務的
          • redisson_delay_queue_channel:SANYOU,是一個channel,用來通知客戶端開啟一個延遲任務

          任務提交的時候,Redisson會將任務放到redisson_delay_queue_timeout:SANYOU中,分數(shù)就是提交任務的時間戳+延遲時間,就是延遲任務的到期時間戳

          Redisson客戶端內(nèi)部通過監(jiān)聽redisson_delay_queue_channel:SANYOU這個channel來提交一個延遲任務,這個延遲任務能夠保證將redisson_delay_queue_timeout:SANYOU中到了延遲時間的任務從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標隊列中。

          于是消費者就可以從SANYOU這個目標隊列獲取到延遲任務了。

          所以從這可以看出,Redisson的延遲任務的實現(xiàn)跟前面說的MQ的實現(xiàn)都是殊途同歸,最開始任務放到中間的一個地方,叫做redisson_delay_queue_timeout:SANYOU,然后會開啟一個類似于定時任務的一個東西,去判斷這個中間地方的消息是否到了延遲時間,到了再放到最終的目標的隊列供消費者消費。

          Redisson的這種實現(xiàn)方式比監(jiān)聽Redis過期key的實現(xiàn)方式更加可靠,因為消息都存在list和sorted set數(shù)據(jù)類型中,所以消息很少丟。

          上述說的兩種Redis的方案更詳細的介紹,可以查看我之前寫的用Redis實現(xiàn)延遲隊列,我研究了兩種方案,發(fā)現(xiàn)并不簡單這篇文章。

          Netty的HashedWheelTimer

          先來個demo

          @Slf4j
          public class NettyHashedWheelTimerDemo {

              public static void main(String[] args) {
                  HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
                  timer.start();

                  log.info("提交延遲任務");
                  timer.newTimeout(timeout -> log.info("執(zhí)行延遲任務"), 5, TimeUnit.SECONDS);
              }

          }

          測試結果

          實現(xiàn)原理

          如圖,時間輪會被分成很多格子(上述demo中的8就代表了8個格子),一個格子代表一段時間(上述demo中的100就代表一個格子是100ms),所以上述demo中,每800ms會走一圈。

          當任務提交的之后,會根據(jù)任務的到期時間進行hash取模,計算出這個任務的執(zhí)行時間所在具體的格子,然后添加到這個格子中,通過如果這個格子有多個任務,會用鏈表來保存。所以這個任務的添加有點像HashMap儲存元素的原理。

          HashedWheelTimer內(nèi)部會開啟一個線程,輪詢每個格子,找到到了延遲時間的任務,然后執(zhí)行。

          由于HashedWheelTimer也是單線程來處理任務,所以跟Timer一樣,長時間運行的任務會導致其他任務的延時處理。

          前面Redisson中提到的客戶端延遲任務就是基于Netty的HashedWheelTimer實現(xiàn)的。

          Hutool的SystemTimer

          Hutool工具類也提供了延遲任務的實現(xiàn)SystemTimer

          demo

          @Slf4j
          public class SystemTimerDemo {

              public static void main(String[] args) {
                  SystemTimer systemTimer = new SystemTimer();
                  systemTimer.start();

                  log.info("提交延遲任務");
                  systemTimer.addTask(new TimerTask(() -> log.info("執(zhí)行延遲任務"), 5000));
              }

          }

          執(zhí)行結果

          Hutool底層其實也用到了時間輪。

          Qurtaz

          Qurtaz是一款開源作業(yè)調度框架,基于Qurtaz提供的api也可以實現(xiàn)延遲任務的功能。

          demo

          依賴

          <dependency>
              <groupId>org.quartz-scheduler</groupId>
              <artifactId>quartz</artifactId>
              <version>2.3.2</version>
          </dependency>

          SanYouJob實現(xiàn)Job接口,當任務到達執(zhí)行時間的時候會調用execute的實現(xiàn),從context可以獲取到任務的內(nèi)容

          @Slf4j
          public class SanYouJob implements Job {
              @Override
              public void execute(JobExecutionContext context) throws JobExecutionException {
                  JobDetail jobDetail = context.getJobDetail();
                  JobDataMap jobDataMap = jobDetail.getJobDataMap();
                  log.info("獲取到延遲任務:{}", jobDataMap.get("delayTask"));
              }
          }

          測試類

          public class QuartzDemo {

              public static void main(String[] args) throws SchedulerException, InterruptedException {
                  // 1.創(chuàng)建Scheduler的工廠
                  SchedulerFactory sf = new StdSchedulerFactory();
                  // 2.從工廠中獲取調度器實例
                  Scheduler scheduler = sf.getScheduler();

                  // 6.啟動 調度器
                  scheduler.start();

                  // 3.創(chuàng)建JobDetail,Job類型就是上面說的SanYouJob
                  JobDetail jb = JobBuilder.newJob(SanYouJob.class)
                          .usingJobData("delayTask", "這是一個延遲任務")
                          .build()
          ;

                  // 4.創(chuàng)建Trigger
                  Trigger t = TriggerBuilder.newTrigger()
                          //任務的觸發(fā)時間就是延遲任務到的延遲時間
                          .startAt(DateUtil.offsetSecond(new Date(), 5))
                          .build();

                  // 5.注冊任務和定時器
                  log.info("提交延遲任務");
                  scheduler.scheduleJob(jb, t);
              }
          }

          執(zhí)行結果:

          實現(xiàn)原理

          核心組件

          • Job:表示一個任務,execute方法的實現(xiàn)是對任務的執(zhí)行邏輯
          • JobDetail:任務的詳情,可以設置任務需要的參數(shù)等信息
          • Trigger:觸發(fā)器,是用來觸發(fā)業(yè)務的執(zhí)行,比如說指定5s后觸發(fā)任務,那么任務就會在5s后觸發(fā)
          • Scheduler:調度器,內(nèi)部可以注冊多個任務和對應任務的觸發(fā)器,之后會調度任務的執(zhí)行

          啟動的時候會開啟一個QuartzSchedulerThread調度線程,這個線程會去判斷任務是否到了執(zhí)行時間,到的話就將任務交給任務線程池去執(zhí)行。

          無限輪詢延遲任務

          無限輪詢的意思就是開啟一個線程不停的去輪詢?nèi)蝿眨斶@些任務到達了延遲時間,那么就執(zhí)行任務。

          demo

          @Slf4j
          public class PollingTaskDemo {

              private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();

              public static void main(String[] args) {
                  new Thread(() -> {
                      while (true) {
                          try {
                              for (DelayTask delayTask : DELAY_TASK_LIST) {
                                  if (delayTask.triggerTime <= System.currentTimeMillis()) {
                                      log.info("處理延遲任務:{}", delayTask.taskContent);
                                      DELAY_TASK_LIST.remove(delayTask);
                                  }
                              }
                              TimeUnit.MILLISECONDS.sleep(100);
                          } catch (Exception e) {
                          }
                      }
                  }).start();

                  log.info("提交延遲任務");
                  DELAY_TASK_LIST.add(new DelayTask("三友的java日記"5L));
              }

              @Getter
              @Setter
              public static class DelayTask {

                  private final String taskContent;

                  private final Long triggerTime;

                  public DelayTask(String taskContent, Long delayTime) {
                      this.taskContent = taskContent;
                      this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
                  }
              }

          }

          任務可以存在數(shù)據(jù)庫又或者是內(nèi)存,看具體的需求,這里我為了簡單就放在內(nèi)存里了。

          執(zhí)行結果:

          這種操作簡單,但是就是效率低下,每次都得遍歷所有的任務。

          最后

          最后,本文所有示例代碼地址:

          https://github.com/sanyou3/delay-task-demo.git

          ··············  END  ··············

          往期熱門文章推薦

          如何去閱讀源碼,我總結了18條心法

          如何寫出漂亮代碼,我總結了45個小技巧

          三萬字盤點Spring/Boot的那些常用擴展點

          三萬字盤點Spring 9大核心基礎功能

          兩萬字盤點那些被玩爛了的設計模式

          萬字+20張圖探秘Nacos注冊中心核心實現(xiàn)原理

          萬字+20張圖剖析Spring啟動時12個核心步驟

          1.5萬字+30張圖盤點索引常見的11個知識點

          瀏覽 75
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  外国一级片 | 日韩无码三级视频 | 免贾观看国产女人高潮 | 九一综合色| 亚洲视频免费观看 |