一個(gè)最簡單的消息隊(duì)列,帶你理解 RabbitMQ!

Java技術(shù)棧
www.javastack.cn
關(guān)注閱讀更多優(yōu)質(zhì)文章
作者:海向
來源:www.cnblogs.com/haixiang/p/10826710.html
RabbitMQ 簡述
簡單隊(duì)列
簡單隊(duì)列是最簡單的一種模式,由生產(chǎn)者、隊(duì)列、消費(fèi)者組成。生產(chǎn)者將消息發(fā)送給隊(duì)列,消費(fèi)者從隊(duì)列中讀取消息完成消費(fèi)。關(guān)注公眾號(hào)Java技術(shù)?;貜?fù):面試,可以獲取我整理的 RabbitMQ 系列面試題,帶全部答案。
在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。中間的框是隊(duì)列 - RabbitMQ代表消費(fèi)者的消息緩沖區(qū)。

Java 方式
生產(chǎn)者
package?com.anqi.mq.nat;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
public?class?MyProducer?{
????private?static?final?String?QUEUE_NAME?=?"ITEM_QUEUE";
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????//2.?通過連接工廠來創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過?Connection?來創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//實(shí)際場(chǎng)景中,消息多為json格式的對(duì)象
????????String?msg?=?"hello";
????????//4.?發(fā)送三條數(shù)據(jù)
????????for?(int?i?=?1;?i?<=?3?;?i++)?{
????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
????????????System.out.println("Send?message"?+?i?+"?:?"?+?msg);
????????}
????????//5.?關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
/**
?*?Declare?a?queue
?*?@param?queue?the?name?of?the?queue
?*?@param?durable?true?if?we?are?declaring?a?durable?queue?(the?queue?will?survive?a?server?restart)
?*?@param?exclusive?true?if?we?are?declaring?an?exclusive?queue?(restricted?to?this?connection)
?*?@param?autoDelete?true?if?we?are?declaring?an?autodelete?queue?(server?will?delete?it?when?no?longer?in?use)
?*?@param?arguments?other?properties?(construction?arguments)?for?the?queue
?*?@return?a?declaration-confirm?method?to?indicate?the?queue?was?successfully?declared
?*?@throws?java.io.IOException?if?an?error?is?encountered
?*/
Queue.DeclareOk?queueDeclare(String?queue,?boolean?durable,?boolean?exclusive,?boolean?autoDelete,Map?arguments) ?throws?IOException;
/**
?*?Publish?a?message
?*?@see?com.rabbitmq.client.AMQP.Basic.Publish
?*?@param?exchange?the?exchange?to?publish?the?message?to
?*?@param?routingKey?the?routing?key
?*?@param?props?other?properties?for?the?message?-?routing?headers?etc
?*?@param?body?the?message?body
?*?@throws?java.io.IOException?if?an?error?is?encountered
?*/
void?basicPublish(String?exchange,?String?routingKey,?BasicProperties?props,?byte[]?body)?throws?IOException;
/**
?*?Start?a?non-nolocal,?non-exclusive?consumer,?with
?*?a?server-generated?consumerTag.
?*?@param?queue?the?name?of?the?queue
?*?@param?autoAck?true?if?the?server?should?consider?messages
?*?acknowledged?once?delivered;?false?if?the?server?should?expect
?*?explicit?acknowledgements
?*?@param?callback?an?interface?to?the?consumer?object
?*?@return?the?consumerTag?generated?by?the?server
?*?@throws?java.io.IOException?if?an?error?is?encountered
?*?@see?com.rabbitmq.client.AMQP.Basic.Consume
?*?@see?com.rabbitmq.client.AMQP.Basic.ConsumeOk
?*?@see?#basicConsume(String,?boolean,?String,?boolean,?boolean,?Map,?Consumer)
?*/
String?basicConsume(String?queue,?boolean?autoAck,?Consumer?callback)?throws?IOException;
消費(fèi)者
package?com.anqi.mq.nat;
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?MyConsumer?{
????private?static?final?String?QUEUE_NAME?=?"ITEM_QUEUE";
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????//2.?通過連接工廠來創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過?Connection?來創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明一個(gè)隊(duì)列
????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);
????????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");
????????/*
???????????true:表示自動(dòng)確認(rèn),只要消息從隊(duì)列中獲取,無論消費(fèi)者獲取到消息后是否成功消費(fèi),都會(huì)認(rèn)為消息已經(jīng)成功消費(fèi)
???????????false:表示手動(dòng)確認(rèn),消費(fèi)者獲取消息后,服務(wù)器會(huì)將該消息標(biāo)記為不可用狀態(tài),等待消費(fèi)者的反饋,如果消費(fèi)者一
???????????直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務(wù)器會(huì)認(rèn)為該消費(fèi)者已經(jīng)掛掉,不會(huì)再給其發(fā)送消息,
???????????直到該消費(fèi)者反饋。
????????*/
????????//5.?創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(QUEUE_NAME,?true,?consumer);
????}
}
Send?message1?:?hello
Send?message2?:?hello
Send?message3?:?hello
?[*]?Waiting?for?messages.?To?exit?press?CTRL+C
?[x]?Received?'hello'
?[x]?Received?'hello'
?[x]?Received?'hello'
當(dāng)我們啟動(dòng)生產(chǎn)者之后查看RabbitMQ管理后臺(tái)可以看到有一條消息正在等待被消費(fèi)。

當(dāng)我們啟動(dòng)消費(fèi)者之后再次查看,可以看到積壓的一條消息已經(jīng)被消費(fèi)。

