<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot集成RabbitMQ

          共 5684字,需瀏覽 12分鐘

           ·

          2021-01-06 15:05

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          ? 作者?|? 未夏

          來源 |? urlify.cn/MV3qeq

          66套java從入門到精通實(shí)戰(zhàn)課程分享

          一、介紹

          RabbitMQ是一個(gè)實(shí)現(xiàn)了AMQP協(xié)議(Advanced Message Queue Protocol)的消息隊(duì)列

          AMQP信息模型如下圖所示:

          RoutingKey,生產(chǎn)者分布信息時(shí),指定RoutingKey

          BindingKey,表示把隊(duì)列綁定到交換機(jī)的路徑名

          當(dāng)一個(gè)消息發(fā)布到 RabbitMQ 后,首先到達(dá)指定的Exchange并從消息中取出RoutingKey,由Exchange判斷同哪個(gè)BindingKey匹配,配對成功后 Exchange 把消息分配給 指定的Queue,消費(fèi)者從 Queue 中得到消息

          Exchange有4種類型:

          • Fanout Exchange:忽略key對比,發(fā)送Message到Exchange下游綁定的所有Queue

          • Direct Exchange:比較Message的routing key和Queue的binding key,完全匹配時(shí),Message才會發(fā)送到該Queue

          • Topic Exchange:比較Message的routing key和Queue的binding key,按規(guī)則匹配成功時(shí),Message才會發(fā)送到該Queue(使用?*?和?#?這2個(gè)通配符。*?- 匹配一個(gè)詞,#?- 匹配 0 個(gè)或多個(gè)詞)

          • 默認(rèn)Exchange:比較Message的routing key和Queue的名字,完全匹配時(shí),Message才會發(fā)送到該Queue

          消息隊(duì)列是有序的,只有頭部的信息被消費(fèi)后,才能消費(fèi)下一個(gè)消息


          二、SpringBoot集成RabbitMQ



          2.1引入依賴



          ?org.springframework.boot
          ?spring-boot-starter-amqp



          2.2配置RabbitMQ


          配置主要分為3步:

          • 聲明交換機(jī)

          • 聲明隊(duì)列

          • 將隊(duì)列綁定到隊(duì)列上

          @Configuration
          public?class?RabbitmqConfig?{
          ????public?final?static?String?EXCHANGE_TOPIC?=?"topicExchange";
          ????public?final?static?String?QUEUE_USER?=?"user_Queue";
          ????public?final?static?String?QUEUE_CITY?=?"city_Queue";
          ????public?final?static?String?QUEUE_DEVICE?=?"device_Queue";
          ????public?final?static?String?BINDINGKEY_ONE?=?"topic.a";
          ????public?final?static?String?BINDINGKEY_OTHER?=?"other.#";

          ????@Bean
          ????TopicExchange?topicExchange()?{
          ????????return?new?TopicExchange(EXCHANGE_TOPIC);
          ????}

          ????@Bean
          ????Queue?userQueue()?{
          ????????return?new?Queue(QUEUE_USER);
          ????}

          ????@Bean
          ????Queue?cityQueue()?{
          ????????return?new?Queue(QUEUE_CITY);
          ????}

          ????@Bean
          ????Queue?deviceQueue()?{
          ????????return?new?Queue(QUEUE_DEVICE);
          ????}

          ????@Bean
          ????Binding?userBinding(@Qualifier("userQueue")?Queue?queue,?TopicExchange?topicExchange)?{
          ????????return?BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_ONE);
          ????}

          ????@Bean
          ????Binding?cityBinding(@Qualifier("cityQueue")?Queue?queue,?TopicExchange?topicExchange)?{
          ????????return?BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_OTHER);
          ????}

          ????@Bean
          ????Binding?deviceBinding(@Qualifier("deviceQueue")?Queue?queue,?TopicExchange?topicExchange)?{
          ????????return?BindingBuilder.bind(queue).to(topicExchange).with(BINDINGKEY_OTHER);
          ????}
          }



          2.3生產(chǎn)信息


          Spring提供模板方法AmqpTemplate來進(jìn)行信息的發(fā)布、接收

          AmqpTemplateconvertAndSend需要指定發(fā)往的交換機(jī)名、RoutingKey以及信息內(nèi)容

          下面進(jìn)行簡單封裝:


          @Component
          public?class?MessageSender?{
          ????@Autowired
          ????private?AmqpTemplate?amqpTemplate;

          ????public?void?send(String?routingkey,?String?message)?{
          ????????this.amqpTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPIC,?routingkey,?message);
          ????}
          }

          單元測試:


          @ExtendWith(SpringExtension.class)
          @SpringBootTest
          class?MessageSenderTest?{
          ????@Autowired
          ????private?MessageSender?messageSender;

          ????@Test
          ????public?void?userQueueTest()?{
          ????????messageSender.send("topic.a",?"wo?is?user");
          ????}

          ????@Test
          ????public?void?cityQueueTest()?{
          ????????messageSender.send("other.a",?"wo?is?city");
          ????}

          ????@Test
          ????public?void?deviceQueueTest()?{
          ????????messageSender.send("other.b",?"wo?is?device");
          ????}

          }




          2.4消費(fèi)消息


          @RabbitListener注解聲明消息監(jiān)聽器,并指定要監(jiān)聽的隊(duì)列

          @RabbitHandler注解聲明消息的具體處理方法

          @Component
          @RabbitListener(queues?=?RabbitmqConfig.QUEUE_CITY)
          public?class?CityQueueListenter?{
          ????@RabbitHandler
          ????public?void?handle(String?message)?{
          ????????System.out.println("city-queue:?"?+?message);
          ????}
          }



          三、延時(shí)隊(duì)列


          延時(shí)隊(duì)列是一種特殊的消息隊(duì)列

          消息隊(duì)列是有序的,只有頭部的信息被消費(fèi)后,才能消費(fèi)下一個(gè)消息。延時(shí)隊(duì)列也遵循這個(gè)規(guī)則,不同于普通隊(duì)列是隊(duì)列里面的某個(gè)信息一旦到達(dá)指定期限將會被提權(quán)進(jìn)行優(yōu)先處理,而不管隊(duì)列前面有多少個(gè)消息在排隊(duì)


          3.1利用死信隊(duì)列實(shí)現(xiàn)

          RabbitMQ中有一個(gè)高級特性叫TTL(Time To Live),表示一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒

          如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒有被消費(fèi),則會成為“死信”

          “死信”是RabbitMQ中的一種消息機(jī)制,同普通的消息隊(duì)列沒有什么區(qū)別,只不過消息生成者由RabbitMQ充當(dāng)。如果隊(duì)列里的消息出現(xiàn)以下情況:

          • 消息被否定確認(rèn)(使用?channel.basicNack?或?channel.basicReject?),并且此時(shí)requeue?屬性被設(shè)置為false

          • 消息在隊(duì)列的存活時(shí)間超過設(shè)置的TTL時(shí)間

          • 消息隊(duì)列的消息數(shù)量已經(jīng)超過最大隊(duì)列長度

          RabbitMQ會將消息從原先的隊(duì)列抽取出來,發(fā)送到死信隊(duì)列里面

          如果設(shè)置了隊(duì)列的TTL屬性,那么一旦消息過期,就會被隊(duì)列丟棄;如果消息單獨(dú)設(shè)置TTL屬性,消息即使過期,也不一定會被馬上丟棄,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過期的消息也許還能存活較長時(shí)間。

          利用死信實(shí)現(xiàn)延時(shí)隊(duì)列:

          • 通過x-message-ttl為隊(duì)列設(shè)置TTL,一旦消息超時(shí)將被轉(zhuǎn)發(fā)給死信交換機(jī)

          • 通過x-dead-letter-exchange為消息隊(duì)列綁定死信交換機(jī),將消息路由至死信隊(duì)列,然后消費(fèi)者監(jiān)聽死信隊(duì)列

          Map?args?=?new?HashMap<>();

          //?設(shè)置死信交換機(jī)
          args.put("x-dead-letter-exchange",?DEAD_LETTER_EXCHANGE);
          //?給成為死信的消息指定Routing?Key
          args.put("x-dead-letter-routing-key",?DEAD_LETTER_QUEUEA_ROUTING_KEY);
          //?設(shè)置隊(duì)列TTL,?凡進(jìn)入該隊(duì)列的消息超時(shí)后將成為死信
          args.put("x-message-ttl",?6000);
          //?返回延時(shí)隊(duì)列
          return?QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();

          僅在聲明延時(shí)隊(duì)列時(shí)指定上述設(shè)置,其余死信交換機(jī)、死信隊(duì)列以及消費(fèi)消息正常設(shè)置即可


          3.2利用插件實(shí)現(xiàn)

          為隊(duì)列指定TTL實(shí)現(xiàn)延時(shí)隊(duì)列的缺點(diǎn)是所有消息的過期時(shí)間一樣,不夠多樣化;如果為消息單獨(dú)指定TTL,無法保證消息超時(shí)一定變?yōu)樗佬?/span>

          RabbitMQ官方提供一個(gè)插件bbitmq_delayed_message_exchange,解決上述問題

          點(diǎn)擊官網(wǎng)下載插件,放到安裝目錄plugins文件夾下,然后轉(zhuǎn)到sbin目錄下執(zhí)行:

          rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange

          利用死信實(shí)現(xiàn)延時(shí)隊(duì)列需要聲明兩個(gè)交換機(jī)、兩個(gè)隊(duì)列以及為其中一個(gè)隊(duì)列設(shè)置TTL等屬性,而使用插件僅需配置一個(gè)CustomExchange類型的交換機(jī)即可:

          @Configuration
          public?class?RabbitmqConfig?{
          ????@Bean
          ????public?CustomExchange?delayExchange()?{
          ????????Map?args?=?new?HashMap<>();
          ????????args.put("x-delayed-type",?"direct");
          ????????return?new?CustomExchange("delayed_exchange",?"x-delayed-message",?true,?false,?args);
          ????}

          ????@Bean
          ????public?Queue?queue()?{
          ????????Queue?queue?=?new?Queue("delay_queue_1");
          ????????return?queue;
          ????}

          ????@Bean
          ????public?Binding?binding()?{
          ????????return?BindingBuilder.bind(queue()).to(delayExchange()).with("delay_queue_1").noargs();
          ????}

          }

          CustomExchange必須是x-delayed-message類型

          消息生產(chǎn)端,AmqpTemplateconvertAndSend方法需要傳入一個(gè)MessagePostProcessor實(shí)例設(shè)置消息的TTL

          rabbitTemplate.convertAndSend("delayed_exchange",?"delay_queue_1",?msg,?new?MessagePostProcessor()?{
          ?@Override
          ?public?Message?postProcessMessage(Message?message)?throws?AmqpException?{
          ??message.getMessageProperties().setHeader("x-delay",?TTL);
          ??return?message;
          ?}
          });

          消費(fèi)端代碼不用變更


          四、參考

          圖解RabbitMQ
          一文帶你搞定RabbitMQ死信隊(duì)列
          一文帶你搞定RabbitMQ延遲隊(duì)列




          粉絲福利:Java從入門到入土學(xué)習(xí)路線圖

          ???

          ?長按上方微信二維碼?2 秒


          感謝點(diǎn)贊支持下哈?

          瀏覽 39
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月婷婷中文字幕 | 久久高凊无码免费一区 | 色婷婷亚洲1 | 北条麻妃无码中文 | 麻豆免费视频 |