<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一個(gè)最簡單的消息隊(duì)列,帶你理解 RabbitMQ!

          共 1559字,需瀏覽 4分鐘

           ·

          2021-01-28 11:12

          Java技術(shù)棧

          www.javastack.cn

          關(guān)注閱讀更多優(yōu)質(zhì)文章



          作者:海向
          來源:www.cnblogs.com/haixiang/p/10826710.html

          RabbitMQ 簡述

          RabbitMQ是一個(gè)消息代理:它接受并轉(zhuǎn)發(fā)消息。您可以將其視為郵局:當(dāng)您將要把寄發(fā)的郵件投遞到郵箱中時(shí),您可以確信Postman 先生最終會(huì)將郵件發(fā)送給收件人。在這個(gè)比喻中,RabbitMQ是一個(gè)郵箱,郵局和郵遞員,用來接受,存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)塊的消息。

          隊(duì)列就像是在RabbitMQ中扮演郵箱的角色。雖然消息經(jīng)過RabbitMQ和應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制的限制,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)。許多生產(chǎn)者可以發(fā)送到一個(gè)隊(duì)列的消息,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。

          producer即為生產(chǎn)者,用來產(chǎn)生消息發(fā)送給隊(duì)列。consumer是消費(fèi)者,需要去讀隊(duì)列內(nèi)的消息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個(gè)主機(jī)上;確實(shí)在大多數(shù)應(yīng)用程序中它們是這樣分布的。

          簡單隊(duì)列

          簡單隊(duì)列是最簡單的一種模式,由生產(chǎn)者、隊(duì)列、消費(fèi)者組成。生產(chǎn)者將消息發(fā)送給隊(duì)列,消費(fèi)者從隊(duì)列中讀取消息完成消費(fèi)。關(guān)注公眾號(hào)Java技術(shù)?;貜?fù):面試,可以獲取我整理的 RabbitMQ 系列面試題,帶全部答案。

          在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。中間的框是隊(duì)列 - RabbitMQ代表消費(fèi)者的消息緩沖區(qū)。

          Java 方式

          生產(chǎn)者

          package?com.anqi.mq.nat;

          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          public?class?MyProducer?{
          ????private?static?final?String?QUEUE_NAME?=?"ITEM_QUEUE";

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");

          ????????//2.?通過連接工廠來創(chuàng)建連接
          ????????Connection?connection?=?factory.newConnection();

          ????????//3.?通過?Connection?來創(chuàng)建?Channel
          ????????Channel?channel?=?connection.createChannel();

          ????????//實(shí)際場(chǎng)景中,消息多為json格式的對(duì)象
          ????????String?msg?=?"hello";
          ????????//4.?發(fā)送三條數(shù)據(jù)
          ????????for?(int?i?=?1;?i?<=?3?;?i++)?{
          ????????????channel.basicPublish("",?QUEUE_NAME,?null,?msg.getBytes());
          ????????????System.out.println("Send?message"?+?i?+"?:?"?+?msg);
          ????????}

          ????????//5.?關(guān)閉連接
          ????????channel.close();
          ????????connection.close();
          ????}
          }
          /**
          ?*?Declare?a?queue
          ?*?@param?queue?the?name?of?the?queue
          ?*?@param?durable?true?if?we?are?declaring?a?durable?queue?(the?queue?will?survive?a?server?restart)
          ?*?@param?exclusive?true?if?we?are?declaring?an?exclusive?queue?(restricted?to?this?connection)
          ?*?@param?autoDelete?true?if?we?are?declaring?an?autodelete?queue?(server?will?delete?it?when?no?longer?in?use)
          ?*?@param?arguments?other?properties?(construction?arguments)?for?the?queue
          ?*?@return?a?declaration-confirm?method?to?indicate?the?queue?was?successfully?declared
          ?*?@throws?java.io.IOException?if?an?error?is?encountered
          ?*/

          Queue.DeclareOk?queueDeclare(String?queue,?boolean?durable,?boolean?exclusive,?boolean?autoDelete,Map?arguments)?throws?IOException;

          /**
          ?*?Publish?a?message
          ?*?@see?com.rabbitmq.client.AMQP.Basic.Publish
          ?*?@param?exchange?the?exchange?to?publish?the?message?to
          ?*?@param?routingKey?the?routing?key
          ?*?@param?props?other?properties?for?the?message?-?routing?headers?etc
          ?*?@param?body?the?message?body
          ?*?@throws?java.io.IOException?if?an?error?is?encountered
          ?*/

          void?basicPublish(String?exchange,?String?routingKey,?BasicProperties?props,?byte[]?body)?throws?IOException;


          /**
          ?*?Start?a?non-nolocal,?non-exclusive?consumer,?with
          ?*?a?server-generated?consumerTag.
          ?*?@param?queue?the?name?of?the?queue
          ?*?@param?autoAck?true?if?the?server?should?consider?messages
          ?*?acknowledged?once?delivered;?false?if?the?server?should?expect
          ?*?explicit?acknowledgements
          ?*?@param?callback?an?interface?to?the?consumer?object
          ?*?@return?the?consumerTag?generated?by?the?server
          ?*?@throws?java.io.IOException?if?an?error?is?encountered
          ?*?@see?com.rabbitmq.client.AMQP.Basic.Consume
          ?*?@see?com.rabbitmq.client.AMQP.Basic.ConsumeOk
          ?*?@see?#basicConsume(String,?boolean,?String,?boolean,?boolean,?Map,?Consumer)
          ?*/

          String?basicConsume(String?queue,?boolean?autoAck,?Consumer?callback)?throws?IOException;

          消費(fèi)者

          package?com.anqi.mq.nat;

          import?com.rabbitmq.client.*;
          import?java.io.IOException;

          public?class?MyConsumer?{

          ????private?static?final?String?QUEUE_NAME?=?"ITEM_QUEUE";

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
          ????????ConnectionFactory?factory?=?new?ConnectionFactory();
          ????????factory.setHost("localhost");
          ????????factory.setVirtualHost("/");
          ????????factory.setUsername("guest");
          ????????factory.setPassword("guest");

          ????????//2.?通過連接工廠來創(chuàng)建連接
          ????????Connection?connection?=?factory.newConnection();

          ????????//3.?通過?Connection?來創(chuàng)建?Channel
          ????????Channel?channel?=?connection.createChannel();

          ????????//4.?聲明一個(gè)隊(duì)列
          ????????channel.queueDeclare(QUEUE_NAME,?true,?false,?false,?null);
          ????????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");

          ????????/*
          ???????????true:表示自動(dòng)確認(rèn),只要消息從隊(duì)列中獲取,無論消費(fèi)者獲取到消息后是否成功消費(fèi),都會(huì)認(rèn)為消息已經(jīng)成功消費(fèi)
          ???????????false:表示手動(dòng)確認(rèn),消費(fèi)者獲取消息后,服務(wù)器會(huì)將該消息標(biāo)記為不可用狀態(tài),等待消費(fèi)者的反饋,如果消費(fèi)者一
          ???????????直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務(wù)器會(huì)認(rèn)為該消費(fèi)者已經(jīng)掛掉,不會(huì)再給其發(fā)送消息,
          ???????????直到該消費(fèi)者反饋。
          ????????*/


          ????????//5.?創(chuàng)建消費(fèi)者并接收消息
          ????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
          ????????????@Override
          ????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
          ???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)

          ????????????????????throws?IOException?
          {
          ????????????????String?message?=?new?String(body,?"UTF-8");
          ????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
          ????????????}
          ????????};

          ????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
          ????????channel.basicConsume(QUEUE_NAME,?true,?consumer);

          ????}
          }

          Send?message1?:?hello
          Send?message2?:?hello
          Send?message3?:?hello

          ?[*]?Waiting?for?messages.?To?exit?press?CTRL+C
          ?[x]?Received?'hello'
          ?[x]?Received?'hello'
          ?[x]?Received?'hello'

          當(dāng)我們啟動(dòng)生產(chǎn)者之后查看RabbitMQ管理后臺(tái)可以看到有一條消息正在等待被消費(fèi)。



          當(dāng)我們啟動(dòng)消費(fèi)者之后再次查看,可以看到積壓的一條消息已經(jīng)被消費(fèi)。


          總結(jié)

          隊(duì)列聲明queueDeclare的參數(shù):第一個(gè)參數(shù)表示隊(duì)列名稱、第二個(gè)參數(shù)為是否持久化(true表示是,隊(duì)列將在服務(wù)器重啟時(shí)生存)、第三個(gè)參數(shù)為是否是獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除)、第四個(gè)參數(shù)為當(dāng)所有消費(fèi)者客戶端連接斷開時(shí)是否自動(dòng)刪除隊(duì)列、第五個(gè)參數(shù)為隊(duì)列的其他參數(shù)。

          basicConsume的第二個(gè)參數(shù)autoAck: 應(yīng)答模式,true:自動(dòng)應(yīng)答,即消費(fèi)者獲取到消息,該消息就會(huì)從隊(duì)列中刪除掉,false:手動(dòng)應(yīng)答,當(dāng)從隊(duì)列中取出消息后,需要程序員手動(dòng)調(diào)用方法應(yīng)答,如果沒有應(yīng)答,該消息還會(huì)再放進(jìn)隊(duì)列中,就會(huì)出現(xiàn)該消息一直沒有被消費(fèi)掉的現(xiàn)象。

          這種簡單隊(duì)列的模式,系統(tǒng)會(huì)為每個(gè)隊(duì)列隱式地綁定一個(gè)默認(rèn)交換機(jī),交換機(jī)名稱為" (AMQP default)",類型為直連 direct,當(dāng)你手動(dòng)創(chuàng)建一個(gè)隊(duì)列時(shí),系統(tǒng)會(huì)自動(dòng)將這個(gè)隊(duì)列綁定到一個(gè)名稱為空的 Direct 類型的交換機(jī)上,綁定的路由鍵 routing key 與隊(duì)列名稱相同,相當(dāng)于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實(shí)例沒有顯式聲明交換機(jī),但是當(dāng)路由鍵和隊(duì)列名稱一樣時(shí),就會(huì)將消息發(fā)送到這個(gè)默認(rèn)的交換機(jī)中。這種方式比較簡單,但是無法滿足復(fù)雜的業(yè)務(wù)需求,所以通常在生產(chǎn)環(huán)境中很少使用這種方式。

          The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認(rèn)交換機(jī)隱式綁定到每個(gè)隊(duì)列,其中路由鍵等于隊(duì)列名稱。不可能顯式綁定到,或從缺省交換中解除綁定。它也不能被刪除。

          ——引自 RabbitMQ 官方文檔

          spring-amqp方式

          引入 Maven 依賴

          <dependency>
          ????<groupId>com.rabbitmqgroupId>
          ????<artifactId>amqp-clientartifactId>
          ????<version>5.6.0version>
          dependency>????????
          <dependency>
          ????<groupId>org.springframework.amqpgroupId>
          ????<artifactId>spring-rabbitartifactId>
          ????<version>2.1.5.RELEASEversion>
          dependency>

          Spring Boot 學(xué)習(xí)教程:

          https://github.com/javastacks/spring-boot-best-practice

          Spring 配置文件

          <beans?xmlns="http://www.springframework.org/schema/beans"
          ???????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          ???????xmlns:rabbit="http://www.springframework.org/schema/rabbit"
          ???????xsi:schemaLocation="http://www.springframework.org/schema/rabbit
          ???????????https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
          ???????????http://www.springframework.org/schema/beans
          ???????????https://www.springframework.org/schema/beans/spring-beans.xsd"
          >


          ????<rabbit:connection-factory?id="connectionFactory"?host="localhost"?virtual-host="/"
          ????username="guest"?password="guest"/>

          ????<rabbit:template?id="amqpTemplate"?connection-factory="connectionFactory"/>
          ????<rabbit:admin?connection-factory="connectionFactory"/>
          ????<rabbit:queue?name="MY-QUEUE"/>
          beans>

          使用測(cè)試

          import?org.springframework.amqp.core.AmqpTemplate;
          import?org.springframework.context.ApplicationContext;
          import?org.springframework.context.support.ClassPathXmlApplicationContext;

          public?class?Main?{
          ????public?static?void?main(String[]?args)?{
          ????????ApplicationContext?app?=?new?ClassPathXmlApplicationContext("spring/rabbit-context.xml");
          ????????AmqpTemplate?amqpTemplate?=?app.getBean(AmqpTemplate.class);
          ????????amqpTemplate.convertAndSend("MY-QUEUE",?"Item");
          ????????String?msg?=?(String)?amqpTemplate.receiveAndConvert("MY-QUEUE");
          ????????System.out.println(msg);
          ????}
          }

          參考方法

          /**
          ?*?Convert?a?Java?object?to?an?Amqp?{@link?Message}?and?send?it?to?a?specific?exchange
          ?*?with?a?specific?routing?key.
          ?*
          ?*?@param?exchange?the?name?of?the?exchange
          ?*?@param?routingKey?the?routing?key
          ?*?@param?message?a?message?to?send
          ?*?@throws?AmqpException?if?there?is?a?problem
          ?*/

          void?convertAndSend(String?exchange,?String?routingKey,?Object?message)?throws?AmqpException;
          /**
          ??*?Receive?a?message?if?there?is?one?from?a?specific?queue?and?convert?it?to?a?Java
          ??*?object.?Returns?immediately,?possibly?with?a?null?value.
          ??*
          ??*?@param?queueName?the?name?of?the?queue?to?poll
          ??*?@return?a?message?or?null?if?there?is?none?waiting
          ??*?@throws?AmqpException?if?there?is?a?problem
          ??*/

          @Nullable
          Object?receiveAndConvert(String?queueName)?throws?AmqpException;






          關(guān)注Java技術(shù)??锤喔韶?/strong>



          戳原文,獲取精選面試題!
          瀏覽 30
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲一区二区av 亚洲一区欧美一区 | 乱伦色区| 国产一级精品成人无码毛片 | 亚洲天堂手机在线 | 日本爱爱视频在线 |