delaymq高性能的任意延時消息隊列
RocketMQ 開源版本任意時間延時隊列實現(xiàn)
定時消息:Producer將消息發(fā)送到消息隊列RocketMQ版服務(wù)端,但并不期望立馬投遞這條消息,而是推遲到在當(dāng)前時間點之后的某一個時間投遞到Consumer進(jìn)行消費,該消息即定時消息。
延時消息:Producer將消息發(fā)送到消息隊列RocketMQ版服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進(jìn)行消費,該消息即延時消息。
定時消息與延時消息在代碼配置上存在一些差異,但是最終達(dá)到的效果相同:消息在發(fā)送到消息隊列RocketMQ版服務(wù)端后并不會立馬投遞,而是根據(jù)消息中的屬性延遲固定時間后才投遞給消費者。
適用場景
定時消息和延時消息適用于以下一些場景:
消息生產(chǎn)和消費有時間窗口要求,例如在電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條延時消息。
這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。
如支付未完成,則關(guān)閉訂單。如已完成支付則忽略。
通過消息觸發(fā)一些定時任務(wù),例如在某一固定時間點向用戶發(fā)送提醒消息。
使用方式 定時消息和延時消息的使用在代碼編寫上存在略微的區(qū)別:
發(fā)送定時消息需要明確指定消息發(fā)送時間點之后的某一時間點作為消息投遞的時間點。
發(fā)送延時消息時需要設(shè)定一個延時時間長度,消息將從當(dāng)前發(fā)送時間點開始延遲固定時間之后才開始投遞。
注意事項
定時消息的精度會有1s~2s的延遲誤差。
定時和延時消息的msg.setStartDeliverTime參數(shù)需要設(shè)置成當(dāng)前時間戳之后的某個時刻(單位毫秒)。
如果被設(shè)置成當(dāng)前時間戳之前的某個時刻,消息將立刻投遞給消費者。
定時和延時消息的msg.setStartDeliverTime參數(shù)可設(shè)置40天內(nèi)的任何時刻(單位毫秒),超過40天消息發(fā)送將失敗。
StartDeliverTime是服務(wù)端開始向消費端投遞的時間。如果消費者當(dāng)前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴(yán)格按照配置的時間進(jìn)行投遞。
由于客戶端和服務(wù)端可能存在時間差,消息的實際投遞時間與客戶端設(shè)置的投遞時間之間可能存在偏差。
如何使用
推薦使用阿里云提供的rocketmq版本的pom
<dependency> <groupId>com.aliyun.openservicesgroupId> <artifactId>ons-clientartifactId> <version>1.8.4.Finalversion> dependency>
消息發(fā)送
import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils; import java.util.Date; import java.util.Properties; public class ProducerDelayTest { public static void main(String[] args) { Properties properties = new Properties(); // AccessKey ID阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。 properties.put(PropertyKeyConst.AccessKey, "XXX"); // AccessKey Secret阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 設(shè)置TCP接入域名,進(jìn)入消息隊列RocketMQ版控制臺實例詳情頁面的接入點區(qū)域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876"); Producer producer = ONSFactory.createProducer(properties); // 在發(fā)送消息前,必須調(diào)用start方法來啟動Producer,只需調(diào)用一次即可。 producer.start(); { Message msg = new Message( // 您在消息隊列RocketMQ版控制臺創(chuàng)建的Topic。 "TopicTest", // Message Tag,可理解為Gmail中的標(biāo)簽,對消息進(jìn)行再歸類,方便Consumer指定過濾條件在消息隊列RocketMQ版服務(wù)器過濾。 "TagA", // Message Body可以是任何二進(jìn)制形式的數(shù)據(jù),消息隊列RocketMQ版不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式。 "演示15秒鐘>>> ".getBytes()); // 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請盡可能全局唯一。 // 以方便您在無法正常收到消息情況下,可通過控制臺查詢消息并補發(fā)。 // 注意:不設(shè)置也不會影響消息正常收發(fā)。 msg.setKey("ORDERID_100e"); try { // 延時消息,單位毫秒(ms),在指定延遲時間(當(dāng)前時間之后)進(jìn)行投遞,例如消息在15秒后投遞。 long delayTime = System.currentTimeMillis() + 15000; System.out.println("發(fā)送時間>>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH????ss")); // 設(shè)置消息需要被投遞的時間。 msg.setStartDeliverTime(delayTime); SendResult sendResult = producer.send(msg); // 同步發(fā)送消息,只要不拋異常就是成功。 if (sendResult != null) { System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH????ss") + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補償處理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // 在應(yīng)用退出前,銷毀Producer對象。 // 注意:如果不銷毀也沒有問題。 producer.shutdown(); } }