總結(jié)
隊(duì)列聲明queueDeclare的參數(shù):第一個(gè)參數(shù)表示隊(duì)列名稱、第二個(gè)參數(shù)為是否持久化(true表示是,隊(duì)列將在服務(wù)器重啟時(shí)生存)、第三個(gè)參數(shù)為是否是獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除)、第四個(gè)參數(shù)為當(dāng)所有消費(fèi)者客戶端連接斷開時(shí)是否自動(dòng)刪除隊(duì)列、第五個(gè)參數(shù)為隊(duì)列的其他參數(shù)。
basicConsume的第二個(gè)參數(shù)autoAck: 應(yīng)答模式,true:自動(dòng)應(yīng)答,即消費(fèi)者獲取到消息,該消息就會(huì)從隊(duì)列中刪除掉,false:手動(dòng)應(yīng)答,當(dāng)從隊(duì)列中取出消息后,需要程序員手動(dòng)調(diào)用方法應(yīng)答,如果沒有應(yīng)答,該消息還會(huì)再放進(jìn)隊(duì)列中,就會(huì)出現(xiàn)該消息一直沒有被消費(fèi)掉的現(xiàn)象。
這種簡單隊(duì)列的模式,系統(tǒng)會(huì)為每個(gè)隊(duì)列隱式地綁定一個(gè)默認(rèn)交換機(jī),交換機(jī)名稱為" (AMQP default)",類型為直連 direct,當(dāng)你手動(dòng)創(chuàng)建一個(gè)隊(duì)列時(shí),系統(tǒng)會(huì)自動(dòng)將這個(gè)隊(duì)列綁定到一個(gè)名稱為空的 Direct 類型的交換機(jī)上,綁定的路由鍵 routing key 與隊(duì)列名稱相同,相當(dāng)于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實(shí)例沒有顯式聲明交換機(jī),但是當(dāng)路由鍵和隊(duì)列名稱一樣時(shí),就會(huì)將消息發(fā)送到這個(gè)默認(rèn)的交換機(jī)中。這種方式比較簡單,但是無法滿足復(fù)雜的業(yè)務(wù)需求,所以通常在生產(chǎn)環(huán)境中很少使用這種方式。
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認(rèn)交換機(jī)隱式綁定到每個(gè)隊(duì)列,其中路由鍵等于隊(duì)列名稱。不可能顯式綁定到,或從缺省交換中解除綁定。它也不能被刪除。
——引自 RabbitMQ 官方文檔
spring-amqp方式
引入 Maven 依賴
<dependency>
????<groupId>com.rabbitmqgroupId>
????<artifactId>amqp-clientartifactId>
????<version>5.6.0version>
dependency>????????
<dependency>
????<groupId>org.springframework.amqpgroupId>
????<artifactId>spring-rabbitartifactId>
????<version>2.1.5.RELEASEversion>
dependency>
Spring Boot 學(xué)習(xí)教程:
https://github.com/javastacks/spring-boot-best-practice
Spring 配置文件
<beans?xmlns="http://www.springframework.org/schema/beans"
???????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
???????xmlns:rabbit="http://www.springframework.org/schema/rabbit"
???????xsi:schemaLocation="http://www.springframework.org/schema/rabbit
???????????https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
???????????http://www.springframework.org/schema/beans
???????????https://www.springframework.org/schema/beans/spring-beans.xsd">
????<rabbit:connection-factory?id="connectionFactory"?host="localhost"?virtual-host="/"
????username="guest"?password="guest"/>
????<rabbit:template?id="amqpTemplate"?connection-factory="connectionFactory"/>
????<rabbit:admin?connection-factory="connectionFactory"/>
????<rabbit:queue?name="MY-QUEUE"/>
beans>
使用測(cè)試
import?org.springframework.amqp.core.AmqpTemplate;
import?org.springframework.context.ApplicationContext;
import?org.springframework.context.support.ClassPathXmlApplicationContext;
public?class?Main?{
????public?static?void?main(String[]?args)?{
????????ApplicationContext?app?=?new?ClassPathXmlApplicationContext("spring/rabbit-context.xml");
????????AmqpTemplate?amqpTemplate?=?app.getBean(AmqpTemplate.class);
????????amqpTemplate.convertAndSend("MY-QUEUE",?"Item");
????????String?msg?=?(String)?amqpTemplate.receiveAndConvert("MY-QUEUE");
????????System.out.println(msg);
????}
}
參考方法
/**
?*?Convert?a?Java?object?to?an?Amqp?{@link?Message}?and?send?it?to?a?specific?exchange
?*?with?a?specific?routing?key.
?*
?*?@param?exchange?the?name?of?the?exchange
?*?@param?routingKey?the?routing?key
?*?@param?message?a?message?to?send
?*?@throws?AmqpException?if?there?is?a?problem
?*/
void?convertAndSend(String?exchange,?String?routingKey,?Object?message)?throws?AmqpException;
/**
??*?Receive?a?message?if?there?is?one?from?a?specific?queue?and?convert?it?to?a?Java
??*?object.?Returns?immediately,?possibly?with?a?null?value.
??*
??*?@param?queueName?the?name?of?the?queue?to?poll
??*?@return?a?message?or?null?if?there?is?none?waiting
??*?@throws?AmqpException?if?there?is?a?problem
??*/
@Nullable
Object?receiveAndConvert(String?queueName)?throws?AmqpException;






關(guān)注Java技術(shù)??锤喔韶?/strong>


