速入! 3W字帶你迅速上手RabbitMQ
?高清思維導(dǎo)圖已同步Git:https://github.com/SoWhat1412/xmindfile
?

1. 消息隊(duì)列解決了什么問題
消息中間件是目前比較流行的一個(gè)中間件,其中RabbitMQ更是占有一定的市場(chǎng)份額,主要用來做異步處理、應(yīng)用解耦、流量削峰、日志處理等等方面。
1. 異步處理
一個(gè)用戶登陸網(wǎng)址注冊(cè),然后系統(tǒng)發(fā)短信跟郵件告知注冊(cè)成功,一般有三種解決方法。
串行到依次執(zhí)行,問題是用戶注冊(cè)后就可以使用了,沒必要等驗(yàn)證碼跟郵件。 注冊(cè)成功后,郵件跟驗(yàn)證碼用并行等方式執(zhí)行,問題是郵件跟驗(yàn)證碼是非重要的任務(wù),系統(tǒng)注冊(cè)還要等這倆完成么? 基于異步MQ的處理,用戶注冊(cè)成功后直接把信息異步發(fā)送到MQ中,然后郵件系統(tǒng)跟驗(yàn)證碼系統(tǒng)主動(dòng)去拉取數(shù)據(jù)。 
2. 應(yīng)用解耦
比如我們有一個(gè)訂單系統(tǒng),還要一個(gè)庫存系統(tǒng),用戶下訂單了就要調(diào)用下庫存系統(tǒng)來處理,直接調(diào)用到話庫存系統(tǒng)出現(xiàn)問題咋辦呢?
3. 流量削峰
舉辦一個(gè) 秒殺活動(dòng),如何較好到設(shè)計(jì)?服務(wù)層直接接受瞬間搞密度訪問絕對(duì)不可以起碼要加入一個(gè)MQ。
4. 日志處理
用戶通過WebUI訪問發(fā)送請(qǐng)求到時(shí)候后端如何接受跟處理呢一般?
2. RabbitMQ 安裝跟配置
官網(wǎng):https://www.rabbitmq.com/download.html
開發(fā)語言:https://www.erlang.org/
正式到安裝跟允許需要Erlang跟RabbitMQ倆版本之間相互兼容!我這里圖省事直接用Docker 拉取鏡像了。下載:
開啟:管理頁面 默認(rèn)賬號(hào):guest ?默認(rèn)密碼:guest 。Docker啟動(dòng)時(shí)候可以指定賬號(hào)密碼對(duì)外端口以及
docker?run?-d?--hostname?my-rabbit?--name?rabbit?-e?RABBITMQ_DEFAULT_USER=admin?-e?RABBITMQ_DEFAULT_PASS=admin?-p?15672:15672?-p?5672:5672?-p?25672:25672?-p?61613:61613?-p?1883:1883?rabbitmq:management?
啟動(dòng):
用戶添加:
vitrual hosts 相當(dāng)于mysql中的DB。創(chuàng)建一個(gè)virtual hosts,一般以/ 開頭。
對(duì)用戶進(jìn)行授權(quán),點(diǎn)擊/vhost_mmr,
至于WebUI多點(diǎn)點(diǎn)即可了解。
3. 實(shí)戰(zhàn)
RabbitMQ 官網(wǎng)支持任務(wù)模式:https://www.rabbitmq.com/getstarted.htm
l創(chuàng)建Maven項(xiàng)目導(dǎo)入必要依賴:
????<dependencies>
????????<dependency>
????????????<groupId>com.rabbitmqgroupId>
????????????<artifactId>amqp-clientartifactId>
????????????<version>4.0.2version>
????????dependency>
????????<dependency>
????????????<groupId>org.slf4jgroupId>
????????????<artifactId>slf4j-apiartifactId>
????????????<version>1.7.10version>
????????dependency>
????????<dependency>
????????????<groupId>org.slf4jgroupId>
????????????<artifactId>slf4j-log4j12artifactId>
????????????<version>1.7.5version>
????????dependency>
????????<dependency>
????????????<groupId>log4jgroupId>
????????????<artifactId>log4jartifactId>
????????????<version>1.2.17version>
????????dependency>
????????<dependency>
????????????<groupId>junitgroupId>
????????????<artifactId>junitartifactId>
????????????<version>4.11version>
????????dependency>
????dependencies>
0. 獲取MQ連接
package?com.sowhat.mq.util;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?ConnectionUtils?{
????/**
?????*?連接器
?????*?@return
?????*?@throws?IOException
?????*?@throws?TimeoutException
?????*/
????public?static?Connection?getConnection()?throws?IOException,?TimeoutException?{
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("127.0.0.1");
????????factory.setPort(5672);
????????factory.setVirtualHost("/vhost_mmr");
????????factory.setUsername("user_mmr");
????????factory.setPassword("sowhat");
????????Connection?connection?=?factory.newConnection();
????????return?connection;
????}
}
1. 簡(jiǎn)單隊(duì)列
P:Producer 消息的生產(chǎn)者
中間:Queue消息隊(duì)列
C:Consumer 消息的消費(fèi)者
package?com.sowhat.mq.simple;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send?{
????public?static?final?String?QUEUE_NAME?=?"test_simple_queue";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取一個(gè)連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?從連接獲取一個(gè)通道
????????Channel?channel?=?connection.createChannel();
????????//?創(chuàng)建隊(duì)列聲明
????????AMQP.Queue.DeclareOk?declareOk?=?channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????String?msg?=?"hello?Simple";
????????//?exchange,隊(duì)列,參數(shù),消息字節(jié)體
????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
????????System.out.println("--send?msg:"?+?msg);
????????channel.close();
????????connection.close();
????}
}
---
package?com.sowhat.mq.simple;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
/**
?*?消費(fèi)者獲取消息
?*/
public?class?Recv?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????newApi();
????????oldApi();
????}
????private?static?void?newApi()?throws?IOException,?TimeoutException?{
????????//?創(chuàng)建連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?創(chuàng)建頻道
????????Channel?channel?=?connection.createChannel();
????????//?隊(duì)列聲明??隊(duì)列名,是否持久化,是否獨(dú)占模式,無消息后是否自動(dòng)刪除,消息攜帶參數(shù)
????????channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);
????????//?定義消費(fèi)者
????????DefaultConsumer?defaultConsumer?=?new?DefaultConsumer(channel)?{
????????????@Override??//?事件模型,消息來了會(huì)觸發(fā)該函數(shù)
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("---new?api?recv:"?+?s);
????????????}
????????};
????????//?監(jiān)聽隊(duì)列
????????channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);
????}
????//?老方法?消費(fèi)者 MQ 在3。4以下?用次方法,
????private?static?void?oldApi()?throws?IOException,?TimeoutException,?InterruptedException?{
????????//?創(chuàng)建連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?創(chuàng)建頻道
????????Channel?channel?=?connection.createChannel();
????????//?定義隊(duì)列消費(fèi)者
????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);
????????//監(jiān)聽隊(duì)列
????????channel.basicConsume(Send.QUEUE_NAME,?true,?consumer);
????????while?(true)?{
????????????//?發(fā)貨體
????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();
????????????byte[]?body?=?delivery.getBody();
????????????String?s?=?new?String(body);
????????????System.out.println("---Recv:"?+?s);
????????}
????}
}
右上角有可以設(shè)置頁面刷新頻率,然后可以在UI界面直接手動(dòng)消費(fèi)掉,如下圖:
簡(jiǎn)單隊(duì)列的不足:耦合性過高,生產(chǎn)者一一對(duì)應(yīng)消費(fèi)者,如果有多個(gè)消費(fèi)者想消費(fèi)隊(duì)列中信息就無法實(shí)現(xiàn)了。
2. WorkQueue 工作隊(duì)列
Simple隊(duì)列中只能一一對(duì)應(yīng)的生產(chǎn)消費(fèi),實(shí)際開發(fā)中生產(chǎn)者發(fā)消息很簡(jiǎn)單,而消費(fèi)者要跟業(yè)務(wù)結(jié)合,消費(fèi)者接受到消息后要處理從而會(huì)耗時(shí)。「可能會(huì)出現(xiàn)隊(duì)列中出現(xiàn)消息積壓」。所以如果多個(gè)消費(fèi)者可以加速消費(fèi)。
1. round robin 輪詢分發(fā)
代碼編程一個(gè)生產(chǎn)者兩個(gè)消費(fèi)者:
package?com.sowhat.mq.work;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send?{
????public?static?final?String??QUEUE_NAME?=?"test_work_queue";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????//?獲取連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?獲取?channel
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列
????????AMQP.Queue.DeclareOk?declareOk?=?channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????for?(int?i?=?0;?i?<50?;?i++)?{
????????????String?msg?=?"hello-"?+?i;
????????????System.out.println("WQ?send?"?+?msg);
????????????channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
????????????Thread.sleep(i*20);
????????}
????????channel.close();
????????connection.close();
????}
}
---
package?com.sowhat.mq.work;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv1?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?獲取通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列
????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【1】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【1】?done");
????????????????}
????????????}
????????};
????????boolean?autoAck?=?true;
????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
????}
}
---
package?com.sowhat.mq.work;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv2?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?獲取通道
????????Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列
????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【2】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(1000?);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【2】?done");
????????????????}
????????????}
????????};
????????boolean?autoAck?=?true;
????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
????}
}
現(xiàn)象:消費(fèi)者1 跟消費(fèi)者2 處理的數(shù)據(jù)量完全一樣的個(gè)數(shù):消費(fèi)者1:處理偶數(shù)
消費(fèi)者2:處理奇數(shù)
這種方式叫輪詢分發(fā)(round-robin)結(jié)果就是不管兩個(gè)消費(fèi)者誰忙,「數(shù)據(jù)總是你一個(gè)我一個(gè)」,MQ 給兩個(gè)消費(fèi)發(fā)數(shù)據(jù)的時(shí)候是不知道消費(fèi)者性能的,默認(rèn)就是雨露均沾。此時(shí) autoAck = true。
2. 公平分發(fā) fair dipatch
如果要實(shí)現(xiàn)公平分發(fā),要讓消費(fèi)者消費(fèi)完畢一條數(shù)據(jù)后就告知MQ,再讓MQ發(fā)數(shù)據(jù)即可。自動(dòng)應(yīng)答要關(guān)閉!
package?com.sowhat.mq.work;
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send?{
????public?static?final?String??QUEUE_NAME?=?"test_work_queue";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????//?獲取連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?獲取?channel
????????Channel?channel?=?connection.createChannel();
????????//?s聲明隊(duì)列
????????AMQP.Queue.DeclareOk?declareOk?=?channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//?每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息到消費(fèi)者,一次只發(fā)送一個(gè)消息
????????//?從而限制一次性發(fā)送給消費(fèi)者到消息不得超過1個(gè)。
????????int?perfetchCount?=?1;
????????channel.basicQos(perfetchCount);
????????for?(int?i?=?0;?i?<50?;?i++)?{
????????????String?msg?=?"hello-"?+?i;
????????????System.out.println("WQ?send?"?+?msg);
????????????channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
????????????Thread.sleep(i*20);
????????}
????????channel.close();
????????connection.close();
????}
}
---
package?com.sowhat.mq.work;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv1?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?獲取通道
????????final?Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列
????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
????????//?保證一次只分發(fā)一個(gè)
????????channel.basicQos(1);
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【1】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【1】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
????}
}
---
package?com.sowhat.mq.work;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv2?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接
????????Connection?connection?=?ConnectionUtils.getConnection();
????????//?獲取通道
????????final?Channel?channel?=?connection.createChannel();
????????//?聲明隊(duì)列
????????channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
????????//?保證一次只分發(fā)一個(gè)
????????channel.basicQos(1);
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【2】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【2】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
????}
}
結(jié)果:實(shí)現(xiàn)了公平分發(fā),消費(fèi)者2 是消費(fèi)者1消費(fèi)數(shù)量的2倍。
3. publish/subscribe 發(fā)布訂閱模式
類似公眾號(hào)的訂閱跟發(fā)布,無需指定routingKey:
解讀:
一個(gè)生產(chǎn)者多個(gè)消費(fèi)者 每一個(gè)消費(fèi)者都有一個(gè)自己的隊(duì)列 生產(chǎn)者沒有把消息直接發(fā)送到隊(duì)列而是發(fā)送到了 交換機(jī)轉(zhuǎn)化器(exchange)。每一個(gè)隊(duì)列都要綁定到交換機(jī)上。 生產(chǎn)者發(fā)送的消息經(jīng)過交換機(jī)到達(dá)隊(duì)列,從而實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)。
生產(chǎn)者:
package?com.sowhat.mq.ps;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send?{
????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_fanout";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//聲明交換機(jī)
????????channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//?分發(fā)=?fanout
????????//?發(fā)送消息
????????String?msg?=?"hello?ps?";
????????channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
????????System.out.println("Send:"?+?msg);
????????channel.close();
????????connection.close();
????}
}

