2022的第一次推送 | spring-boot整合activeMQ實(shí)現(xiàn)異步延遲消息

前言
最近,有一個(gè)需求,就是在用戶(hù)發(fā)起第一步操作之后,需要在一個(gè)固定時(shí)間之后觸發(fā)另一個(gè)操作(比如半小時(shí)之后),雖然通過(guò)定時(shí)任務(wù)也能實(shí)現(xiàn),但是考慮到用戶(hù)數(shù)量龐大,每個(gè)用戶(hù)的時(shí)間又不相同,同時(shí)考慮到服務(wù)重啟之后數(shù)據(jù)不會(huì)丟失,所以幾經(jīng)權(quán)衡之后,我們最終決定采用activeMQ異步延遲的方式,其好處也很明顯:
首先,原有系統(tǒng)中已經(jīng)有了MQ異步處理業(yè)務(wù)的實(shí)現(xiàn)(不過(guò)延遲消費(fèi)的還沒(méi)有),不需要向系統(tǒng)中引入的新的組件,不會(huì)改變系統(tǒng)復(fù)雜程度,但是引入大規(guī)模的定時(shí)任務(wù)就很難說(shuō),而且考慮到線上的多節(jié)點(diǎn)部署,定時(shí)器也存在很多問(wèn)題;
其次,MQ消息本身可以實(shí)現(xiàn)序列化,而且由于ACK(消息確認(rèn))的加持,可以確保消息不丟失,這樣就算服務(wù)重啟,相關(guān)業(yè)務(wù)也不會(huì)受到太大影響。
由于目前線上環(huán)境我們用的是ActiveMQ,所以今天我們就來(lái)通過(guò)一個(gè)簡(jiǎn)單的示例,來(lái)看下如何通過(guò)ActiveMQ來(lái)實(shí)現(xiàn)我們今天的需求。
activeMQ
首先需要注意的是,activeMQ是從5.4才開(kāi)始支持延遲消費(fèi)的,所以想要實(shí)現(xiàn)今天的示例,你的activeMQ的版本至少是5,4,我本次示例用到的是5.16.2:

配置
確認(rèn)activeMQ的版本之后,我們還需要修改conf/activemq.xml文件,修改的內(nèi)容也很簡(jiǎn)單,只需要找到broker配置標(biāo)簽,在其中加入schedulerSupport="true"即可,不需要其他配置:
<broker?xmlns="http://activemq.apache.org/schema/core"?brokerName="localhost"?dataDirectory="${activemq.data}"?schedulerSupport="true">
?????...
broker>
完成該配置之后,重啟activeMQ之后就可以創(chuàng)建我們的項(xiàng)目了。
spring-boot
這里先要?jiǎng)?chuàng)建spring boot項(xiàng)目,然后還需要簡(jiǎn)單配置下MQ,具體的配置和整合就不再細(xì)說(shuō),不清楚的小伙伴可以看下我之前分享的內(nèi)容:

