<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          實(shí)現(xiàn)延時(shí)任務(wù)的 4 種實(shí)現(xiàn)方案!

          共 6316字,需瀏覽 13分鐘

           ·

          2020-08-28 15:16

          一、應(yīng)用場(chǎng)景

          在需求開發(fā)過程中,我們經(jīng)常會(huì)遇到一些類似下面的場(chǎng)景:

          a. 外賣訂單超過15分鐘未支付,自動(dòng)取消


          b. 使用搶票軟件訂到車票后,1小時(shí)內(nèi)未支付,自動(dòng)取消


          c. 待處理申請(qǐng)超時(shí)1天,通知審核人員經(jīng)理,超時(shí)2天通知審核人員總監(jiān)


          d. 客戶預(yù)定自如房子后,24小時(shí)內(nèi)未支付,房源自動(dòng)釋放?



          那么針對(duì)這類場(chǎng)景的需求應(yīng)該如果實(shí)現(xiàn)呢,我們最先想到的一般是啟個(gè)定時(shí)任務(wù),來(lái)掃描數(shù)據(jù)庫(kù)里符合條件的數(shù)據(jù),并對(duì)其進(jìn)行更新操作。一般來(lái)說spring-quartz 、elasticjob 就可以實(shí)現(xiàn),甚至自己寫個(gè) Timer 也可以。

          但是這種方式有個(gè)弊端,就是需要不停的掃描數(shù)據(jù)庫(kù),如果數(shù)據(jù)量比較大,并且任務(wù)執(zhí)行間隔時(shí)間比較短,對(duì)數(shù)據(jù)庫(kù)會(huì)有一定的壓力。另外定時(shí)任務(wù)的執(zhí)行間隔時(shí)間的粒度也不太好設(shè)置,設(shè)置長(zhǎng)會(huì)影響時(shí)效性,設(shè)置太短又會(huì)增加服務(wù)壓力。我們來(lái)看一下有沒有更好的實(shí)現(xiàn)方式。

          二、JDK 延時(shí)隊(duì)列實(shí)現(xiàn)

          DelayQueue 是 JDK 中 java.util.concurrent 包下的一種無(wú)界阻塞隊(duì)列,底層是優(yōu)先隊(duì)列 PriorityQueue。對(duì)于放到隊(duì)列中的任務(wù),可以按照到期時(shí)間進(jìn)行排序,只需要取已經(jīng)到期的元素處理即可。

          具體的步驟是,要放入隊(duì)列的元素需要實(shí)現(xiàn) Delayed 接口并實(shí)現(xiàn) getDelay 方法來(lái)計(jì)算到期時(shí)間,compare 方法來(lái)對(duì)比到期時(shí)間以進(jìn)行排序。一個(gè)簡(jiǎn)單的使用例子如下:

          package com.lyqiang.delay.jdk;

          import java.time.LocalDateTime;
          import java.util.concurrent.DelayQueue;
          import java.util.concurrent.Delayed;
          import java.util.concurrent.TimeUnit;

          /**
          * @author lyqiang
          */

          public class TestDelayQueue {

          public static void main(String[] args) throws InterruptedException {

          // 新建3個(gè)任務(wù),并依次設(shè)置超時(shí)時(shí)間為 20s 10s 30s
          DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 20000L);
          DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 10000L);
          DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 30000L);

          DelayQueue queue = new DelayQueue<>();
          queue.add(d1);
          queue.add(d2);
          queue.add(d3);
          int size = queue.size();

          System.out.println("當(dāng)前時(shí)間是:" + LocalDateTime.now());

          // 從延時(shí)隊(duì)列中獲取元素, 將輸出 d2 、d1 、d3
          for (int i = 0; i < size; i++) {
          System.out.println(queue.take() + " ------ " + LocalDateTime.now());
          }
          }
          }

          class DelayTask implements Delayed {

          private Integer taskId;

          private long exeTime;

          DelayTask(Integer taskId, long exeTime) {
          this.taskId = taskId;
          this.exeTime = exeTime;
          }

          @Override
          public long getDelay(TimeUnit unit) {
          return exeTime - System.currentTimeMillis();
          }

          @Override
          public int compareTo(Delayed o) {
          DelayTask t = (DelayTask) o;
          if (this.exeTime - t.exeTime <= 0) {
          return -1;
          } else {
          return 1;
          }
          }

          @Override
          public String toString() {
          return "DelayTask{" +
          "taskId=" + taskId +
          ", exeTime=" + exeTime +
          '}';
          }
          }

          代碼的執(zhí)行結(jié)果如下:

          使用 DelayQueue, 只需要有一個(gè)線程不斷從隊(duì)列中獲取數(shù)據(jù)即可,它的優(yōu)點(diǎn)是不用引入第三方依賴,實(shí)現(xiàn)也很簡(jiǎn)單,缺點(diǎn)也很明顯,它是內(nèi)存存儲(chǔ),對(duì)分布式支持不友好,如果發(fā)生單點(diǎn)故障,可能會(huì)造成數(shù)據(jù)丟失,無(wú)界隊(duì)列還存在 OOM 的風(fēng)險(xiǎn)。

          三、時(shí)間輪算法實(shí)現(xiàn)

          1996 年 George Varghese 和 Tony Lauck 的論文《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》中提出了一種時(shí)間輪管理 Timeout 事件的方式。其設(shè)計(jì)非常巧妙,并且類似時(shí)鐘的運(yùn)行,如下圖的原始時(shí)間輪有 8 個(gè)格子,假定指針經(jīng)過每個(gè)格子花費(fèi)時(shí)間是 1 個(gè)時(shí)間單位,當(dāng)前指針指向 0,一個(gè) 17 個(gè)時(shí)間單位后超時(shí)的任務(wù)則需要運(yùn)轉(zhuǎn) 2 圈再通過一個(gè)格子后被執(zhí)行,放在相同格子的任務(wù)會(huì)形成一個(gè)鏈表。

          Netty 包里提供了一種時(shí)間輪的實(shí)現(xiàn)——HashedWheelTimer,其底層使用了數(shù)組+鏈表的數(shù)據(jù)結(jié)構(gòu),使用方式如下:

          package com.lyqiang.delay.wheeltimer;

          import io.netty.util.HashedWheelTimer;
          import java.time.LocalDateTime;
          import java.util.concurrent.TimeUnit;

          /**
          * @author lyqiang
          */

          public class WheelTimerTest {

          public static void main(String[] args) {

          //設(shè)置每個(gè)格子是 100ms, 總共 256 個(gè)格子
          HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);

          //加入三個(gè)任務(wù),依次設(shè)置超時(shí)時(shí)間是 10s 5s 20s

          System.out.println("加入一個(gè)任務(wù),ID = 1, time= " + LocalDateTime.now());
          hashedWheelTimer.newTimeout(timeout -> {
          System.out.println("執(zhí)行一個(gè)任務(wù),ID = 1, time= " + LocalDateTime.now());
          }, 10, TimeUnit.SECONDS);

          System.out.println("加入一個(gè)任務(wù),ID = 2, time= " + LocalDateTime.now());
          hashedWheelTimer.newTimeout(timeout -> {
          System.out.println("執(zhí)行一個(gè)任務(wù),ID = 2, time= " + LocalDateTime.now());
          }, 5, TimeUnit.SECONDS);

          System.out.println("加入一個(gè)任務(wù),ID = 3, time= " + LocalDateTime.now());
          hashedWheelTimer.newTimeout(timeout -> {
          System.out.println("執(zhí)行一個(gè)任務(wù),ID = 3, time= " + LocalDateTime.now());
          }, 20, TimeUnit.SECONDS);

          System.out.println("等待任務(wù)執(zhí)行===========");
          }
          }

          代碼執(zhí)行結(jié)果如下:

          相比 DelayQueue 的數(shù)據(jù)結(jié)構(gòu),時(shí)間輪在算法復(fù)雜度上有一定優(yōu)勢(shì),但用時(shí)間輪來(lái)實(shí)現(xiàn)延時(shí)任務(wù)同樣避免不了單點(diǎn)故障。

          四、Redis ZSet 實(shí)現(xiàn)

          Redis 里有 5 種數(shù)據(jù)結(jié)構(gòu),最常用的是 String 和 Hash,而 ZSet 是一種支持按 score 排序的數(shù)據(jù)結(jié)構(gòu),每個(gè)元素都會(huì)關(guān)聯(lián)一個(gè) double 類型的分?jǐn)?shù),Redis 通過分?jǐn)?shù)來(lái)為集合中的成員進(jìn)行從小到大的排序,借助這個(gè)特性我們可以把超時(shí)時(shí)間作為 score 來(lái)將任務(wù)進(jìn)行排序。

          使用?zadd key score member?命令向 redis 中放入任務(wù),超時(shí)時(shí)間作為 score, 任務(wù) ID 作為 member, 使用?zrange key start stop withscores?命令從 redis 中讀取任務(wù),使用?zrem key member?命令從 redis 中刪除任務(wù)。代碼如下:

          package com.lyqiang.delay.redis;

          import java.time.LocalDateTime;
          import java.util.Set;
          import java.util.concurrent.Executors;
          import java.util.concurrent.TimeUnit;

          /**
          * @author lyqiang
          */

          public class TestRedisDelay {

          public static void main(String[] args) {

          TaskProducer taskProducer = new TaskProducer();
          //創(chuàng)建 3個(gè)任務(wù),并設(shè)置超時(shí)間為 10s 5s 20s
          taskProducer.produce(1, System.currentTimeMillis() + 10000);
          taskProducer.produce(2, System.currentTimeMillis() + 5000);
          taskProducer.produce(3, System.currentTimeMillis() + 20000);

          System.out.println("等待任務(wù)執(zhí)行===========");

          //消費(fèi)端從redis中消費(fèi)任務(wù)
          TaskConsumer taskConsumer = new TaskConsumer();
          taskConsumer.consumer();
          }
          }

          class TaskProducer {

          public void produce(Integer taskId, long exeTime) {
          System.out.println("加入任務(wù), taskId: " + taskId + ", exeTime: " + exeTime + ", 當(dāng)前時(shí)間:" + LocalDateTime.now());
          RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
          }
          }

          class TaskConsumer {

          public void consumer() {

          Executors.newSingleThreadExecutor().submit(new Runnable() {
          @Override
          public void run() {
          while (true) {
          Set taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
          if (taskIdSet == null || taskIdSet.isEmpty()) {
          //System.out.println("沒有任務(wù)");
          } else {
          taskIdSet.forEach(id -> {
          long result = RedisOps.getJedis().zrem(RedisOps.key, id);
          if (result == 1L) {
          System.out.println("從延時(shí)隊(duì)列中獲取到任務(wù),taskId:" + id + " , 當(dāng)前時(shí)間:" + LocalDateTime.now());
          }
          });
          }
          try {
          TimeUnit.MILLISECONDS.sleep(100);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          }
          }
          });
          }
          }

          執(zhí)行結(jié)果如下:

          相比前兩種實(shí)現(xiàn)方式,使用 Redis 可以將數(shù)據(jù)持久化到磁盤,規(guī)避了數(shù)據(jù)丟失的風(fēng)險(xiǎn),并且支持分布式,避免了單點(diǎn)故障。

          五、MQ 延時(shí)隊(duì)列實(shí)現(xiàn)

          以 RabbitMQ 為例,它本身并沒有直接支持延時(shí)隊(duì)列的功能,但是通過一些特性,我們可以達(dá)到實(shí)現(xiàn)延時(shí)隊(duì)列的效果。

          RabbitMQ 可以為 Queue 設(shè)置 TTL,,到了過期時(shí)間沒有被消費(fèi)的消息將變?yōu)樗佬拧狣ead Letter。我們還可以為Queue 設(shè)置死信轉(zhuǎn)發(fā) x-dead-letter-exchange,過期的消息可以被路由到另一個(gè) Exchange。下圖說明了這個(gè)流程,生產(chǎn)者通過不同的 RoutingKey 發(fā)送不同過期時(shí)間的消息,多個(gè)隊(duì)列分別消費(fèi)并產(chǎn)生死信后被路由到 exe-dead-exchange,再有一些隊(duì)列綁定到這個(gè) exchange,從而進(jìn)行不同業(yè)務(wù)邏輯的消費(fèi)。

          在 RabbitMQ 界面操作如下:

          1、在?g_normal_exchange?發(fā)送測(cè)試消息

          2. 隊(duì)列?g_queue_10s?綁定到?g_normal_exchange,并設(shè)置 x-message-ttl 為 10s 過期,x-dead-letter-exchange 為?g_exe_dead_exchange,可以看到消息到達(dá)后,過了 10s 之后消息被路由到g_exe_dead_exchange

          3. 綁定到?g_exe_dead_exchange?的隊(duì)列?g_exe_10s_queue?消費(fèi)到了這條消息

          使用 MQ 實(shí)現(xiàn)的方式,支持分布式,并且消息支持持久化,在業(yè)內(nèi)應(yīng)用比較多,它的缺點(diǎn)是每種間隔時(shí)間的場(chǎng)景需要分別建立隊(duì)列。

          六、總結(jié)

          通過上面不同實(shí)現(xiàn)方式的比較,可以很明顯的看出各個(gè)方案的優(yōu)缺點(diǎn),在分布式系統(tǒng)中我們會(huì)優(yōu)先考慮使用 Redis 和 MQ 的實(shí)現(xiàn)方式。

          在需求開發(fā)中實(shí)現(xiàn)一個(gè)功能的方式多種多樣,需要我們進(jìn)行多維度的比較,才能選擇出合理的、可靠的、高效的并且適合自己業(yè)務(wù)的解決方案。

          最近熱文:
          1、重磅!《Java開發(fā)手冊(cè)(嵩山版)》最新發(fā)布
          2、打破你的認(rèn)知!Java空指針居然還能這樣玩
          3、盤點(diǎn) 35 個(gè) Apache 頂級(jí)項(xiàng)目,我拜服了…
          4、Spring Boot 太狠了,一次發(fā)布 3 個(gè)版本!
          5、Spring Boot 如何快速集成 Redis?
          6、盤點(diǎn) 6 個(gè)被淘汰的 Java 技術(shù),曾經(jīng)風(fēng)光過!
          7、Spring Boot Redis 實(shí)現(xiàn)分布式鎖,真香!
          8、國(guó)人開源了一款小而全的 Java 工具類庫(kù)!
          9、公司來(lái)了個(gè)新同事不會(huì)用 Lombok!
          10、同事寫了個(gè)隱藏 bug,我排查了 3 天!
          掃碼關(guān)注Java技術(shù)棧公眾號(hào)閱讀更多干貨。

          點(diǎn)擊「閱讀原文」獲取面試題大全~

          瀏覽 63
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  超碰AV青青草在线 | 中文字幕日韩无码一区 | 亚洲白浆 | 人人操人人摸人人 | 黄色在线一区 |