消息哪兒去了?丟失了,在RabbitMQ中只有隊(duì)列有存儲(chǔ)能力,「因?yàn)檫@個(gè)時(shí)候隊(duì)列還沒有綁定到交換機(jī) 所以消息丟失了」。消費(fèi)者:
package?com.sowhat.mq.ps;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv1?{
????public?static?final?String??QUEUE_NAME?=?"test_queue_fanout_email";
????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_fanout";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????final?Channel?channel?=?connection.createChannel();
????????//?隊(duì)列聲明
????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
????????//?綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""?);
????????//?保證一次只分發(fā)一個(gè)
????????channel.basicQos(1);
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【1】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【1】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
---
package?com.sowhat.mq.ps;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv2?{
????public?static?final?String??QUEUE_NAME?=?"test_queue_fanout_sms";
????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_fanout";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????final?Channel?channel?=?connection.createChannel();
????????//?隊(duì)列聲明
????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
????????//?綁定隊(duì)列到交換機(jī)轉(zhuǎn)發(fā)器
????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""?);
????????//?保證一次只分發(fā)一個(gè)
????????channel.basicQos(1);
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【2】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【2】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
「同時(shí)還可以自己手動(dòng)的添加一個(gè)隊(duì)列監(jiān)控到該exchange」
4. routing 路由選擇 通配符模式
Exchange(交換機(jī),轉(zhuǎn)發(fā)器):「一方面接受生產(chǎn)者消息,另一方面是向隊(duì)列推送消息」。匿名轉(zhuǎn)發(fā)用 "" ?表示,比如前面到簡(jiǎn)單隊(duì)列跟WorkQueue。fanout:不處理路由鍵。「不需要指定routingKey」,我們只需要把隊(duì)列綁定到交換機(jī), 「消息就會(huì)被發(fā)送到所有到隊(duì)列中」。direct:處理路由鍵,「需要指定routingKey」,此時(shí)生產(chǎn)者發(fā)送數(shù)據(jù)到時(shí)候會(huì)指定key,任務(wù)隊(duì)列也會(huì)指定key,只有key一樣消息才會(huì)被傳送到隊(duì)列中。如下圖
package?com.sowhat.mq.routing;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send?{
????public?static?final?String??EXCHANGE_NAME?=?"test_exchange_direct";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//?exchange
????????channel.exchangeDeclare(EXCHANGE_NAME,"direct");
????????String?msg?=?"hello?info!";
????????//?可以指定類型
????????String?routingKey?=?"info";
????????channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
????????System.out.println("Send?:?"?+?msg);
????????channel.close();
????????connection.close();
????}
}
---
package?com.sowhat.mq.routing;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv1?{
????public?static?final?String??EXCHANGE_NAME?=?"test_exchange_direct";
????public?static?final?String?QUEUE_NAME?=?"test_queue_direct_1";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????final?Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
????????channel.basicQos(1);
????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【1】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【1】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
---
package?com.sowhat.mq.routing;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv2?{
????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_direct";
????public?static?final?String?QUEUE_NAME?=?"test_queue_direct_2";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????final?Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????channel.basicQos(1);
????????//?綁定種類似?Key
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"error");
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"info");
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"warning");
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【2】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【2】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
WebUI:
缺點(diǎn):路由key必須要明確,無法實(shí)現(xiàn)規(guī)則性模糊匹配。
5. Topics 主題
將路由鍵跟某個(gè)模式匹配,# 表示匹配 >=1個(gè)字符, *表示匹配一個(gè)。生產(chǎn)者會(huì)帶routingKey,但是消費(fèi)者的MQ會(huì)帶模糊routingKey。
商品:發(fā)布、刪除、修改、查詢。
package?com.sowhat.mq.topic;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send?{
????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_topic";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//?exchange
????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");
????????String?msg?=?"商品!";
????????//?可以指定類型
????????String?routingKey?=?"goods.find";
????????channel.basicPublish(EXCHANGE_NAME,?routingKey,?null,?msg.getBytes());
????????System.out.println("Send?:?"?+?msg);
????????channel.close();
????????connection.close();
????}
}
---
package?com.sowhat.mq.topic;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv1?{
????public?static?final?String??EXCHANGE_NAME?=?"test_exchange_topic";
????public?static?final?String?QUEUE_NAME?=?"test_queue_topic_1";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????final?Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,false,false,false,null);
????????channel.basicQos(1);
????????channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【1】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(2000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【1】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
---
package?com.sowhat.mq.topic;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv2?{
????public?static?final?String?EXCHANGE_NAME?=?"test_exchange_topic";
????public?static?final?String?QUEUE_NAME?=?"test_queue_topic_2";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????final?Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????channel.basicQos(1);
????????//?此乃重點(diǎn)
????????channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"goods.#");
????????//定義消費(fèi)者
????????DefaultConsumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override?//?事件觸發(fā)機(jī)制
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????String?s?=?new?String(body,?"utf-8");
????????????????System.out.println("【2】:"?+?s);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????System.out.println("【2】?done");
????????????????????//?手動(dòng)回執(zhí)
????????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????????}
????????????}
????????};
????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(QUEUE_NAME,?autoAck,?consumer);
????}
}
6. MQ的持久化跟非持久化
因?yàn)橄⒃趦?nèi)存中,如果MQ掛了那么消息也丟失了,所以應(yīng)該考慮MQ的持久化。MQ是支持持久化的,
//?聲明隊(duì)列
channel.queueDeclare(Send.QUEUE_NAME,?false,?false,?false,?null);
????/**
?????*?Declare?a?queue
?????*?@see?com.rabbitmq.client.AMQP.Queue.Declare
?????*?@see?com.rabbitmq.client.AMQP.Queue.DeclareOk
?????*?@param?queue?the?name?of?the?queue
?????*?@param?durable?true?if?we?are?declaring?a?durable?queue?(the?queue?will?survive?a?server?restart)
?????*?@param?exclusive?true?if?we?are?declaring?an?exclusive?queue?(restricted?to?this?connection)
?????*?@param?autoDelete?true?if?we?are?declaring?an?autodelete?queue?(server?will?delete?it?when?no?longer?in?use)
?????*?@param?arguments?other?properties?(construction?arguments)?for?the?queue
?????*?@return?a?declaration-confirm?method?to?indicate?the?queue?was?successfully?declared
?????*?@throws?java.io.IOException?if?an?error?is?encountered
?????*/
????Queue.DeclareOk?queueDeclare(String?queue,?boolean?durable,?boolean?exclusive,?boolean?autoDelete,
?????????????????????????????????Map?arguments) ?throws?IOException;
boolean durable就是表明是否可以持久化,如果我們將程序中的durable = false改為true是不可以的!因?yàn)槲覀円呀?jīng)定義過的test_work_queue,這個(gè)queue已聲明為未持久化的。結(jié)論:MQ 不允許修改一個(gè)已經(jīng)存在的隊(duì)列參數(shù)。
7. 消費(fèi)者端手動(dòng)跟自動(dòng)確認(rèn)消息

