<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ 的核心概念,看了必懂!

          共 6582字,需瀏覽 14分鐘

           ·

          2020-09-10 17:31

          Java技術(shù)棧

          www.javastack.cn

          關(guān)注閱讀更多優(yōu)質(zhì)文章



          作者:海向
          出處:cnblogs.com/haixiang/p/10853467.html

          RabbitMQ 特點(diǎn)

          RabbitMQ 相較于其他消息隊(duì)列,有一系列防止消息丟失的措施,擁有強(qiáng)悍的高可用性能,它的吞吐量可能沒有其他消息隊(duì)列大,但是其消息的保障性出類拔萃,被廣泛用于金融類業(yè)務(wù)。

          AMQP 協(xié)議

          AMQP: Advanced Message Queuing Protocol 高級(jí)消息隊(duì)列協(xié)議

          AMQP定義:是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。

          Erlang語言最初在于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang的優(yōu)點(diǎn): Erlang有著和原生Socket一樣的延遲。

          RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù), RabbitMQ是使用Erlang語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。關(guān)注公眾號(hào)Java技術(shù)棧獲取系列RabbitMQ教程。

          RabbitMQ 消息傳遞機(jī)制

          生產(chǎn)者發(fā)送消息到指定的 Exchange,Exchange 依據(jù)自身的類型(direct、topic等),根據(jù) routing key 將消息發(fā)送給 0 - n 個(gè) 隊(duì)列,隊(duì)列再將消息轉(zhuǎn)發(fā)給了消費(fèi)者。

          Server: 又稱Broker, 接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù),這里指RabbitMQ 服務(wù)器

          Connection: 連接,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接。

          Channel: 網(wǎng)絡(luò)信道,幾乎所有的操作都在 Channel 中進(jìn)行,Channel是進(jìn)行消息讀寫的通道??蛻舳丝山⒍鄠€(gè)Channel:,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)。

          Virtual host: 虛似地址,用于迸行邏輯隔離,是最上層的消息路由。一個(gè) Virtual Host 里面可以有若干個(gè) Exchange和 Queue ,同一個(gè) VirtualHost 里面不能有相同名稱的 Exchange 或 Queue。權(quán)限控制的最小粒度是Virtual Host。

          Binding: Exchange 和 Queue 之間的虛擬連接,binding 中可以包含 routing key。

          Routing key: 一 個(gè)路由規(guī)則,虛擬機(jī)可用它來確定如何路由一個(gè)特定消息,即交換機(jī)綁定到 Queue 的鍵。

          Queue: 也稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。

          Message

          消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由 Properties 和 Body 組成。Properties 可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;,Body 則就 是消息體內(nèi)容。

          properties 中我們可以設(shè)置消息過期時(shí)間以及是否持久化等,也可以傳入自定義的map屬性,這些在消費(fèi)端也都可以獲取到。

          生產(chǎn)者

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?java.util.HashMap;
          import?java.util.Map;

          public?class?MessageProducer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");

          ????????//2.?通過連接工廠來創(chuàng)建連接
          ????????Connection?connection?=?factory.newConnection();

          ????????//3.?通過?Connection?來創(chuàng)建?Channel
          ????????Channel?channel?=?connection.createChannel();

          ????????//4.?聲明?使用默認(rèn)交換機(jī)?以隊(duì)列名作為?routing?key
          ????????String?queueName?=?"msg_queue";

          ????????/**
          ?????????*?deliverMode?設(shè)置為?2?的時(shí)候代表持久化消息
          ?????????*?expiration?意思是設(shè)置消息的有效期,超過10秒沒有被消費(fèi)者接收后會(huì)被自動(dòng)刪除
          ?????????*?headers?自定義的一些屬性
          ?????????*?*/
          ????????//5.?發(fā)送
          ????????Map?headers?=?new?HashMap();
          ????????headers.put("myhead1",?"111");
          ????????headers.put("myhead2",?"222");

          ????????AMQP.BasicProperties?properties?=?new?AMQP.BasicProperties().builder()
          ????????????????.deliveryMode(2)
          ????????????????.contentEncoding("UTF-8")
          ????????????????.expiration("100000")
          ????????????????.headers(headers)
          ????????????????.build();
          ????????String?msg?=?"test?message";
          ????????channel.basicPublish("",?queueName,?properties,?msg.getBytes());
          ????????System.out.println("Send?message?:?"?+?msg);

          ????????//6.?關(guān)閉連接
          ????????channel.close();
          ????????connection.close();

          ????}
          }

          消費(fèi)者

          import?com.rabbitmq.client.*;
          import?java.io.IOException;
          import?java.util.Map;

          public?class?MessageConsumer?{
          ????public?static?void?main(String[]?args)?throws?Exception{
          ????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");
          ????????factory.setAutomaticRecoveryEnabled(true);
          ????????factory.setNetworkRecoveryInterval(3000);

          ????????//2.?通過連接工廠來創(chuàng)建連接
          ????????Connection?connection?=?factory.newConnection();

          ????????//3.?通過?Connection?來創(chuàng)建?Channel
          ????????Channel?channel?=?connection.createChannel();

          ????????//4.?聲明
          ????????String?queueName?=?"msg_queue";
          ????????channel.queueDeclare(queueName,?false,?false,?false,?null);

          ????????//5.?創(chuàng)建消費(fèi)者并接收消息
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
          ???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
          ????????????????????throws?IOException?{
          ????????????????String?message?=?new?String(body,?"UTF-8");
          ????????????????Map?headers?=?properties.getHeaders();
          ????????????????System.out.println("head:?"?+?headers.get("myhead1"));
          ????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
          ????????????????System.out.println("expiration?:?"+?properties.getExpiration());
          ????????????}
          ????????};

          ????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
          ????????channel.basicConsume(queueName,?true,?consumer);
          ????}
          }
          Send?message?:?test?message

          head:?111
          ?[x]?Received?'test?message'
          100000

          Exchange

          1. 簡(jiǎn)介

          Exchange 就是交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列。有很多的 Message 進(jìn)入到 Exchange 中,Exchange 根據(jù) Routing key 將 Message 分發(fā)到不同的 Queue 中。

          2. 類型

          RabbitMQ 中的 Exchange 有多種類型,類型不同,Message 的分發(fā)機(jī)制不同,如下:

          • fanout:廣播模式。這種類型的 Exchange 會(huì)將 Message 分發(fā)到綁定到該 Exchange 的所有 Queue。

          • direct:這種類型的 Exchange 會(huì)根據(jù) Routing key(精確匹配,將Message分發(fā)到指定的Queue。

          • Topic:這種類型的 Exchange 會(huì)根據(jù) Routing key(模糊匹配,將Message分發(fā)到指定的Queue。

          • headers: 主題交換機(jī)有點(diǎn)相似,但是不同于主題交換機(jī)的路由是基于路由鍵,頭交換機(jī)的路由值基于消息的header數(shù)據(jù)。主題交換機(jī)路由鍵只有是字符串,而頭交換機(jī)可以是整型和哈希值 .

          3. 屬性

          /**
          *?Declare?an?exchange,?via?an?interface?that?allows?the?complete?set?of
          *?arguments.
          *?@see?com.rabbitmq.client.AMQP.Exchange.Declare
          *?@see?com.rabbitmq.client.AMQP.Exchange.DeclareOk
          *?@param?exchange?the?name?of?the?exchange
          *?@param?type?the?exchange?type
          *?@param?durable?true?if?we?are?declaring?a?durable?exchange?(the?exchange?will?survive?a?server?restart)
          *?@param?autoDelete?true?if?the?server?should?delete?the?exchange?when?it?is?no?longer?in?use
          *?@param?internal?true?if?the?exchange?is?internal,?i.e.?can't?be?directly
          *?published?to?by?a?client.
          *?@param?arguments?other?properties?(construction?arguments)?for?the?exchange
          *?@return?a?declaration-confirm?method?to?indicate?the?exchange?was?successfully?declared
          *?@throws?java.io.IOException?if?an?error?is?encountered
          */
          Exchange.DeclareOk?exchangeDeclare(String?exchange,
          ?????????????????????????????????String?type,boolean?durable,
          ?????????????????????????????????boolean?autoDelete,boolean?internal,
          ?????????????????????????????????Map?arguments)?throws?IOException;
          • Name: 交換機(jī)名稱

          • Type: 交換機(jī)類型direct、topic、 fanout、 headers

          • Durability: 是否需要持久化,true為持久化

          • Auto Delete: 當(dāng)最后一個(gè)綁定到Exchange. 上的隊(duì)列刪除后,自動(dòng)刪除該Exchange

          • Internal: 當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False

          • Arguments: 擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制定化使用

          最近熱文:
          1、我用 Java 8 寫了一段邏輯,同事直呼看不懂
          2、Spring Boot 學(xué)習(xí)筆記,這個(gè)太全了!
          3、吊打 Tomcat ,Undertow 性能很炸??!
          4、Spring Boot 太狠了,一次發(fā)布 3 個(gè)版本!
          5、Spring Boot 如何快速集成 Redis?
          6、《Java開發(fā)手冊(cè)(嵩山版)》最新發(fā)布
          7、Spring Boot Redis 實(shí)現(xiàn)分布式鎖,真香!
          8、國人開源了一款小而全的 Java 工具類庫!
          9、國人開源了一款超好用的 Redis 客戶端?。?/a>
          10、同事寫了個(gè)隱藏 bug,我排查了 3 天!
          掃碼關(guān)注Java技術(shù)棧公眾號(hào)閱讀更多干貨。

          點(diǎn)擊「閱讀原文」獲取面試題大全~

          瀏覽 34
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  曰韩日逼片 | 老鸭窝毛片 | 日韩国产精品在线看 | 狠狠躁夜夜躁 | 色婷婷成人 |