<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          速入! 3W字帶你迅速上手RabbitMQ

          共 6089字,需瀏覽 13分鐘

           ·

          2020-12-06 20:46

          ?

          高清思維導(dǎo)圖已同步Git:https://github.com/SoWhat1412/xmindfile

          ?

          1. 消息隊(duì)列解決了什么問題

          消息中間件是目前比較流行的一個(gè)中間件,其中RabbitMQ更是占有一定的市場(chǎng)份額,主要用來做異步處理、應(yīng)用解耦、流量削峰、日志處理等等方面。

          1. 異步處理

          一個(gè)用戶登陸網(wǎng)址注冊(cè),然后系統(tǒng)發(fā)短信跟郵件告知注冊(cè)成功,一般有三種解決方法。

          1. 串行到依次執(zhí)行,問題是用戶注冊(cè)后就可以使用了,沒必要等驗(yàn)證碼跟郵件。
          2. 注冊(cè)成功后,郵件跟驗(yàn)證碼用并行等方式執(zhí)行,問題是郵件跟驗(yàn)證碼是非重要的任務(wù),系統(tǒng)注冊(cè)還要等這倆完成么?
          3. 基于異步MQ的處理,用戶注冊(cè)成功后直接把信息異步發(fā)送到MQ中,然后郵件系統(tǒng)跟驗(yàn)證碼系統(tǒng)主動(dòng)去拉取數(shù)據(jù)。

          2. 應(yīng)用解耦

          比如我們有一個(gè)訂單系統(tǒng),還要一個(gè)庫存系統(tǒng),用戶下訂單了就要調(diào)用下庫存系統(tǒng)來處理,直接調(diào)用到話庫存系統(tǒng)出現(xiàn)問題咋辦呢?

          3. 流量削峰

          舉辦一個(gè) 秒殺活動(dòng),如何較好到設(shè)計(jì)?服務(wù)層直接接受瞬間搞密度訪問絕對(duì)不可以起碼要加入一個(gè)MQ。

          4. 日志處理

          用戶通過WebUI訪問發(fā)送請(qǐng)求到時(shí)候后端如何接受跟處理呢一般?

          2. RabbitMQ 安裝跟配置

          官網(wǎng):https://www.rabbitmq.com/download.html

          開發(fā)語言:https://www.erlang.org/

          正式到安裝跟允許需要Erlang跟RabbitMQ倆版本之間相互兼容!我這里圖省事直接用Docker 拉取鏡像了。下載:開啟:管理頁面 默認(rèn)賬號(hào):guest ?默認(rèn)密碼:guest 。Docker啟動(dòng)時(shí)候可以指定賬號(hào)密碼對(duì)外端口以及

          docker?run?-d?--hostname?my-rabbit?--name?rabbit?-e?RABBITMQ_DEFAULT_USER=admin?-e?RABBITMQ_DEFAULT_PASS=admin?-p?15672:15672?-p?5672:5672?-p?25672:25672?-p?61613:61613?-p?1883:1883?rabbitmq:management?

          啟動(dòng):用戶添加:vitrual hosts 相當(dāng)于mysql中的DB。創(chuàng)建一個(gè)virtual hosts,一般以/ 開頭。對(duì)用戶進(jìn)行授權(quán),點(diǎn)擊/vhost_mmr,至于WebUI多點(diǎn)點(diǎn)即可了解。

          3. 實(shí)戰(zhàn)

          RabbitMQ 官網(wǎng)支持任務(wù)模式:https://www.rabbitmq.com/getstarted.htm

          l創(chuàng)建Maven項(xiàng)目導(dǎo)入必要依賴:

          ????<dependencies>
          ????????<dependency>
          ????????????<groupId>com.rabbitmqgroupId>
          ????????????<artifactId>amqp-clientartifactId>
          ????????????<version>4.0.2version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.slf4jgroupId>
          ????????????<artifactId>slf4j-apiartifactId>
          ????????????<version>1.7.10version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>org.slf4jgroupId>
          ????????????<artifactId>slf4j-log4j12artifactId>
          ????????????<version>1.7.5version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>log4jgroupId>
          ????????????<artifactId>log4jartifactId>
          ????????????<version>1.2.17version>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>junitgroupId>
          ????????????<artifactId>junitartifactId>
          ????????????<version>4.11version>
          ????????dependency>
          ????dependencies>

          0. 獲取MQ連接

          package?com.sowhat.mq.util;

          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?ConnectionUtils?{
          ????/**
          ?????*?連接器
          ?????*?@return
          ?????*?@throws?IOException
          ?????*?@throws?TimeoutException
          ?????*/

          ????public?static?Connection?getConnection()?throws?IOException,?TimeoutException?{
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("127.0.0.1");
          ????????factory.setPort(5672);
          ????????factory.setVirtualHost("/vhost_mmr");
          ????????factory.setUsername("user_mmr");
          ????????factory.setPassword("sowhat");
          ????????Connection?connection?=?factory.newConnection();
          ????????return?connection;
          ????}
          }

          1. 簡(jiǎn)單隊(duì)列

          P:Producer 消息的生產(chǎn)者 中間:Queue消息隊(duì)列 C:Consumer 消息的消費(fèi)者

          package?com.sowhat.mq.simple;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send?{
          ????public?static?final?String?QUEUE_NAME?=?"test_simple_queue";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?獲取一個(gè)連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?從連接獲取一個(gè)通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?創(chuàng)建隊(duì)列聲明
          ????????AMQP.Queue.DeclareOk?declareOk?=?channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????String?msg?=?"hello?Simple";
          ????????//?exchange,隊(duì)列,參數(shù),消息字節(jié)體
          ????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());

          ????????System.out.println("--send?msg:"?+?msg);

          ????????channel.close();

          ????????connection.close();

          ????}
          }
          ---
          package?com.sowhat.mq.simple;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?消費(fèi)者獲取消息
          ?*/

          public?class?Recv?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????newApi();
          ????????oldApi();
          ????}

          ????private?static?void?newApi()?throws?IOException,?TimeoutException?{
          ????????//?創(chuàng)建連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?創(chuàng)建頻道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?隊(duì)列聲明??隊(duì)列名,是否持久化,是否獨(dú)占模式,無消息后是否自動(dòng)刪除,消息攜帶參數(shù)
          ????????channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
          ????????//?定義消費(fèi)者
          ????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override??//?事件模型,消息來了會(huì)觸發(fā)該函數(shù)
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("---new?api?recv:"?+?s);
          ????????????}
          ????????};
          ????????//?監(jiān)聽隊(duì)列
          ????????channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
          ????}

          ????//?老方法?消費(fèi)者 MQ 在3。4以下?用次方法,
          ????private?static?void?oldApi()?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????//?創(chuàng)建連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?創(chuàng)建頻道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?定義隊(duì)列消費(fèi)者
          ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);
          ????????//監(jiān)聽隊(duì)列
          ????????channel.basicConsume(Send.QUEUE_NAME,?true,?consumer);
          ????????while?(true)?{
          ????????????//?發(fā)貨體
          ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();
          ????????????byte[]?body?=?delivery.getBody();
          ????????????String?s?=?new?String(body);
          ????????????System.out.println("---Recv:"?+?s);
          ????????}
          ????}
          }

          右上角有可以設(shè)置頁面刷新頻率,然后可以在UI界面直接手動(dòng)消費(fèi)掉,如下圖:簡(jiǎn)單隊(duì)列的不足:耦合性過高,生產(chǎn)者一一對(duì)應(yīng)消費(fèi)者,如果有多個(gè)消費(fèi)者想消費(fèi)隊(duì)列中信息就無法實(shí)現(xiàn)了。

          2. WorkQueue 工作隊(duì)列

          Simple隊(duì)列中只能一一對(duì)應(yīng)的生產(chǎn)消費(fèi),實(shí)際開發(fā)中生產(chǎn)者發(fā)消息很簡(jiǎn)單,而消費(fèi)者要跟業(yè)務(wù)結(jié)合,消費(fèi)者接受到消息后要處理從而會(huì)耗時(shí)。「可能會(huì)出現(xiàn)隊(duì)列中出現(xiàn)消息積壓」。所以如果多個(gè)消費(fèi)者可以加速消費(fèi)。

          1. round robin 輪詢分發(fā)

          代碼編程一個(gè)生產(chǎn)者兩個(gè)消費(fèi)者:

          package?com.sowhat.mq.work;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send?{
          ????public?static?final?String??QUEUE_NAME?=?"test_work_queue";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????//?獲取連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?獲取?channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列
          ????????AMQP.Queue.DeclareOk?declareOk?=?channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????for?(int?i?=?0;?i?<50?;?i++)?{
          ????????????String?msg?=?"hello-"?+?i;
          ????????????System.out.println("WQ?send?"?+?msg);
          ????????????channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
          ????????????Thread.sleep(i*20);
          ????????}
          ????????channel.close();
          ????????connection.close();
          ????}
          }

          ---
          package?com.sowhat.mq.work;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv1?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?獲取連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?獲取通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列
          ????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【1】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【1】?done");
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?true;
          ????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
          ????}
          }
          ---
          package?com.sowhat.mq.work;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv2?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?獲取連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?獲取通道
          ????????Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列
          ????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【2】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(1000?);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【2】?done");
          ????????????????}
          ????????????}
          ????????};
          ????????boolean?autoAck?=?true;
          ????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
          ????}
          }

          現(xiàn)象:消費(fèi)者1 跟消費(fèi)者2 處理的數(shù)據(jù)量完全一樣的個(gè)數(shù):消費(fèi)者1:處理偶數(shù) 消費(fèi)者2:處理奇數(shù) 這種方式叫輪詢分發(fā)(round-robin)結(jié)果就是不管兩個(gè)消費(fèi)者誰忙,「數(shù)據(jù)總是你一個(gè)我一個(gè)」,MQ 給兩個(gè)消費(fèi)發(fā)數(shù)據(jù)的時(shí)候是不知道消費(fèi)者性能的,默認(rèn)就是雨露均沾。此時(shí) autoAck = true。

          2. 公平分發(fā) fair dipatch

          如果要實(shí)現(xiàn)公平分發(fā),要讓消費(fèi)者消費(fèi)完畢一條數(shù)據(jù)后就告知MQ,再讓MQ發(fā)數(shù)據(jù)即可。自動(dòng)應(yīng)答要關(guān)閉!

          package?com.sowhat.mq.work;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send?{
          ????public?static?final?String??QUEUE_NAME?=?"test_work_queue";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????//?獲取連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?獲取?channel
          ????????Channel?channel?=?connection.createChannel();
          ????????//?s聲明隊(duì)列
          ????????AMQP.Queue.DeclareOk?declareOk?=?channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????//?每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息到消費(fèi)者,一次只發(fā)送一個(gè)消息
          ????????//?從而限制一次性發(fā)送給消費(fèi)者到消息不得超過1個(gè)。
          ????????int?perfetchCount?=?1;
          ????????channel.basicQos(perfetchCount);

          ????????for?(int?i?=?0;?i?<50?;?i++)?{
          ????????????String?msg?=?"hello-"?+?i;
          ????????????System.out.println("WQ?send?"?+?msg);
          ????????????channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
          ????????????Thread.sleep(i*20);
          ????????}
          ????????channel.close();
          ????????connection.close();
          ????}
          }
          ---
          package?com.sowhat.mq.work;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv1?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?獲取連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?獲取通道
          ????????final?Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列
          ????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
          ????????//?保證一次只分發(fā)一個(gè)
          ????????channel.basicQos(1);
          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【1】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【1】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
          ????}
          }
          ---
          package?com.sowhat.mq.work;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv2?{
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????//?獲取連接
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????//?獲取通道
          ????????final?Channel?channel?=?connection.createChannel();
          ????????//?聲明隊(duì)列
          ????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
          ????????//?保證一次只分發(fā)一個(gè)
          ????????channel.basicQos(1);
          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【2】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【2】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
          ????}
          }

          結(jié)果:實(shí)現(xiàn)了公平分發(fā),消費(fèi)者2 是消費(fèi)者1消費(fèi)數(shù)量的2倍。

          3. publish/subscribe 發(fā)布訂閱模式

          類似公眾號(hào)的訂閱跟發(fā)布,無需指定routingKey:

          解讀:

          1. 一個(gè)生產(chǎn)者多個(gè)消費(fèi)者
          2. 每一個(gè)消費(fèi)者都有一個(gè)自己的隊(duì)列
          3. 生產(chǎn)者沒有把消息直接發(fā)送到隊(duì)列而是發(fā)送到了交換機(jī)轉(zhuǎn)化器(exchange)。
          4. 每一個(gè)隊(duì)列都要綁定到交換機(jī)上。
          5. 生產(chǎn)者發(fā)送的消息經(jīng)過交換機(jī)到達(dá)隊(duì)列,從而實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)。

          生產(chǎn)者:

          package?com.sowhat.mq.ps;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send?{
          ????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_fanout";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();

          ????????//聲明交換機(jī)
          ????????channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//?分發(fā)=?fanout

          ????????//?發(fā)送消息
          ????????String?msg?=?"hello?ps?";

          ????????channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
          ????????System.out.println("Send:"?+?msg);

          ????????channel.close();
          ????????connection.close();
          ????}
          }

          消息哪兒去了?丟失了,在RabbitMQ中只有隊(duì)列有存儲(chǔ)能力,「因?yàn)檫@個(gè)時(shí)候隊(duì)列還沒有綁定到交換機(jī) 所以消息丟失了」。消費(fèi)者:

          package?com.sowhat.mq.ps;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv1?{
          ????public?static?final?String??QUEUE_NAME?=?"test_queue_fanout_email";
          ????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_fanout";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????final?Channel?channel?=?connection.createChannel();
          ????????//?隊(duì)列聲明
          ????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
          ????????//?綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
          ????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""?);

          ????????//?保證一次只分發(fā)一個(gè)
          ????????channel.basicQos(1);
          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【1】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【1】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}
          }
          ---
          package?com.sowhat.mq.ps;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv2?{
          ????public?static?final?String??QUEUE_NAME?=?"test_queue_fanout_sms";
          ????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_fanout";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????final?Channel?channel?=?connection.createChannel();
          ????????//?隊(duì)列聲明
          ????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
          ????????//?綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
          ????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""?);
          ????????//?保證一次只分發(fā)一個(gè)
          ????????channel.basicQos(1);
          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【2】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【2】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}
          }

          「同時(shí)還可以自己手動(dòng)的添加一個(gè)隊(duì)列監(jiān)控到該exchange」

          4. routing 路由選擇 通配符模式

          Exchange(交換機(jī),轉(zhuǎn)發(fā)器):「一方面接受生產(chǎn)者消息,另一方面是向隊(duì)列推送消息」。匿名轉(zhuǎn)發(fā)用 "" ?表示,比如前面到簡(jiǎn)單隊(duì)列跟WorkQueue。fanout:不處理路由鍵。「不需要指定routingKey」,我們只需要把隊(duì)列綁定到交換機(jī), 「消息就會(huì)被發(fā)送到所有到隊(duì)列中」direct:處理路由鍵,「需要指定routingKey」,此時(shí)生產(chǎn)者發(fā)送數(shù)據(jù)到時(shí)候會(huì)指定key,任務(wù)隊(duì)列也會(huì)指定key,只有key一樣消息才會(huì)被傳送到隊(duì)列中。如下圖

          package?com.sowhat.mq.routing;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send?{
          ????public?static?final?String??EXCHANGE_NAME?=?"test_exchange_direct";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();
          ????????//?exchange
          ????????channel.exchangeDeclare(EXCHANGE_NAME,"direct");

          ????????String?msg?=?"hello?info!";

          ????????//?可以指定類型
          ????????String?routingKey?=?"info";
          ????????channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
          ????????System.out.println("Send?:?"?+?msg);
          ????????channel.close();
          ????????connection.close();
          ????}
          }
          ---
          package?com.sowhat.mq.routing;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv1?{
          ????public?static?final?String??EXCHANGE_NAME?=?"test_exchange_direct";
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_direct_1";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????final?Channel?channel?=?connection.createChannel();

          ????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
          ????????channel.basicQos(1);

          ????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【1】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【1】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}
          }
          ---
          package?com.sowhat.mq.routing;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv2?{
          ????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_direct";
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_direct_2";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????final?Channel?channel?=?connection.createChannel();

          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????channel.basicQos(1);

          ????????//?綁定種類似?Key
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"error");
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"info");
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"warning");

          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【2】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【2】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);

          ????}
          }

          WebUI:缺點(diǎn):路由key必須要明確,無法實(shí)現(xiàn)規(guī)則性模糊匹配。

          5. Topics 主題

          將路由鍵跟某個(gè)模式匹配,# 表示匹配 >=1個(gè)字符, *表示匹配一個(gè)。生產(chǎn)者會(huì)帶routingKey,但是消費(fèi)者的MQ會(huì)帶模糊routingKey。商品:發(fā)布、刪除、修改、查詢。

          package?com.sowhat.mq.topic;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send?{
          ????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_topic";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();
          ????????//?exchange
          ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");

          ????????String?msg?=?"商品!";

          ????????//?可以指定類型
          ????????String?routingKey?=?"goods.find";
          ????????channel.basicPublish(EXCHANGE_NAME,?routingKey,?null,?msg.getBytes());
          ????????System.out.println("Send?:?"?+?msg);
          ????????channel.close();
          ????????connection.close();
          ????}
          }
          ---
          package?com.sowhat.mq.topic;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv1?{
          ????public?static?final?String??EXCHANGE_NAME?=?"test_exchange_topic";
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_topic_1";
          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????final?Channel?channel?=?connection.createChannel();

          ????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
          ????????channel.basicQos(1);

          ????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{

          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【1】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(2000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【1】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}
          }
          ---
          package?com.sowhat.mq.topic;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv2?{
          ????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_topic";
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_topic_2";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????final?Channel?channel?=?connection.createChannel();

          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
          ????????channel.basicQos(1);
          ????????//?此乃重點(diǎn)
          ????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"goods.#");

          ????????//定義消費(fèi)者
          ????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override?//?事件觸發(fā)機(jī)制
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????String?s?=?new?String(body,?"utf-8");
          ????????????????System.out.println("【2】:"?+?s);
          ????????????????try?{
          ????????????????????Thread.sleep(1000);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????System.out.println("【2】?done");
          ????????????????????//?手動(dòng)回執(zhí)
          ????????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
          ????????????????}
          ????????????}
          ????????};
          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
          ????}
          }

          6. MQ的持久化跟非持久化

          因?yàn)橄⒃趦?nèi)存中,如果MQ掛了那么消息也丟失了,所以應(yīng)該考慮MQ的持久化。MQ是支持持久化的,

          //?聲明隊(duì)列
          channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
          ????/**
          ?????*?Declare?a?queue
          ?????*?@see?com.rabbitmq.client.AMQP.Queue.Declare
          ?????*?@see?com.rabbitmq.client.AMQP.Queue.DeclareOk
          ?????*?@param?queue?the?name?of?the?queue
          ?????*?@param?durable?true?if?we?are?declaring?a?durable?queue?(the?queue?will?survive?a?server?restart)
          ?????*?@param?exclusive?true?if?we?are?declaring?an?exclusive?queue?(restricted?to?this?connection)
          ?????*?@param?autoDelete?true?if?we?are?declaring?an?autodelete?queue?(server?will?delete?it?when?no?longer?in?use)
          ?????*?@param?arguments?other?properties?(construction?arguments)?for?the?queue
          ?????*?@return?a?declaration-confirm?method?to?indicate?the?queue?was?successfully?declared
          ?????*?@throws?java.io.IOException?if?an?error?is?encountered
          ?????*/

          ????Queue.DeclareOk?queueDeclare(String?queue,?boolean?durable,?boolean?exclusive,?boolean?autoDelete,
          ?????????????????????????????????Map?arguments)
          ?throws?IOException
          ;

          boolean durable就是表明是否可以持久化,如果我們將程序中的durable = false改為true是不可以的!因?yàn)槲覀円呀?jīng)定義過的test_work_queue,這個(gè)queue已聲明為未持久化的。結(jié)論:MQ 不允許修改一個(gè)已經(jīng)存在的隊(duì)列參數(shù)。

          7. 消費(fèi)者端手動(dòng)跟自動(dòng)確認(rèn)消息


          ????????//?自動(dòng)應(yīng)答
          ????????boolean?autoAck?=?false;
          ????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);

          當(dāng)MQ發(fā)送數(shù)據(jù)個(gè)消費(fèi)者后,消費(fèi)者要對(duì)收到對(duì)信息應(yīng)答給MQ。

          如果autoAck = true 表示「自動(dòng)確認(rèn)模式」,一旦MQ把消息分發(fā)給消費(fèi)者就會(huì)把消息從內(nèi)存中刪除。如果消費(fèi)者收到消息但是還沒有消費(fèi)完而MQ中數(shù)據(jù)已刪除則會(huì)導(dǎo)致丟失了正在處理對(duì)消息。

          如果autoAck = false表示「手動(dòng)確認(rèn)模式」,如果有個(gè)消費(fèi)者掛了,MQ因?yàn)闆]有收到回執(zhí)信息可以把該信息再發(fā)送給其他對(duì)消費(fèi)者。

          MQ支持消息應(yīng)答(Message acknowledgement),消費(fèi)者發(fā)送一個(gè)消息應(yīng)答告訴MQ這個(gè)消息已經(jīng)被消費(fèi)了,MQ才從內(nèi)存中刪除。消息應(yīng)答模式「默認(rèn)為 false」。

          8. RabbitMQ生產(chǎn)者端消息確認(rèn)機(jī)制(事務(wù) + confirm)

          在RabbitMQ中我們可以通過持久化來解決MQ服務(wù)器異常的數(shù)據(jù)丟失問題,但是「生產(chǎn)者如何確保數(shù)據(jù)發(fā)送到MQ了」?默認(rèn)情況下生產(chǎn)者也是不知道的。如何解決 呢?

          1. AMQP事務(wù)

          第一種方式AMQP實(shí)現(xiàn)了事務(wù)機(jī)制,類似mysql的事務(wù)機(jī)制。txSelect:用戶將當(dāng)前channel設(shè)置為transition模式。txCommit:用于提交事務(wù)。txRollback:用于回滾事務(wù)。

          以上都是對(duì)生產(chǎn)者對(duì)操作。

          package?com.sowhat.mq.tx;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?TxSend?{
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_tx";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????String?msg?=?"hello?tx?message";

          ????????try?{
          ????????????//開啟事務(wù)模式
          ????????????channel.txSelect();
          ????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
          ????????????int?x?=?1?/?0;
          ????????????
          ????????????//?提交事務(wù)
          ????????????channel.txCommit();
          ????????}?catch?(IOException?e)?{
          ????????????//?回滾
          ????????????channel.txRollback();
          ????????????System.out.println("send?message?rollback");
          ????????}?finally?{
          ????????????channel.close();
          ????????????connection.close();
          ????????}
          ????}
          }
          ---
          package?com.sowhat.mq.tx;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?TxRecv?{
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_tx";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();

          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????String?s?=?channel.basicConsume(QUEUE_NAME,?true,?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("recv[tx]?msg:"?+?new?String(body,?"utf-8"));
          ????????????}
          ????????});
          ????????channel.close();
          ????????connection.close();
          ????}
          }

          缺點(diǎn)就是大量對(duì)請(qǐng)求嘗試然后失敗然后回滾,會(huì)降低MQ的吞吐量。

          2. Confirm模式。

          「生產(chǎn)者端confirm實(shí)現(xiàn)原理」生產(chǎn)者將信道設(shè)置為confirm模式,一旦信道進(jìn)入了confirm模式,所以該信道上發(fā)布的信息都會(huì)被派一個(gè)唯一的ID(從1開始),一旦消息被投遞到所有的匹配隊(duì)列后,Broker就回發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息跟隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在消息寫入到磁盤后才發(fā)出。broker回傳給生產(chǎn)者到確認(rèn)消息中deliver-tag域包含了確認(rèn)消息到序列號(hào),此外broker也可以設(shè)置basic.ack的multiple域,表示這個(gè)序列號(hào)之前所以信息都已經(jīng)得到處理。

          Confirm模式最大的好處在于是異步的。第一條消息發(fā)送后不用一直等待回復(fù)后才發(fā)第二條消息。

          開啟confirm模式:channel.confimSelect()編程模式:

          1. 普通的發(fā)送一個(gè)消息后就 waitForConfirms()
          package?com.sowhat.confirm;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send1?{
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm1";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????//?將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
          ????????channel.confirmSelect();

          ????????String?msg?=?"hello?confirm?message";
          ????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
          ????????if?(!channel.waitForConfirms())?{
          ????????????System.out.println("消息發(fā)送失敗");
          ????????}?else?{
          ????????????System.out.println("消息發(fā)送OK");
          ????????}
          ????????channel.close();
          ????????connection.close();
          ????}
          }
          ---
          package?com.sowhat.confirm;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Recv?{
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm1";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();

          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????String?s?=?channel.basicConsume(QUEUE_NAME,?true,?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("recv[tx]?msg:"?+?new?String(body,?"utf-8"));
          ????????????}
          ????????});
          ????}
          }
          2. 批量的發(fā)一批數(shù)據(jù) waitForConfirms()
          package?com.sowhat.confirm;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          public?class?Send2?{
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm1";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????//?將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
          ????????channel.confirmSelect();

          ????????String?msg?=?"hello?confirm?message";
          ????????//?批量發(fā)送
          ????????for?(int?i?=?0;?i?10;?i++)?{
          ????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
          ????????}
          ????????//?確認(rèn)
          ????????if?(!channel.waitForConfirms())?{
          ????????????System.out.println("消息發(fā)送失敗");
          ????????}?else?{
          ????????????System.out.println("消息發(fā)送OK");
          ????????}
          ????????channel.close();
          ????????connection.close();
          ????}
          }
          ---
          接受信息跟上面一樣
          3. 異步confirm模式,提供一個(gè)回調(diào)方法。

          Channel對(duì)象提供的ConfirmListener()回調(diào)方法只包含deliveryTag(包含當(dāng)前發(fā)出消息序號(hào)),我們需要自己為每一個(gè)Channel維護(hù)一個(gè)unconfirm的消息序號(hào)集合,每publish一條數(shù)據(jù),集合中元素加1,每回調(diào)一次handleAck方法,unconfirm集合刪掉響應(yīng)的一條(multiple=false)或多條(multiple=true)記錄,從運(yùn)行效率來看,unconfirm集合最好采用有序集合SortedSet存儲(chǔ)結(jié)構(gòu)。

          package?com.sowhat.mq.confirm;

          import?com.rabbitmq.client.*;
          import?com.sowhat.mq.util.ConnectionUtils;

          import?java.io.IOException;
          import?java.util.Collections;
          import?java.util.SortedSet;
          import?java.util.TreeSet;
          import?java.util.concurrent.TimeoutException;

          public?class?Send3?{
          ????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm3";

          ????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
          ????????Connection?connection?=?ConnectionUtils.getConnection();
          ????????Channel?channel?=?connection.createChannel();
          ????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);

          ????????//生產(chǎn)者調(diào)用confirmSelect
          ????????channel.confirmSelect();

          ????????//?存放未確認(rèn)消息
          ????????final?SortedSet?confirmSet?=?Collections.synchronizedSortedSet(new?TreeSet());

          ???????//?添加監(jiān)聽通道
          ????????channel.addConfirmListener(new?ConfirmListener()?{
          ????????????//?回執(zhí)有問題的
          ????????????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????????????if?(multiple)?{
          ????????????????????System.out.println("--handleNack---multiple");
          ????????????????????confirmSet.headSet(deliveryTag?+?1).clear();
          ????????????????}?else?{
          ????????????????????System.out.println("--handleNack--?multiple?false");
          ????????????????????confirmSet.remove(deliveryTag);
          ????????????????}
          ????????????}

          ????????????//?沒有問題的handleAck
          ????????????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
          ????????????????if?(multiple)?{
          ????????????????????System.out.println("--handleAck---multiple");
          ????????????????????confirmSet.headSet(deliveryTag?+?1).clear();
          ????????????????}?else?{
          ????????????????????System.out.println("--handleAck--multiple?false");
          ????????????????????confirmSet.remove(deliveryTag);
          ????????????????}
          ????????????}
          ????????});

          ????????//?一般情況下是先開啟?消費(fèi)者,指定好?exchange跟routingkey,如果生產(chǎn)者等routingkey?就會(huì)觸發(fā)這個(gè)return?方法
          ????????channel.addReturnListener(new?ReturnListener()?{
          ????????????public?void?handleReturn(int?replyCode,?String?replyText,?String?exchange,?String?routingKey,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????????????System.out.println("----?handle?return----");
          ????????????????System.out.println("replyCode:"?+?replyCode?);
          ????????????????System.out.println("replyText:"?+replyText?);
          ????????????????System.out.println("exchange:"?+?exchange);
          ????????????????System.out.println("routingKey:"?+?routingKey);
          ????????????????System.out.println("properties:"?+?properties);
          ????????????????System.out.println("body:"?+?new?String(body));
          ????????????}
          ????????});

          ????????String?msgStr?=?"sssss";
          ????????while(true){
          ????????????long?nextPublishSeqNo?=?channel.getNextPublishSeqNo();
          ????????????channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
          ????????????confirmSet.add(nextPublishSeqNo);
          ????????????Thread.sleep(1000);
          ????????}
          ????}
          }

          總結(jié):AMQP模式相對(duì)來說沒Confirm模式性能好些,推薦使用后者。

          9. RabbitMQ延遲隊(duì)列 跟死信

          淘寶訂單付款,驗(yàn)證碼等限時(shí)類型服務(wù)。

          ????????Map?headers?=??new?HashMap();
          ????????headers.put("my1","111");
          ????????headers.put("my2","222");
          ????????AMQP.BasicProperties?build?=?new?AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();

          死信的處理:

          10. SpringBoot Tpoic Demo

          需求圖:新建SpringBoot 項(xiàng)目添加如下依賴:

          ???????<dependency>
          ????????????<groupId>org.springframework.bootgroupId>
          ????????????<artifactId>spring-boot-starter-amqpartifactId>
          ????????dependency>
          1. 生產(chǎn)者

          application.yml

          spring:
          ??rabbitmq:
          ????host:?127.0.0.1
          ????username:?admin
          ????password:?admin

          測(cè)試用例:

          package?com.sowhat.mqpublisher;

          import?org.junit.jupiter.api.Test;
          import?org.springframework.amqp.core.AmqpTemplate;
          import?org.springframework.beans.factory.annotation.Autowired;
          import?org.springframework.boot.test.context.SpringBootTest;

          @SpringBootTest
          class?MqpublisherApplicationTests?{
          ????@Autowired
          ????private?AmqpTemplate?amqpTemplate;

          ????@Test
          ????void?userInfo()?{
          ????????/**
          ?????????*?exchange,routingKey,message
          ?????????*/

          ????????this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");
          ????}
          }
          2. 消費(fèi)者

          application.xml

          spring:
          ??rabbitmq:
          ????host:?127.0.0.1
          ????username:?admin
          ????password:?admin

          #?自定義配置
          mq:
          ??config:
          ????exchange_name:?log.topic
          ????#?配置隊(duì)列名稱
          ????queue_name:
          ????????info:?log.info
          ????????error:?log.error
          ????????logs:?log.logs

          三個(gè)不同的消費(fèi)者:

          package?com.sowhat.mqconsumer.service;

          import?org.springframework.amqp.core.ExchangeTypes;
          import?org.springframework.amqp.rabbit.annotation.Exchange;
          import?org.springframework.amqp.rabbit.annotation.Queue;
          import?org.springframework.amqp.rabbit.annotation.QueueBinding;
          import?org.springframework.amqp.rabbit.annotation.RabbitListener;
          import?org.springframework.stereotype.Service;

          /**
          ?*?@QueueBinding?value屬性:用于綁定一個(gè)隊(duì)列。@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒有則創(chuàng)建,如果有則返回
          ?* type = ExchangeTypes.TOPIC 指定交換器類型。默認(rèn)的direct交換器
          ?*/

          @Service
          public?class?ErrorReceiverService?{

          ????/**
          ??????*?把一個(gè)方法跟一個(gè)隊(duì)列進(jìn)行綁定,收到消息后綁定給msg
          ?????*/

          ????@RabbitListener(bindings?=?@QueueBinding(
          ????????????value?=?@Queue(value?=?"${mq.config.queue_name.error}"),
          ????????????exchange?=?@Exchange(value?=?"${mq.config.exchange_name}",?type?=?ExchangeTypes.TOPIC),
          ????????????key?=?"*.log.error"
          ????????)
          ????)
          ????public?void?process(String?msg)?{
          ????????System.out.println(msg?+?"?Logs...........");
          ????}
          }
          ---
          package?com.sowhat.mqconsumer.service;

          import?org.springframework.amqp.core.ExchangeTypes;
          import?org.springframework.amqp.rabbit.annotation.Exchange;
          import?org.springframework.amqp.rabbit.annotation.Queue;
          import?org.springframework.amqp.rabbit.annotation.QueueBinding;
          import?org.springframework.amqp.rabbit.annotation.RabbitListener;
          import?org.springframework.stereotype.Service;

          /**
          ?*?@QueueBinding?value屬性:用于綁定一個(gè)隊(duì)列。
          ?*?@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒有則創(chuàng)建,如果有則返回
          ?*/

          @Service
          public?class?InfoReceiverService?{

          ????/**
          ?????*?添加一個(gè)能夠處理消息的方法
          ?????*/

          ????@RabbitListener(bindings?=?@QueueBinding(
          ????????????value?=?@Queue(value?="${mq.config.queue_name.info}"),
          ????????????exchange?=?@Exchange(value?=?"${mq.config.exchange_name}",type?=?ExchangeTypes.TOPIC),
          ????????????key?=?"*.log.info"
          ????))
          ????public?void?process(String?msg){
          ????????System.out.println(msg+"?Info...........");

          ????}
          }
          --
          package?com.sowhat.mqconsumer.service;

          import?org.springframework.amqp.core.ExchangeTypes;
          import?org.springframework.amqp.rabbit.annotation.Exchange;
          import?org.springframework.amqp.rabbit.annotation.Queue;
          import?org.springframework.amqp.rabbit.annotation.QueueBinding;
          import?org.springframework.amqp.rabbit.annotation.RabbitListener;
          import?org.springframework.stereotype.Service;

          /**
          ?*?@QueueBinding?value屬性:用于綁定一個(gè)隊(duì)列。
          ?*?@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒有則創(chuàng)建,如果有則返回
          ?*/

          @Service
          public?class?LogsReceiverService?{

          ????/**
          ?????*?添加一個(gè)能夠處理消息的方法
          ?????*/

          ????@RabbitListener(bindings?=?@QueueBinding(
          ????????????value?=?@Queue(value?="${mq.config.queue_name.logs}"),
          ????????????exchange?=?@Exchange(value?=?"${mq.config.exchange_name}",type?=?ExchangeTypes.TOPIC),
          ????????????key?=?"*.log.*"
          ????))
          ????public?void?process(String?msg){
          ????????System.out.println(msg+"?Error...........");
          ????}
          }

          詳細(xì)安裝跟代碼看參考下載:

          總結(jié)

          如果需要指定模式一般是在消費(fèi)者端設(shè)置,靈活性調(diào)節(jié)。

          模式生產(chǎn)者Queue生產(chǎn)者exchange生產(chǎn)者routingKey消費(fèi)者exchange消費(fèi)者queueroutingKey
          Simple(簡(jiǎn)單模式少用)指定不指定不指定不指定指定不指定
          WorkQueue(多個(gè)消費(fèi)者少用)指定不指定不指定不指定指定不指定
          fanout(publish/subscribe模式)不指定指定不指定指定指定不指定
          direct(路由模式)不指定指定指定指定指定消費(fèi)者routingKey精確指定多個(gè)
          topic(主題模糊匹配)不指定指定指定指定指定消費(fèi)者routingKey可以進(jìn)行模糊匹配



          長(zhǎng)按關(guān)注,學(xué)習(xí)Java

          瀏覽 27
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大鸡巴在线看 | 国产一卡二卡三卡四卡在线观看 | 午夜久久精品嫖妓av一区二区三区 | 日韩精品视频一区二区三区 | 伊人大久热|