<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot整合rabbitMq實(shí)現(xiàn)消息延時(shí)發(fā)送

          共 3411字,需瀏覽 7分鐘

           ·

          2020-11-17 09:21

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          ? 作者?|??山陰路的秋天

          來源 |? urlify.cn/VRfei2

          66套java從入門到精通實(shí)戰(zhàn)課程分享

          實(shí)現(xiàn)思路:利用mq的ttl設(shè)置消息失效時(shí)間?當(dāng)達(dá)到設(shè)置時(shí)間后通過交換機(jī)到達(dá)死信隊(duì)列中,消費(fèi)者端綁定讀取死信隊(duì)列中信息來達(dá)到延時(shí)發(fā)送消息的功能。

          demo 如下:

          (1)在pom.xml?中引入rabbitMq相關(guān)包

          ????????
          ????????????org.springframework.boot
          ????????????spring-boot-starter-amqp
          ????????

          (2)創(chuàng)建rabbitMq連接的工具類

          public?class?MqConnectionUtil?{

          ????public?static?Connection?getConnection()?throws?Exception?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setPort(5672);
          ????????factory.setVirtualHost("virtualHost_wl");
          ????????factory.setUsername("admin");
          ????????factory.setPassword("admin");
          ????????Connection?connection?=?factory.newConnection();
          ????????return?connection;
          ????}
          }

          (3)創(chuàng)建生產(chǎn)者發(fā)送消息

          public?class?Send?{

          ????private?final?static?String?QUEUE_NAME?=?"msg_ttl_queue";

          ????public?static?void?main(String[]?argv)?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?MqConnectionUtil.getConnection();
          ????????//創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//設(shè)置延時(shí)隊(duì)列
          ????????HashMap?args?=?new?HashMap<>();
          ????????args.put("x-dead-letter-exchange",?"delay-exchange");
          ????????args.put("x-dead-letter-routing-key",?"msg_ttl_routingKey");
          ????????channel.queueDeclare("delay_queue",?true,?false,?false,?args);
          ????????//聲明隊(duì)列
          ????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);
          ????????//綁定路由(delay-queue隊(duì)列消息失效之后轉(zhuǎn)發(fā)到msg_ttl_queue中)
          ????????channel.queueBind(QUEUE_NAME,?"delay-exchange",?"msg_ttl_routingKey");
          ????????//設(shè)置延時(shí)屬性
          ????????AMQP.BasicProperties.Builder?builder?=?new?AMQP.BasicProperties.Builder();
          ????????//設(shè)置延時(shí)時(shí)間?1分鐘
          ????????AMQP.BasicProperties?properties?=?builder.expiration("60000").deliveryMode(2).build();
          ????????//消息內(nèi)容
          ????????SimpleDateFormat?sft?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss");
          ????????String?message?=?"生產(chǎn)者創(chuàng)建時(shí)間:?"?+?sft.format(new?Date());
          ????????channel.basicPublish("",?"delay_queue",?properties,?message.getBytes());
          ????????System.out.println("Sent:?'"?+?message?+?"'");
          ????????//關(guān)閉通道和連接
          ????????channel.close();
          ????????connection.close();
          ????}
          }

          (4)創(chuàng)建對(duì)應(yīng)的消費(fèi)者接受消息

          public?class?Receive?{

          ????private?final?static?String?QUEUE_NAME?=?"msg_ttl_queue";

          ????public?static?void?main(String[]?argv)?throws?Exception?{
          ????????//獲取連接
          ????????Connection?connection?=?MqConnectionUtil.getConnection();
          ????????//創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列
          ????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);
          ????????//?定義隊(duì)列的消費(fèi)者
          ????????MyConsumer?myConsumer?=?new?MyConsumer(channel);
          ????????//?監(jiān)聽隊(duì)列
          ????????channel.basicConsume(QUEUE_NAME,?true,?myConsumer);
          ????}
          }

          自定義消費(fèi)者類為:

          public?class?MyConsumer?extends?DefaultConsumer?{
          ????public?MyConsumer(Channel?channel)?{
          ????????super(channel);
          ????}

          ????@Override
          ????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????SimpleDateFormat?sft?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss");
          ????????System.out.println("消費(fèi)者接受時(shí)間:"?+?sft.format(new?Date()));
          ????????System.out.println("consumerTag:?"?+?consumerTag);
          ????????System.out.println("envelope:?"?+?envelope);
          ????????System.out.println("properties:?"?+?properties);
          ????????System.out.println("body:?"?+?new?String(body));
          ????}
          }

          生產(chǎn)者發(fā)送消息到延時(shí)隊(duì)列:

          408aa1bc710746eda7b2768c87c843f4.webp

          ?

          ?到達(dá)設(shè)定的失效時(shí)間1分鐘后到達(dá)指定的隊(duì)列中:

          44c0173d168b54766291c292fbe4b173.webp

          ?

          ?設(shè)置的延時(shí)交換機(jī)為:

          fa485ce889b6e7baee4076559b5d146a.webp

          ?

          ?

          控制臺(tái)打印信息為:

          dc6755b19308ddb70775fc81de880336.webp






          粉絲福利:實(shí)戰(zhàn)springboot+CAS單點(diǎn)登錄系統(tǒng)視頻教程免費(fèi)領(lǐng)取

          ???

          ?長按上方微信二維碼?2 秒即可獲取資料



          感謝點(diǎn)贊支持下哈?084c8ead4b93aae467a81c6fc4f84fb7.webp

          瀏覽 91
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久人人爽人人爽人人片aV东京热 | 亚洲日韩在线电影 | 欧美精品一区三区 | 污网站在线观看 | 天天爽日日澡AAAA片 |