<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ 的第一個程序

          共 3974字,需瀏覽 8分鐘

           ·

          2021-01-22 21:23

          點擊上方藍色字體,選擇“置頂或者星標”?


          優(yōu)質文章第一時間送達!


          RabbitMQ 的第一個程序

          RabbitMQ-生產者|消費者

          搭建環(huán)境

          java client

          生產者和消費者都屬于客戶端, rabbitMQ的java客戶端如下

          1218368cdc78214307b50560ee50ce8c.webp

          創(chuàng)建 maven 工程

          <dependency>
          ??<groupId>com.rabbitmqgroupId>
          ??<artifactId>amqp-clientartifactId>
          ??<version>5.10.0version>
          dependency>

          AMQP協議的回顧

          6b8bfe0edb2f61776d06ffa337bdb3f6.webp

          RabbitMQ支持的消息模型

          0ba4444eec6f690cfa5c91f19b8f6bdd.webpde361c6856050cf4f8f47a19c9063aa0.webp

          第一種模型(直連)

          94ed662cff929560fbc54ac418c2e3f4.webp

          在上圖的模型中,有以下概念:

          • P:生產者,也就是要發(fā)送消息的程序
          • C:消費者:消息的接受者,會一直等待消息到來。
          • queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。

          開發(fā)生產者

          /**
          ?*?生產者
          ?*?


          ?*?直連模式
          ?*
          ?*?@author?mxz
          ?*/
          @Component
          public?class?Provider?{

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{

          ????????//?獲取連接對象
          ????????Connection?connection?=?RabbitMQUtils.getConnection();
          ????????//?獲取連接中通道
          ????????Channel?channel?=?connection.createChannel();

          ????????//?通道綁定消息隊列
          ????????//?參數1?隊列的名稱,?如果不存在則自動創(chuàng)建
          ????????//?參數2?用來定義隊列是否需要持久化,?true?持久化隊列(mq關閉時,?會存到磁盤中)?false?不持久化(關閉即失)
          ????????//?參數3?exclusive?是否獨占隊列???true?獨占隊列??false?不獨占
          ????????//?參數4?autoDelete?是否在消費后自動刪除隊列??true?自動刪除???false?不刪除
          ????????//?參數5?額外的附加參數
          ????????channel.queueDeclare("hello",?false,?false,?false,?null);

          ????????//?發(fā)布消息

          ????????//?參數1?交換機名稱
          ????????//?參數2?隊列名稱
          ????????//?參數3?傳遞消息額外設置
          ????????//?參數4?消息的具體內容
          ????????channel.basicPublish("",?"hello",?null,?"hello?rabbitMQ".getBytes());

          ????????RabbitMQUtils.closeConnectionAndChannel(channel,?connection);
          ????}
          }

          開發(fā)消費者

          /**
          ?*?消費者
          ?*
          ?*?@author?mxz
          ?*/

          @Component
          public?class?Customer?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{

          ????????//?獲取連接對象
          ????????Connection?connection?=?RabbitMQUtils.getConnection();

          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();

          ????????//?通道綁定對象
          ????????channel.queueDeclare("hello",?false,?false,?false,?null);

          ????????//?消費消息
          ????????//?參數1?消息隊列的消息,?隊列名稱
          ????????//?參數2?開啟消息的確認機制
          ????????//?參數3?消息時的回調接口
          ????????channel.basicConsume("hello",?true,?new?DefaultConsumer(channel)?{
          ????????????//?最后一個參數?消息隊列中取出的消息
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("new?String(body)"?+?new?String(body));
          ????????????}
          ????????});

          //????????channel.close();
          //????????connection.close();
          ????}

          }

          工具類

          /**
          ?*?@author?mxz
          ?*/

          public?class?RabbitMQUtils?{

          ????private?static?ConnectionFactory?connectionFactory;

          ????//?重量級資源??類加載執(zhí)行一次(即可)
          ????static?{
          ????????//?創(chuàng)建連接?mq?的連接工廠
          ????????connectionFactory?=?new?ConnectionFactory();
          ????????//?設置?rabbitmq?主機
          ????????connectionFactory.setHost("127.0.0.1");
          ????????//?設置端口號
          ????????connectionFactory.setPort(5672);
          ????????//?設置連接哪個虛擬主機
          ????????connectionFactory.setVirtualHost("/codingce");
          ????????//?設置訪問虛擬主機用戶名密碼
          ????????connectionFactory.setUsername("codingce");
          ????????connectionFactory.setPassword("123456");
          ????}

          ????/**
          ?????*?定義提供連接對象的方法
          ?????*
          ?????*?@return
          ?????*/

          ????public?static?Connection?getConnection()?{
          ????????try?{
          ????????????return?connectionFactory.newConnection();
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}

          ????????return?null;
          ????}


          ????/**
          ?????*?關閉通道和關閉連接工具方法
          ?????*
          ?????*?@param?connection
          ?????*?@param?channel
          ?????*/

          ????public?static?void?closeConnectionAndChannel(Channel?channel,?Connection?connection)?{
          ????????try?{
          ????????????//?先關?channel
          ????????????if?(channel?!=?null)
          ????????????????channel.close();
          ????????????if?(connection?!=?null)
          ????????????????connection.close();
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}
          }

          第二種模型(work quene)

          Work queues,也被稱為(Task queues),任務模型。當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重復執(zhí)行的。

          a82457394d2a887fe6055f0165c470b0.webp

          角色:

          • P:生產者:任務的發(fā)布者
          • C1:消費者-1,領取任務并且完成任務,假設完成速度較慢
          • C2:消費者-2:領取任務并完成任務,假設完成速度快

          開發(fā)生產者

          /**
          ?*?生產者
          ?*?


          ?*?任務模型?work?quenue
          ?*
          ?*?@author?mxz
          ?*/
          @Component
          public?class?Provider?{

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?RabbitMQUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();

          ????????//?通過通道聲明隊列
          ????????channel.queueDeclare("work",?true,?false,?false,?null);

          ????????for?(int?i?=?0;?i?10;?i++)?{
          ????????????//?生產消息
          ????????????channel.basicPublish("",?"work",?null,?("?"?+?i?+?"work?quenue").getBytes());
          ????????}

          ????????//?關閉資源
          ????????RabbitMQUtils.closeConnectionAndChannel(channel,?connection);

          ????}
          }

          開發(fā)消費者-1

          /**
          ?*?自動確認消費?autoAck?true?12搭配測試
          ?*?


          ?*?消費者?1
          ?*
          ?*?@author?mxz
          ?*/
          @Component
          public?class?CustomerOne?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{

          ????????//?獲取連接對象
          ????????Connection?connection?=?RabbitMQUtils.getConnection();

          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();

          ????????//?通道綁定對象
          ????????channel.queueDeclare("work",?true,?false,?false,?null);

          ????????//?消費消息
          ????????//?參數1?消息隊列的消息,?隊列名稱
          ????????//?參數2?開啟消息的確認機制
          ????????//?參數3?消息時的回調接口
          ????????channel.basicConsume("work",?true,?new?DefaultConsumer(channel)?{
          ????????????//?最后一個參數?消息隊列中取出的消息
          ????????????//?默認分配是平均的
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("消費者-1"?+?new?String(body));
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????});

          //????????channel.close();
          //????????connection.close();
          ????}

          }

          開發(fā)消費者-2

          /**
          ?*?自動確認消費?autoAck?true?12搭配測試
          ?*?


          ?*?消費者?2
          ?*
          ?*?@author?mxz
          ?*/
          @Component
          public?class?CustomerTwo?{
          ????public?static?void?main(String[]?args)?throws?IOException?{

          ????????//?獲取連接對象
          ????????Connection?connection?=?RabbitMQUtils.getConnection();

          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();

          ????????//?通道綁定對象
          ????????channel.queueDeclare("work",?true,?false,?false,?null);

          ????????channel.basicConsume("work",?true,?new?DefaultConsumer(channel)?{
          ????????????//?最后一個參數?消息隊列中取出的消息
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("消費者-1"?+?new?String(body));
          ????????????}
          ????????});

          //????????channel.close();
          //????????connection.close();
          ????}

          }

          測試結果

          824b1add7971219db235a4c56fddafe1.webp71ea476ad33d55db2132fb684ea09bc5.webp

          總結:默認情況下,RabbitMQ將按順序將每個消息發(fā)送給下一個使用者。平均而言,每個消費者都會收到相同數量的消息。這種分發(fā)消息的方式稱為循環(huán)。

          消息自動確認機制

          Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

          But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

          消費者3

          /**
          ?*?能者多勞??34?搭配測試
          ?*?


          ?*?消費者?3
          ?*
          ?*?@author?mxz
          ?*/
          @Component
          public?class?CustomerThree?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{

          ????????//?獲取連接對象
          ????????Connection?connection?=?RabbitMQUtils.getConnection();

          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();

          ????????//?每一次只能消費一個消息
          ????????channel.basicQos(1);
          ????????//?通道綁定對象
          ????????channel.queueDeclare("work",?true,?false,?false,?null);

          ????????//?參數1?隊列名稱?參數2(autoAck)?消息自動確認?true?消費者自動向?rabbitMQ?確認消息消費??false?不會自動確認消息
          ????????//?若出現消費者宕機情況?消費者三可以進行消費
          ????????channel.basicConsume("work",?false,?new?DefaultConsumer(channel)?{
          ????????????//?最后一個參數?消息隊列中取出的消息
          ????????????//?默認分配是平均的
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("消費者-1"?+?new?String(body));
          ????????????????//?手動確認?參數1?確認隊列中
          ????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????});

          //????????channel.close();
          //????????connection.close();
          ????}

          }

          消費者4

          /**
          ?*?能者多勞??34?搭配測試
          ?*?


          ?*?消費者?4
          ?*
          ?*?@author?mxz
          ?*/
          @Component
          public?class?CustomerFour?{
          ????public?static?void?main(String[]?args)?throws?IOException?{

          ????????//?獲取連接對象
          ????????Connection?connection?=?RabbitMQUtils.getConnection();

          ????????//?創(chuàng)建通道
          ????????Channel?channel?=?connection.createChannel();

          ????????//?每一次只能消費一個消息
          ????????channel.basicQos(1);

          ????????//?通道綁定對象
          ????????channel.queueDeclare("work",?true,?false,?false,?null);

          ????????channel.basicConsume("work",?false,?new?DefaultConsumer(channel)?{
          ????????????//?最后一個參數?消息隊列中取出的消息
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("消費者-1"?+?new?String(body));

          ????????????????//?手動確認?參數1?手動確認
          ????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
          ????????????}
          ????????});

          //????????channel.close();
          //????????connection.close();
          ????}

          }

          文章已上傳gitee https://gitee.com/codingce/hexo-blog
          項目地址: https://github.com/xzMhehe/codingce-java


          ?


          更多推薦內容

          ↓↓↓

          Elasticsearch應用之京東搜索

          IK分詞器詳解

          30個編程領域的趣圖

          如果你喜歡本文

          請長按二維碼,關注公眾號

          轉發(fā)朋友圈,是對我最大的支持喲

          以上,便是今天的分享,希望大家喜歡,覺得內容不錯的,歡迎「分享」「」或者點擊「在看」支持,謝謝各位。

          瀏覽 46
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲婷婷六月天 | 成人视频在线免费观看 | 婷婷九月丁香 | 欧美在线色图 | 欧美mv日韩mv国产 |