這里的內(nèi)容完成之后,我們就可以開(kāi)始今天示例最核心的內(nèi)容了。
最原始方式
首先我們來(lái)看最原始的方式,當(dāng)然這也不能算是最原始的方式,畢竟這里還是通過(guò)jmsTemplate獲取了連接工廠,不過(guò)注釋那里我也寫(xiě)了最原始的方式:
public?void?sendDelayMessage(String?queueName,?String?messageInfo)?{
????????Connection?connection?=?null;
????????Session?session?=?null;
????????MessageProducer?producer?=?null;
????????//?ConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
????????//?獲取連接工廠
????????ConnectionFactory?connectionFactory?=?jmsTemplate.getConnectionFactory();
????????try?{
????????????//?獲取連接
????????????connection?=?connectionFactory.createConnection();
????????????connection.start();
????????????//?獲取session,true開(kāi)啟事務(wù),false關(guān)閉事務(wù)
????????????session?=?connection.createSession(Boolean.TRUE,?Session.AUTO_ACKNOWLEDGE);
????????????//?創(chuàng)建一個(gè)消息隊(duì)列
????????????producer?=?session.createProducer(new?ActiveMQQueue(queueName));
????????????producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
????????????ObjectMessage?message?=?session.createObjectMessage(messageInfo);
????????????//設(shè)置延遲時(shí)間
????????????message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,?60*1000L);
????????????//?發(fā)送消息
????????????producer.send(message);
????????????logger.info("發(fā)送消息:{}",?messageInfo);
????????????session.commit();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????try?{
????????????????if?(producer?!=?null)?{
????????????????????producer.close();
????????????????}
????????????????if?(session?!=?null)?{
????????????????????session.close();
????????????????}
????????????????if?(connection?!=?null)?{
????????????????????connection.close();
????????????????}
????????????}?catch?(Exception?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
這里最核心的操作是setLongProperty,也就是設(shè)置消息的延遲時(shí)間,這里我設(shè)置的延遲消費(fèi)時(shí)間是1分鐘,也就是說(shuō)我通過(guò)該方法發(fā)送的消息會(huì)等待(延遲)一分鐘之后被消費(fèi),流程簡(jiǎn)單描述如下:
通過(guò)
JmsMessagingTemplate拿到mq的ConnectionFactory;然后通過(guò)連接工廠拿到連接,從連接中拿到
session信息;再接著從會(huì)話中創(chuàng)建消息生產(chǎn)者,同時(shí)創(chuàng)建消息對(duì)象,并設(shè)置消息延遲時(shí)間;
最后通過(guò)生產(chǎn)者發(fā)送消息,提交會(huì)話事務(wù)并關(guān)閉資源。
這里可以看到,我們開(kāi)啟了session事務(wù),這樣可以確保同一個(gè)會(huì)話發(fā)送多個(gè)消息,最終會(huì)一起成功或者一起失敗,和數(shù)據(jù)庫(kù)事務(wù)類(lèi)似。
JmsTemplate
JmsTemplate是spring提供的一套mq連接模板,我們可以通過(guò)它來(lái)操作常用的MQ。下面是基于JmsTemplate的延遲消息實(shí)現(xiàn),相比上面這種原始的寫(xiě)法,這里的寫(xiě)法更簡(jiǎn)潔,也更優(yōu)雅:
public?void?sendDelayMessage2(String?queueName,?String?messageInfo)?{
????????JmsBaseTemplate.send(new?ActiveMQQueue(queueName),?session?->?{
????????????ObjectMessage?message?=?session.createObjectMessage(messageInfo);
????????????//設(shè)置延遲時(shí)間
????????????message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,?60*1000L);
????????????return?message;
????????});
}
只是這里的寫(xiě)法需要注入JmsTemplate:
@Autowired
private?JmsTemplate?JmsBaseTemplate;
這里我們是通過(guò)調(diào)用了JmsTemplate的send方法實(shí)現(xiàn)的延遲消息的,只是需要我們自己通過(guò)lambda的方式創(chuàng)建messageCreator,并實(shí)現(xiàn)它的createMessage方法,在messageCreator的createMessage方法中,我們只需要像第一種方式一樣指定消息的延遲時(shí)間即可。
send(final?Destination?destination,?final?MessageCreator?messageCreator)
這里之所以如此簡(jiǎn)潔優(yōu)雅,其實(shí)是因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(255, 100, 65);">JmsBaseTemplate已經(jīng)幫我們封裝了session之外的其他操作,包括關(guān)閉資源等:


消息消費(fèi)者
消費(fèi)者和普通消息沒(méi)有任何區(qū)別,只需要配置對(duì)應(yīng)的消息隊(duì)列名稱(chēng)即可,不一樣的是,延遲消息必須等待等到設(shè)定的延遲時(shí)間之后,才會(huì)被消費(fèi):
@JmsListener(destination?=?"delay-message",?containerFactory?=?"jmsListenerContainerFactory")
public?void?dealDelayMessage(String?message)?{
????SimpleDateFormat?dateFormat?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS");
????logger.info("delay-message 收到消息:?{}, timestamp:{}",?message,?dateFormat.format(new?Date()));
}
如果還有疑問(wèn),后面看下示例運(yùn)行結(jié)果。
測(cè)試
不論原始方式還是JmsTemplate,最終的運(yùn)行效果都沒(méi)有本質(zhì)區(qū)別,運(yùn)行效果大致如下:

在測(cè)試的開(kāi)始,我把延遲消費(fèi)的時(shí)間設(shè)置為1min,可以看到我們發(fā)送的消息在1分鐘在之后才被消費(fèi),當(dāng)我們將延遲時(shí)間改成5min,那么我們的消息會(huì)在發(fā)送5分鐘后會(huì)被消費(fèi):

結(jié)語(yǔ)
今天的內(nèi)容,總體來(lái)說(shuō)很簡(jiǎn)單,而且技術(shù)層面也沒(méi)有特別高大上的實(shí)現(xiàn),但是這種有別于定時(shí)器的解決方案還是特別值得推薦的,它特別適用于那些規(guī)模特別大的異步延遲回調(diào)場(chǎng)景,比如定時(shí)緩存清理,再比如免打擾消息的發(fā)送,當(dāng)然更多業(yè)務(wù)場(chǎng)景可能還需要各位小伙伴結(jié)合自己的業(yè)務(wù)場(chǎng)景進(jìn)行進(jìn)一步的分析。
簡(jiǎn)單總結(jié)
作為2022的第一次推送,我想簡(jiǎn)單說(shuō)兩句。
首先是想簡(jiǎn)單總結(jié)下自己的2021(后面細(xì)說(shuō)),過(guò)去的一年,3~9總體學(xué)習(xí)的內(nèi)容比較多,也有較多的輸出,9月之后逐漸忙碌,所以也就疏于學(xué)習(xí)了,總體來(lái)說(shuō)還是收獲不小的,特別是換了新的工作,新的環(huán)境之后(后面再詳細(xì)梳理下)。
新年計(jì)劃
再一個(gè)就是新的一年的計(jì)劃,這一塊計(jì)劃春節(jié)之前完成更詳細(xì)的計(jì)劃,我目前的經(jīng)驗(yàn)是,目標(biāo)越明確,就越容易達(dá)成,所以新的一年目標(biāo)會(huì)盡可能詳細(xì)、明確。
很坦誠(chéng)地說(shuō),我覺(jué)得我現(xiàn)在依然很菜雞,有很多東西還要學(xué)習(xí),有很多東西還要沉淀,哪怕是現(xiàn)在看著掌握得還可以的技術(shù),其實(shí)也可能只是流于表面。
基于這樣的原因,在新的一年里,我會(huì)朝著三個(gè)方向繼續(xù)進(jìn)發(fā):
一個(gè)是繼續(xù)深挖基礎(chǔ),扎實(shí)多線程、算法、JVM等相關(guān)知識(shí)點(diǎn),這些可能才是真正能提升自己技術(shù)實(shí)力的關(guān)鍵;
另一個(gè)是廣泛擴(kuò)展自己的知識(shí)面,熟悉常見(jiàn)的組件、解決方案等,這些都是未來(lái)激發(fā)個(gè)人創(chuàng)造力的關(guān)鍵;
最后一個(gè)是軟實(shí)力提升,這塊主要包括自己的思維方式、做事方法、工作技巧、個(gè)人管理等方面的提升,這一塊實(shí)力的提升,很大程度上決定了我前兩個(gè)目標(biāo)的完成效率,這塊實(shí)力提升越快,前兩個(gè)目標(biāo)也可以更高效完成。
好了,今天就說(shuō)這么多,希望各位小可愛(ài)在新的一年里,都能達(dá)成自己的目標(biāo)……加油!晚安!
- END -