RabbitMQ 的核心概念,看了必懂!

Java技術(shù)棧
www.javastack.cn
關(guān)注閱讀更多優(yōu)質(zhì)文章
出處:cnblogs.com/haixiang/p/10853467.html
RabbitMQ 特點(diǎn)
RabbitMQ 相較于其他消息隊(duì)列,有一系列防止消息丟失的措施,擁有強(qiáng)悍的高可用性能,它的吞吐量可能沒有其他消息隊(duì)列大,但是其消息的保障性出類拔萃,被廣泛用于金融類業(yè)務(wù)。
AMQP 協(xié)議
AMQP: Advanced Message Queuing Protocol 高級(jí)消息隊(duì)列協(xié)議
AMQP定義:是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
Erlang語言最初在于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang的優(yōu)點(diǎn): Erlang有著和原生Socket一樣的延遲。
RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù), RabbitMQ是使用Erlang語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。關(guān)注公眾號(hào)Java技術(shù)棧獲取系列RabbitMQ教程。
RabbitMQ 消息傳遞機(jī)制
生產(chǎn)者發(fā)送消息到指定的 Exchange,Exchange 依據(jù)自身的類型(direct、topic等),根據(jù) routing key 將消息發(fā)送給 0 - n 個(gè) 隊(duì)列,隊(duì)列再將消息轉(zhuǎn)發(fā)給了消費(fèi)者。
Server: 又稱Broker, 接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù),這里指RabbitMQ 服務(wù)器
Connection: 連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接。
Channel: 網(wǎng)絡(luò)信道,幾乎所有的操作都在 Channel 中進(jìn)行,Channel是進(jìn)行消息讀寫的通道??蛻舳丝山⒍鄠€(gè)Channel:,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)。
Virtual host: 虛似地址,用于迸行邏輯隔離,是最上層的消息路由。一個(gè) Virtual Host 里面可以有若干個(gè) Exchange和 Queue ,同一個(gè) VirtualHost 里面不能有相同名稱的 Exchange 或 Queue。權(quán)限控制的最小粒度是Virtual Host。
Binding: Exchange 和 Queue 之間的虛擬連接,binding 中可以包含 routing key。
Routing key: 一 個(gè)路由規(guī)則,虛擬機(jī)可用它來確定如何路由一個(gè)特定消息,即交換機(jī)綁定到 Queue 的鍵。
Queue: 也稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。
Message
消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由 Properties 和 Body 組成。Properties 可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;,Body 則就 是消息體內(nèi)容。
properties 中我們可以設(shè)置消息過期時(shí)間以及是否持久化等,也可以傳入自定義的map屬性,這些在消費(fèi)端也都可以獲取到。
生產(chǎn)者
import?com.rabbitmq.client.AMQP;
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
import?java.util.HashMap;
import?java.util.Map;
public?class?MessageProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????//2.?通過連接工廠來創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過?Connection?來創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明?使用默認(rèn)交換機(jī)?以隊(duì)列名作為?routing?key
????????String?queueName?=?"msg_queue";
????????/**
?????????*?deliverMode?設(shè)置為?2?的時(shí)候代表持久化消息
?????????*?expiration?意思是設(shè)置消息的有效期,超過10秒沒有被消費(fèi)者接收后會(huì)被自動(dòng)刪除
?????????*?headers?自定義的一些屬性
?????????*?*/
????????//5.?發(fā)送
????????Map?headers?=?new?HashMap();
????????headers.put("myhead1",?"111");
????????headers.put("myhead2",?"222");
????????AMQP.BasicProperties?properties?=?new?AMQP.BasicProperties().builder()
????????????????.deliveryMode(2)
????????????????.contentEncoding("UTF-8")
????????????????.expiration("100000")
????????????????.headers(headers)
????????????????.build();
????????String?msg?=?"test?message";
????????channel.basicPublish("",?queueName,?properties,?msg.getBytes());
????????System.out.println("Send?message?:?"?+?msg);
????????//6.?關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
消費(fèi)者
import?com.rabbitmq.client.*;
import?java.io.IOException;
import?java.util.Map;
public?class?MessageConsumer?{
????public?static?void?main(String[]?args)?throws?Exception{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
????????//2.?通過連接工廠來創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過?Connection?來創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?queueName?=?"msg_queue";
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//5.?創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????Map?headers?=?properties.getHeaders();
????????????????System.out.println("head:?"?+?headers.get("myhead1"));
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????????System.out.println("expiration?:?"+?properties.getExpiration());
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?true,?consumer);
????}
}
Send?message?:?test?message
head:?111
?[x]?Received?'test?message'
100000
Exchange
1. 簡(jiǎn)介
Exchange 就是交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列。有很多的 Message 進(jìn)入到 Exchange 中,Exchange 根據(jù) Routing key 將 Message 分發(fā)到不同的 Queue 中。
2. 類型
RabbitMQ 中的 Exchange 有多種類型,類型不同,Message 的分發(fā)機(jī)制不同,如下:
fanout:廣播模式。這種類型的 Exchange 會(huì)將 Message 分發(fā)到綁定到該 Exchange 的所有 Queue。
direct:這種類型的 Exchange 會(huì)根據(jù) Routing key(精確匹配,將Message分發(fā)到指定的Queue。
Topic:這種類型的 Exchange 會(huì)根據(jù) Routing key(模糊匹配,將Message分發(fā)到指定的Queue。
headers: 主題交換機(jī)有點(diǎn)相似,但是不同于主題交換機(jī)的路由是基于路由鍵,頭交換機(jī)的路由值基于消息的header數(shù)據(jù)。主題交換機(jī)路由鍵只有是字符串,而頭交換機(jī)可以是整型和哈希值 .
3. 屬性
/**
*?Declare?an?exchange,?via?an?interface?that?allows?the?complete?set?of
*?arguments.
*?@see?com.rabbitmq.client.AMQP.Exchange.Declare
*?@see?com.rabbitmq.client.AMQP.Exchange.DeclareOk
*?@param?exchange?the?name?of?the?exchange
*?@param?type?the?exchange?type
*?@param?durable?true?if?we?are?declaring?a?durable?exchange?(the?exchange?will?survive?a?server?restart)
*?@param?autoDelete?true?if?the?server?should?delete?the?exchange?when?it?is?no?longer?in?use
*?@param?internal?true?if?the?exchange?is?internal,?i.e.?can't?be?directly
*?published?to?by?a?client.
*?@param?arguments?other?properties?(construction?arguments)?for?the?exchange
*?@return?a?declaration-confirm?method?to?indicate?the?exchange?was?successfully?declared
*?@throws?java.io.IOException?if?an?error?is?encountered
*/
Exchange.DeclareOk?exchangeDeclare(String?exchange,
?????????????????????????????????String?type,boolean?durable,
?????????????????????????????????boolean?autoDelete,boolean?internal,
?????????????????????????????????Map?arguments)?throws?IOException;
Name: 交換機(jī)名稱
Type: 交換機(jī)類型direct、topic、 fanout、 headers
Durability: 是否需要持久化,true為持久化
Auto Delete: 當(dāng)最后一個(gè)綁定到Exchange. 上的隊(duì)列刪除后,自動(dòng)刪除該Exchange
Internal: 當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False
Arguments: 擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制定化使用
點(diǎn)擊「閱讀原文」獲取面試題大全~

