<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          2022的第一次推送 | spring-boot整合activeMQ實(shí)現(xiàn)異步延遲消息

          共 4873字,需瀏覽 10分鐘

           ·

          2022-01-04 20:03

          前言

          最近,有一個(gè)需求,就是在用戶(hù)發(fā)起第一步操作之后,需要在一個(gè)固定時(shí)間之后觸發(fā)另一個(gè)操作(比如半小時(shí)之后),雖然通過(guò)定時(shí)任務(wù)也能實(shí)現(xiàn),但是考慮到用戶(hù)數(shù)量龐大,每個(gè)用戶(hù)的時(shí)間又不相同,同時(shí)考慮到服務(wù)重啟之后數(shù)據(jù)不會(huì)丟失,所以幾經(jīng)權(quán)衡之后,我們最終決定采用activeMQ異步延遲的方式,其好處也很明顯:

          首先,原有系統(tǒng)中已經(jīng)有了MQ異步處理業(yè)務(wù)的實(shí)現(xiàn)(不過(guò)延遲消費(fèi)的還沒(méi)有),不需要向系統(tǒng)中引入的新的組件,不會(huì)改變系統(tǒng)復(fù)雜程度,但是引入大規(guī)模的定時(shí)任務(wù)就很難說(shuō),而且考慮到線上的多節(jié)點(diǎn)部署,定時(shí)器也存在很多問(wèn)題;

          其次,MQ消息本身可以實(shí)現(xiàn)序列化,而且由于ACK(消息確認(rèn))的加持,可以確保消息不丟失,這樣就算服務(wù)重啟,相關(guān)業(yè)務(wù)也不會(huì)受到太大影響。

          由于目前線上環(huán)境我們用的是ActiveMQ,所以今天我們就來(lái)通過(guò)一個(gè)簡(jiǎn)單的示例,來(lái)看下如何通過(guò)ActiveMQ來(lái)實(shí)現(xiàn)我們今天的需求。

          activeMQ

          首先需要注意的是,activeMQ是從5.4才開(kāi)始支持延遲消費(fèi)的,所以想要實(shí)現(xiàn)今天的示例,你的activeMQ的版本至少是5,4,我本次示例用到的是5.16.2:

          配置

          確認(rèn)activeMQ的版本之后,我們還需要修改conf/activemq.xml文件,修改的內(nèi)容也很簡(jiǎn)單,只需要找到broker配置標(biāo)簽,在其中加入schedulerSupport="true"即可,不需要其他配置:

          <broker?xmlns="http://activemq.apache.org/schema/core"?brokerName="localhost"?dataDirectory="${activemq.data}"?schedulerSupport="true">

          ?????...

          broker>

          完成該配置之后,重啟activeMQ之后就可以創(chuàng)建我們的項(xiàng)目了。

          spring-boot

          這里先要?jiǎng)?chuàng)建spring boot項(xiàng)目,然后還需要簡(jiǎn)單配置下MQ,具體的配置和整合就不再細(xì)說(shuō),不清楚的小伙伴可以看下我之前分享的內(nèi)容:

          這里的內(nèi)容完成之后,我們就可以開(kāi)始今天示例最核心的內(nèi)容了。

          最原始方式

          首先我們來(lái)看最原始的方式,當(dāng)然這也不能算是最原始的方式,畢竟這里還是通過(guò)jmsTemplate獲取了連接工廠,不過(guò)注釋那里我也寫(xiě)了最原始的方式:

          public?void?sendDelayMessage(String?queueName,?String?messageInfo)?{
          ????????Connection?connection?=?null;
          ????????Session?session?=?null;
          ????????MessageProducer?producer?=?null;
          ????????//?ConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
          ????????//?獲取連接工廠
          ????????ConnectionFactory?connectionFactory?=?jmsTemplate.getConnectionFactory();
          ????????try?{
          ????????????//?獲取連接
          ????????????connection?=?connectionFactory.createConnection();
          ????????????connection.start();
          ????????????//?獲取session,true開(kāi)啟事務(wù),false關(guān)閉事務(wù)
          ????????????session?=?connection.createSession(Boolean.TRUE,?Session.AUTO_ACKNOWLEDGE);

          ????????????//?創(chuàng)建一個(gè)消息隊(duì)列
          ????????????producer?=?session.createProducer(new?ActiveMQQueue(queueName));
          ????????????producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
          ????????????ObjectMessage?message?=?session.createObjectMessage(messageInfo);
          ????????????//設(shè)置延遲時(shí)間
          ????????????message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,?60*1000L);
          ????????????//?發(fā)送消息
          ????????????producer.send(message);
          ????????????logger.info("發(fā)送消息:{}",?messageInfo);
          ????????????session.commit();
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}?finally?{
          ????????????try?{
          ????????????????if?(producer?!=?null)?{
          ????????????????????producer.close();
          ????????????????}
          ????????????????if?(session?!=?null)?{
          ????????????????????session.close();
          ????????????????}
          ????????????????if?(connection?!=?null)?{
          ????????????????????connection.close();
          ????????????????}
          ????????????}?catch?(Exception?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????}

          這里最核心的操作是setLongProperty,也就是設(shè)置消息的延遲時(shí)間,這里我設(shè)置的延遲消費(fèi)時(shí)間是1分鐘,也就是說(shuō)我通過(guò)該方法發(fā)送的消息會(huì)等待(延遲)一分鐘之后被消費(fèi),流程簡(jiǎn)單描述如下:

          • 通過(guò)JmsMessagingTemplate拿到mqConnectionFactory;

          • 然后通過(guò)連接工廠拿到連接,從連接中拿到session信息;

          • 再接著從會(huì)話中創(chuàng)建消息生產(chǎn)者,同時(shí)創(chuàng)建消息對(duì)象,并設(shè)置消息延遲時(shí)間;

          • 最后通過(guò)生產(chǎn)者發(fā)送消息,提交會(huì)話事務(wù)并關(guān)閉資源。

          這里可以看到,我們開(kāi)啟了session事務(wù),這樣可以確保同一個(gè)會(huì)話發(fā)送多個(gè)消息,最終會(huì)一起成功或者一起失敗,和數(shù)據(jù)庫(kù)事務(wù)類(lèi)似。

          JmsTemplate

          JmsTemplatespring提供的一套mq連接模板,我們可以通過(guò)它來(lái)操作常用的MQ。下面是基于JmsTemplate的延遲消息實(shí)現(xiàn),相比上面這種原始的寫(xiě)法,這里的寫(xiě)法更簡(jiǎn)潔,也更優(yōu)雅:

          public?void?sendDelayMessage2(String?queueName,?String?messageInfo)?{
          ????????JmsBaseTemplate.send(new?ActiveMQQueue(queueName),?session?->?{
          ????????????ObjectMessage?message?=?session.createObjectMessage(messageInfo);
          ????????????//設(shè)置延遲時(shí)間
          ????????????message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,?60*1000L);
          ????????????return?message;
          ????????});
          }

          只是這里的寫(xiě)法需要注入JmsTemplate:

          @Autowired
          private?JmsTemplate?JmsBaseTemplate;

          這里我們是通過(guò)調(diào)用了JmsTemplatesend方法實(shí)現(xiàn)的延遲消息的,只是需要我們自己通過(guò)lambda的方式創(chuàng)建messageCreator,并實(shí)現(xiàn)它的createMessage方法,在messageCreatorcreateMessage方法中,我們只需要像第一種方式一樣指定消息的延遲時(shí)間即可。

          send(final?Destination?destination,?final?MessageCreator?messageCreator)

          這里之所以如此簡(jiǎn)潔優(yōu)雅,其實(shí)是因?yàn)?code style="overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(255, 100, 65);">JmsBaseTemplate已經(jīng)幫我們封裝了session之外的其他操作,包括關(guān)閉資源等:

          消息消費(fèi)者

          消費(fèi)者和普通消息沒(méi)有任何區(qū)別,只需要配置對(duì)應(yīng)的消息隊(duì)列名稱(chēng)即可,不一樣的是,延遲消息必須等待等到設(shè)定的延遲時(shí)間之后,才會(huì)被消費(fèi):

          @JmsListener(destination?=?"delay-message",?containerFactory?=?"jmsListenerContainerFactory")
          public?void?dealDelayMessage(String?message)?{
          ????SimpleDateFormat?dateFormat?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS");
          ????logger.info("delay-message 收到消息:?{}, timestamp:{}",?message,?dateFormat.format(new?Date()));
          }

          如果還有疑問(wèn),后面看下示例運(yùn)行結(jié)果。

          測(cè)試

          不論原始方式還是JmsTemplate,最終的運(yùn)行效果都沒(méi)有本質(zhì)區(qū)別,運(yùn)行效果大致如下:

          在測(cè)試的開(kāi)始,我把延遲消費(fèi)的時(shí)間設(shè)置為1min,可以看到我們發(fā)送的消息在1分鐘在之后才被消費(fèi),當(dāng)我們將延遲時(shí)間改成5min,那么我們的消息會(huì)在發(fā)送5分鐘后會(huì)被消費(fèi):

          結(jié)語(yǔ)

          今天的內(nèi)容,總體來(lái)說(shuō)很簡(jiǎn)單,而且技術(shù)層面也沒(méi)有特別高大上的實(shí)現(xiàn),但是這種有別于定時(shí)器的解決方案還是特別值得推薦的,它特別適用于那些規(guī)模特別大的異步延遲回調(diào)場(chǎng)景,比如定時(shí)緩存清理,再比如免打擾消息的發(fā)送,當(dāng)然更多業(yè)務(wù)場(chǎng)景可能還需要各位小伙伴結(jié)合自己的業(yè)務(wù)場(chǎng)景進(jìn)行進(jìn)一步的分析。

          簡(jiǎn)單總結(jié)

          作為2022的第一次推送,我想簡(jiǎn)單說(shuō)兩句。

          首先是想簡(jiǎn)單總結(jié)下自己的2021(后面細(xì)說(shuō)),過(guò)去的一年,3~9總體學(xué)習(xí)的內(nèi)容比較多,也有較多的輸出,9月之后逐漸忙碌,所以也就疏于學(xué)習(xí)了,總體來(lái)說(shuō)還是收獲不小的,特別是換了新的工作,新的環(huán)境之后(后面再詳細(xì)梳理下)。

          新年計(jì)劃

          再一個(gè)就是新的一年的計(jì)劃,這一塊計(jì)劃春節(jié)之前完成更詳細(xì)的計(jì)劃,我目前的經(jīng)驗(yàn)是,目標(biāo)越明確,就越容易達(dá)成,所以新的一年目標(biāo)會(huì)盡可能詳細(xì)、明確。

          很坦誠(chéng)地說(shuō),我覺(jué)得我現(xiàn)在依然很菜雞,有很多東西還要學(xué)習(xí),有很多東西還要沉淀,哪怕是現(xiàn)在看著掌握得還可以的技術(shù),其實(shí)也可能只是流于表面。

          基于這樣的原因,在新的一年里,我會(huì)朝著三個(gè)方向繼續(xù)進(jìn)發(fā):

          一個(gè)是繼續(xù)深挖基礎(chǔ),扎實(shí)多線程、算法、JVM等相關(guān)知識(shí)點(diǎn),這些可能才是真正能提升自己技術(shù)實(shí)力的關(guān)鍵;

          另一個(gè)是廣泛擴(kuò)展自己的知識(shí)面,熟悉常見(jiàn)的組件、解決方案等,這些都是未來(lái)激發(fā)個(gè)人創(chuàng)造力的關(guān)鍵;

          最后一個(gè)是軟實(shí)力提升,這塊主要包括自己的思維方式、做事方法、工作技巧、個(gè)人管理等方面的提升,這一塊實(shí)力的提升,很大程度上決定了我前兩個(gè)目標(biāo)的完成效率,這塊實(shí)力提升越快,前兩個(gè)目標(biāo)也可以更高效完成。

          好了,今天就說(shuō)這么多,希望各位小可愛(ài)在新的一年里,都能達(dá)成自己的目標(biāo)……加油!晚安!

          - END -


          瀏覽 99
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线观看91av | 青青伊人久久 | 无码AV大香线蕉伊人 | 欧美黄在线观看 | 国产青春草在线观看 |