????????//?自動(dòng)應(yīng)答
????????boolean?autoAck?=?false;
????????channel.basicConsume(Send.QUEUE_NAME,?autoAck,?consumer);
當(dāng)MQ發(fā)送數(shù)據(jù)個(gè)消費(fèi)者后,消費(fèi)者要對(duì)收到對(duì)信息應(yīng)答給MQ。
如果autoAck = true 表示「自動(dòng)確認(rèn)模式」,一旦MQ把消息分發(fā)給消費(fèi)者就會(huì)把消息從內(nèi)存中刪除。如果消費(fèi)者收到消息但是還沒有消費(fèi)完而MQ中數(shù)據(jù)已刪除則會(huì)導(dǎo)致丟失了正在處理對(duì)消息。
如果autoAck = false表示「手動(dòng)確認(rèn)模式」,如果有個(gè)消費(fèi)者掛了,MQ因?yàn)闆]有收到回執(zhí)信息可以把該信息再發(fā)送給其他對(duì)消費(fèi)者。
MQ支持消息應(yīng)答(Message acknowledgement),消費(fèi)者發(fā)送一個(gè)消息應(yīng)答告訴MQ這個(gè)消息已經(jīng)被消費(fèi)了,MQ才從內(nèi)存中刪除。消息應(yīng)答模式「默認(rèn)為 false」。
8. RabbitMQ生產(chǎn)者端消息確認(rèn)機(jī)制(事務(wù) + confirm)
在RabbitMQ中我們可以通過持久化來解決MQ服務(wù)器異常的數(shù)據(jù)丟失問題,但是「生產(chǎn)者如何確保數(shù)據(jù)發(fā)送到MQ了」?默認(rèn)情況下生產(chǎn)者也是不知道的。如何解決 呢?
1. AMQP事務(wù)
第一種方式AMQP實(shí)現(xiàn)了事務(wù)機(jī)制,類似mysql的事務(wù)機(jī)制。txSelect:用戶將當(dāng)前channel設(shè)置為transition模式。txCommit:用于提交事務(wù)。txRollback:用于回滾事務(wù)。
以上都是對(duì)生產(chǎn)者對(duì)操作。
package?com.sowhat.mq.tx;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?TxSend?{
????public?static?final?String?QUEUE_NAME?=?"test_queue_tx";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????String?msg?=?"hello?tx?message";
????????try?{
????????????//開啟事務(wù)模式
????????????channel.txSelect();
????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
????????????int?x?=?1?/?0;
????????????
????????????//?提交事務(wù)
????????????channel.txCommit();
????????}?catch?(IOException?e)?{
????????????//?回滾
????????????channel.txRollback();
????????????System.out.println("send?message?rollback");
????????}?finally?{
????????????channel.close();
????????????connection.close();
????????}
????}
}
---
package?com.sowhat.mq.tx;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?TxRecv?{
????public?static?final?String?QUEUE_NAME?=?"test_queue_tx";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????String?s?=?channel.basicConsume(QUEUE_NAME,?true,?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("recv[tx]?msg:"?+?new?String(body,?"utf-8"));
????????????}
????????});
????????channel.close();
????????connection.close();
????}
}
缺點(diǎn)就是大量對(duì)請(qǐng)求嘗試然后失敗然后回滾,會(huì)降低MQ的吞吐量。
2. Confirm模式。
「生產(chǎn)者端confirm實(shí)現(xiàn)原理」生產(chǎn)者將信道設(shè)置為confirm模式,一旦信道進(jìn)入了confirm模式,所以該信道上發(fā)布的信息都會(huì)被派一個(gè)唯一的ID(從1開始),一旦消息被投遞到所有的匹配隊(duì)列后,Broker就回發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息跟隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在消息寫入到磁盤后才發(fā)出。broker回傳給生產(chǎn)者到確認(rèn)消息中deliver-tag域包含了確認(rèn)消息到序列號(hào),此外broker也可以設(shè)置basic.ack的multiple域,表示這個(gè)序列號(hào)之前所以信息都已經(jīng)得到處理。
Confirm模式最大的好處在于是異步的。第一條消息發(fā)送后不用一直等待回復(fù)后才發(fā)第二條消息。
開啟confirm模式:channel.confimSelect()編程模式:
1. 普通的發(fā)送一個(gè)消息后就 waitForConfirms()
package?com.sowhat.confirm;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send1?{
????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm1";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//?將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
????????channel.confirmSelect();
????????String?msg?=?"hello?confirm?message";
????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
????????if?(!channel.waitForConfirms())?{
????????????System.out.println("消息發(fā)送失敗");
????????}?else?{
????????????System.out.println("消息發(fā)送OK");
????????}
????????channel.close();
????????connection.close();
????}
}
---
package?com.sowhat.confirm;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Recv?{
????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm1";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????String?s?=?channel.basicConsume(QUEUE_NAME,?true,?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("recv[tx]?msg:"?+?new?String(body,?"utf-8"));
????????????}
????????});
????}
}
2. 批量的發(fā)一批數(shù)據(jù) waitForConfirms()
package?com.sowhat.confirm;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.concurrent.TimeoutException;
public?class?Send2?{
????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm1";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//?將channel模式設(shè)置為 confirm模式,注意設(shè)置這個(gè)不能設(shè)置為事務(wù)模式。
????????channel.confirmSelect();
????????String?msg?=?"hello?confirm?message";
????????//?批量發(fā)送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
????????}
????????//?確認(rèn)
????????if?(!channel.waitForConfirms())?{
????????????System.out.println("消息發(fā)送失敗");
????????}?else?{
????????????System.out.println("消息發(fā)送OK");
????????}
????????channel.close();
????????connection.close();
????}
}
---
接受信息跟上面一樣
3. 異步confirm模式,提供一個(gè)回調(diào)方法。
Channel對(duì)象提供的ConfirmListener()回調(diào)方法只包含deliveryTag(包含當(dāng)前發(fā)出消息序號(hào)),我們需要自己為每一個(gè)Channel維護(hù)一個(gè)unconfirm的消息序號(hào)集合,每publish一條數(shù)據(jù),集合中元素加1,每回調(diào)一次handleAck方法,unconfirm集合刪掉響應(yīng)的一條(multiple=false)或多條(multiple=true)記錄,從運(yùn)行效率來看,unconfirm集合最好采用有序集合SortedSet存儲(chǔ)結(jié)構(gòu)。

package?com.sowhat.mq.confirm;
import?com.rabbitmq.client.*;
import?com.sowhat.mq.util.ConnectionUtils;
import?java.io.IOException;
import?java.util.Collections;
import?java.util.SortedSet;
import?java.util.TreeSet;
import?java.util.concurrent.TimeoutException;
public?class?Send3?{
????public?static?final?String?QUEUE_NAME?=?"test_queue_confirm3";
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException,?InterruptedException?{
????????Connection?connection?=?ConnectionUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);
????????//生產(chǎn)者調(diào)用confirmSelect
????????channel.confirmSelect();
????????//?存放未確認(rèn)消息
????????final?SortedSet?confirmSet?=?Collections.synchronizedSortedSet(new?TreeSet());
???????//?添加監(jiān)聽通道
????????channel.addConfirmListener(new?ConfirmListener()?{
????????????//?回執(zhí)有問題的
????????????public?void?handleAck(long?deliveryTag,?boolean?multiple)?throws?IOException?{
????????????????if?(multiple)?{
????????????????????System.out.println("--handleNack---multiple");
????????????????????confirmSet.headSet(deliveryTag?+?1).clear();
????????????????}?else?{
????????????????????System.out.println("--handleNack--?multiple?false");
????????????????????confirmSet.remove(deliveryTag);
????????????????}
????????????}
????????????//?沒有問題的handleAck
????????????public?void?handleNack(long?deliveryTag,?boolean?multiple)?throws?IOException?{
????????????????if?(multiple)?{
????????????????????System.out.println("--handleAck---multiple");
????????????????????confirmSet.headSet(deliveryTag?+?1).clear();
????????????????}?else?{
????????????????????System.out.println("--handleAck--multiple?false");
????????????????????confirmSet.remove(deliveryTag);
????????????????}
????????????}
????????});
????????//?一般情況下是先開啟?消費(fèi)者,指定好?exchange跟routingkey,如果生產(chǎn)者等routingkey?就會(huì)觸發(fā)這個(gè)return?方法
????????channel.addReturnListener(new?ReturnListener()?{
????????????public?void?handleReturn(int?replyCode,?String?replyText,?String?exchange,?String?routingKey,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("----?handle?return----");
????????????????System.out.println("replyCode:"?+?replyCode?);
????????????????System.out.println("replyText:"?+replyText?);
????????????????System.out.println("exchange:"?+?exchange);
????????????????System.out.println("routingKey:"?+?routingKey);
????????????????System.out.println("properties:"?+?properties);
????????????????System.out.println("body:"?+?new?String(body));
????????????}
????????});
????????String?msgStr?=?"sssss";
????????while(true){
????????????long?nextPublishSeqNo?=?channel.getNextPublishSeqNo();
????????????channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
????????????confirmSet.add(nextPublishSeqNo);
????????????Thread.sleep(1000);
????????}
????}
}
總結(jié):AMQP模式相對(duì)來說沒Confirm模式性能好些,推薦使用后者。
9. RabbitMQ延遲隊(duì)列 跟死信
淘寶訂單付款,驗(yàn)證碼等限時(shí)類型服務(wù)。
????????Map?headers?=??new?HashMap();
????????headers.put("my1","111");
????????headers.put("my2","222");
????????AMQP.BasicProperties?build?=?new?AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();
死信的處理:
10. SpringBoot Tpoic Demo
需求圖:
新建SpringBoot 項(xiàng)目添加如下依賴:
???????<dependency>
????????????<groupId>org.springframework.bootgroupId>
????????????<artifactId>spring-boot-starter-amqpartifactId>
????????dependency>
1. 生產(chǎn)者
application.yml
spring:
??rabbitmq:
????host:?127.0.0.1
????username:?admin
????password:?admin
測(cè)試用例:
package?com.sowhat.mqpublisher;
import?org.junit.jupiter.api.Test;
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class?MqpublisherApplicationTests?{
????@Autowired
????private?AmqpTemplate?amqpTemplate;
????@Test
????void?userInfo()?{
????????/**
?????????*?exchange,routingKey,message
?????????*/
????????this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");
????}
}
2. 消費(fèi)者
application.xml
spring:
??rabbitmq:
????host:?127.0.0.1
????username:?admin
????password:?admin
#?自定義配置
mq:
??config:
????exchange_name:?log.topic
????#?配置隊(duì)列名稱
????queue_name:
????????info:?log.info
????????error:?log.error
????????logs:?log.logs
三個(gè)不同的消費(fèi)者:
package?com.sowhat.mqconsumer.service;
import?org.springframework.amqp.core.ExchangeTypes;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Service;
/**
?*?@QueueBinding?value屬性:用于綁定一個(gè)隊(duì)列。@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒有則創(chuàng)建,如果有則返回
?* type = ExchangeTypes.TOPIC 指定交換器類型。默認(rèn)的direct交換器
?*/
@Service
public?class?ErrorReceiverService?{
????/**
??????*?把一個(gè)方法跟一個(gè)隊(duì)列進(jìn)行綁定,收到消息后綁定給msg
?????*/
????@RabbitListener(bindings?=?@QueueBinding(
????????????value?=?@Queue(value?=?"${mq.config.queue_name.error}"),
????????????exchange?=?@Exchange(value?=?"${mq.config.exchange_name}",?type?=?ExchangeTypes.TOPIC),
????????????key?=?"*.log.error"
????????)
????)
????public?void?process(String?msg)?{
????????System.out.println(msg?+?"?Logs...........");
????}
}
---
package?com.sowhat.mqconsumer.service;
import?org.springframework.amqp.core.ExchangeTypes;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Service;
/**
?*?@QueueBinding?value屬性:用于綁定一個(gè)隊(duì)列。
?*?@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒有則創(chuàng)建,如果有則返回
?*/
@Service
public?class?InfoReceiverService?{
????/**
?????*?添加一個(gè)能夠處理消息的方法
?????*/
????@RabbitListener(bindings?=?@QueueBinding(
????????????value?=?@Queue(value?="${mq.config.queue_name.info}"),
????????????exchange?=?@Exchange(value?=?"${mq.config.exchange_name}",type?=?ExchangeTypes.TOPIC),
????????????key?=?"*.log.info"
????))
????public?void?process(String?msg){
????????System.out.println(msg+"?Info...........");
????}
}
--
package?com.sowhat.mqconsumer.service;
import?org.springframework.amqp.core.ExchangeTypes;
import?org.springframework.amqp.rabbit.annotation.Exchange;
import?org.springframework.amqp.rabbit.annotation.Queue;
import?org.springframework.amqp.rabbit.annotation.QueueBinding;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Service;
/**
?*?@QueueBinding?value屬性:用于綁定一個(gè)隊(duì)列。
?*?@Queue去查找一個(gè)名字為value屬性中的值得隊(duì)列,如果沒有則創(chuàng)建,如果有則返回
?*/
@Service
public?class?LogsReceiverService?{
????/**
?????*?添加一個(gè)能夠處理消息的方法
?????*/
????@RabbitListener(bindings?=?@QueueBinding(
????????????value?=?@Queue(value?="${mq.config.queue_name.logs}"),
????????????exchange?=?@Exchange(value?=?"${mq.config.exchange_name}",type?=?ExchangeTypes.TOPIC),
????????????key?=?"*.log.*"
????))
????public?void?process(String?msg){
????????System.out.println(msg+"?Error...........");
????}
}
詳細(xì)安裝跟代碼看參考下載:
總結(jié)
如果需要指定模式一般是在消費(fèi)者端設(shè)置,靈活性調(diào)節(jié)。
| 模式 | 生產(chǎn)者Queue | 生產(chǎn)者exchange | 生產(chǎn)者routingKey | 消費(fèi)者exchange | 消費(fèi)者queue | routingKey |
|---|---|---|---|---|---|---|
| Simple(簡(jiǎn)單模式少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
| WorkQueue(多個(gè)消費(fèi)者少用) | 指定 | 不指定 | 不指定 | 不指定 | 指定 | 不指定 |
| fanout(publish/subscribe模式) | 不指定 | 指定 | 不指定 | 指定 | 指定 | 不指定 |
| direct(路由模式) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消費(fèi)者routingKey精確指定多個(gè) |
| topic(主題模糊匹配) | 不指定 | 指定 | 指定 | 指定 | 指定 | 消費(fèi)者routingKey可以進(jìn)行模糊匹配 |
長(zhǎng)按關(guān)注,學(xué)習(xí)Java
