<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ簡(jiǎn)介以及應(yīng)用

          共 9061字,需瀏覽 19分鐘

           ·

          2020-08-14 17:39

          程序員的成長(zhǎng)之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?
          關(guān)注


          閱讀本文大概需要 7 分鐘。

          原文轉(zhuǎn)載:https://www.jianshu.com/p/2f55cd7a3e1c
          作者:會(huì)跳舞的機(jī)器人

          一、簡(jiǎn)要介紹

          • 開(kāi)源AMQP實(shí)現(xiàn),Erlang語(yǔ)言編寫,支持多種客戶端
          • 分布式、高可用、持久化、可靠、安全
          • 支持多種協(xié)議:AMQP、STOMP、MQTT、HTTP
          • 適用于多系統(tǒng)之間的業(yè)務(wù)解耦的消息中間件

          二、基本概念

          1、exchange:交換器,負(fù)責(zé)接收消息,轉(zhuǎn)發(fā)消息至綁定的隊(duì)列,有四種類型:

          • direct:完全匹配的路由
          • topic:模式匹配的路由
          • fanout:廣播模式
          • headers:鍵值對(duì)匹配路由
          Exchange屬性:
          • 持久化:如果啟用,那么rabbit服務(wù)重啟之后仍然存在
          • 自動(dòng)刪除:如果啟用,那么交換器將會(huì)在其綁定的隊(duì)列都被刪除掉之后自動(dòng)刪除掉自身

          2、Queue:隊(duì)列,rabbitmq的內(nèi)部對(duì)象,用于存儲(chǔ)消息,其屬性類似于Exchange,同樣可以設(shè)置是否持久化、自動(dòng)刪除等。

          消費(fèi)者從Queue中獲取消息并消費(fèi)。多個(gè)消費(fèi)者可以訂閱同一個(gè)Queue,這時(shí)Queue中的消息會(huì)被平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理。

          3、Binding:綁定,根據(jù)路由規(guī)則綁定交換器與隊(duì)列

          4、Routing:路由鍵,路由的關(guān)鍵字

          三、消息的可靠性

          • Message acknowledgment:消息確認(rèn),在消息確認(rèn)機(jī)制下,收到回執(zhí)才會(huì)刪除消息,未收到回執(zhí)而斷開(kāi)了連接,消息會(huì)轉(zhuǎn)發(fā)給其他消費(fèi)者,如果忘記回執(zhí),會(huì)導(dǎo)致消息堆積,消費(fèi)者重啟后會(huì)重復(fù)消費(fèi)這些消息并重復(fù)執(zhí)行業(yè)務(wù)邏輯。

          • Message durability:消息持久化,設(shè)置消息持久化可以避免絕大部分消息丟失,比如rabbitmq服務(wù)重啟,但是采用非持久化可以提升隊(duì)列的處理效率。如果要確保消息的持久化,那么消息對(duì)應(yīng)的Exchange和Queue同樣要設(shè)置為持久化。

          • Prefetch count,每次發(fā)送給消費(fèi)者消息的數(shù)量,默認(rèn)為1

          另外,如果需要可靠性業(yè)務(wù),需要設(shè)置持久化和ack機(jī)制,如果系統(tǒng)高吞吐,可以設(shè)置為非持久化、noack、自動(dòng)刪除機(jī)制。

          四、簡(jiǎn)單應(yīng)用

          模擬這樣一個(gè)業(yè)務(wù)場(chǎng)景,用戶下單成功后,需要給用戶增加積分,同時(shí)還需要給用戶發(fā)送下單成功的消息,這是在電商業(yè)務(wù)中很常見(jiàn)的一個(gè)業(yè)務(wù)場(chǎng)景。
          如果系統(tǒng)是微服務(wù)架構(gòu),可能用戶下單功能在訂單服務(wù),給用戶增加積分的功能在積分服務(wù),給用戶發(fā)送通知消息的功能在通知服務(wù),各個(gè)服務(wù)之間解耦,互不影響。那么要實(shí)現(xiàn)上述的業(yè)務(wù)場(chǎng)景,消息中間件rabbitmq是一個(gè)很好的選擇。
          原因如下:
          • 高性能,它的實(shí)現(xiàn)語(yǔ)言是天生具備高并發(fā)高可用的erlang 語(yǔ)言
          • 支持消息的持久化,即使服務(wù)器掛了,也不會(huì)丟失消息
          • 消息應(yīng)答(ack)機(jī)制,消費(fèi)者消費(fèi)完消息后發(fā)送一個(gè)消息應(yīng)答,rabbitmq才會(huì)刪除消息,確保消息的可靠性
          • 支持高可用集群
          • 靈活的路由
          實(shí)現(xiàn)思路:
          用戶下單成功后,rabbitmq發(fā)送一條消息至EXCHANGE.ORDER_CREATE交換器,該交換器綁定了兩個(gè)隊(duì)列,QUEUE.ORDER_INCREASESCORE、QUEUE.ORDER_NOTIFY,消費(fèi)者訂閱這兩個(gè)隊(duì)列分別用來(lái)處理增加積分、發(fā)送用戶通知。
          如果后續(xù)日志系統(tǒng)還需要記錄下單的相關(guān)日志,那么我們只需要再定義一個(gè)隊(duì)列并將其綁定到EXCHANGE.ORDER_CREATE即可。
          下單發(fā)rabbitmq消息

          package?com.robot.rabbitmq;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;

          import?java.io.IOException;
          import?java.util.UUID;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@author:?會(huì)跳舞的機(jī)器人
          ?*?@date:?2017/10/13?10:46
          ?*?@description:?模擬用戶下單之后發(fā)送rabbitmq消息
          ?*/
          public?class?OrderCreator?{
          ????//?交換器名稱
          ????private?static?final?String?EXCHANGE?=?"EXCHANGE.ORDER_CREATE";
          ????//?消息內(nèi)容
          ????private?static?String?msg?=?"create?order?success";

          ????/**
          ?????*?模擬創(chuàng)建訂單后發(fā)送mq消息
          ?????*/
          ????public?void?createOrder()?{
          ????????System.out.println("下單成功,開(kāi)始發(fā)送rabbitmq消息");

          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????connectionFactory.setHost("192.168.12.44");
          ????????connectionFactory.setPort(56720);
          ????????connectionFactory.setUsername("baibei");
          ????????connectionFactory.setPassword("baibei");

          ????????Connection?connection;
          ????????Channel?channel;
          ????????try?{
          ????????????connection?=?connectionFactory.newConnection();
          ????????????channel?=?connection.createChannel();
          ????????????//?持久化
          ????????????boolean?durable?=?true;
          ????????????//?topic類型
          ????????????String?type?=?"topic";
          ????????????//?聲明交換器,如果交換器不存在則創(chuàng)建之
          ????????????channel.exchangeDeclare(EXCHANGE,?type,?durable);

          ????????????String?messgeId?=?UUID.randomUUID().toString();
          ????????????//?deliveryMode>=2表示設(shè)置消息持久化
          ????????????AMQP.BasicProperties?props?=?new?AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
          ????????????//?發(fā)布消息
          ????????????String?routingKey?=?"order_create";
          ????????????channel.basicPublish(EXCHANGE,?routingKey,?props,?msg.getBytes("utf-8"));
          ????????????connection.close();
          ????????}?catch?(IOException?e)?{
          ????????????e.printStackTrace();
          ????????}?catch?(TimeoutException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}
          }

          積分系統(tǒng)訂閱消息

          package?com.robot.rabbitmq;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.Consumer;
          import?com.rabbitmq.client.Envelope;
          import?com.rabbitmq.client.ShutdownSignalException;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@author:?會(huì)跳舞的機(jī)器人
          ?*?@date:?2017/10/13?16:02
          ?*?@description:?rabbitmq消費(fèi)者,模擬下單成功后給用戶增加積分
          ?*/
          public?class?IncreaseScoreConsumer?implements?Consumer?{
          ????private?Connection?connection;
          ????private?Channel?channel;
          ????//?交換器名稱
          ????private?static?final?String?EXCHANGE?=?"EXCHANGE.ORDER_CREATE";
          ????//?增加積分隊(duì)列名稱
          ????private?static?final?String?QUEUENAME?=?"QUEUE.ORDER_INCREASESCORE";


          ????public?void?consume()?{
          ????????//?初始化rabbitmq連接信息
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????connectionFactory.setHost("192.168.12.44");
          ????????connectionFactory.setPort(56720);
          ????????connectionFactory.setUsername("baibei");
          ????????connectionFactory.setPassword("baibei");
          ????????try?{
          ????????????connection?=?connectionFactory.newConnection();
          ????????????channel?=?connection.createChannel();
          ????????????//?聲明交換器
          ????????????channel.exchangeDeclare(EXCHANGE,?"topic",?true);
          ????????????//?聲明隊(duì)列
          ????????????channel.queueDeclare(QUEUENAME,?true,?false,?false,?null);
          ????????????//?交換器與隊(duì)列綁定并設(shè)置routingKey
          ????????????channel.queueBind(QUEUENAME,?EXCHANGE,?"order_create");
          ????????????//?消費(fèi)消息,callback是該類,關(guān)閉自動(dòng)確認(rèn)消息,在完成業(yè)務(wù)邏輯后手動(dòng)確認(rèn)確認(rèn)
          ????????????channel.basicConsume(QUEUENAME,?false,?this);
          ????????}?catch?(IOException?e)?{
          ????????????e.printStackTrace();
          ????????}?catch?(TimeoutException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????String?msg?=?new?String(body,?"UTF-8");
          ????????System.out.println("《積分系統(tǒng)》收到訂單消息:"?+?msg?+?",給用戶增加積分......");
          ????????//?手動(dòng)確認(rèn)消息
          ????????channel.basicAck(envelope.getDeliveryTag(),?false);

          ????????/**
          ?????????*?channel.basicReject(envelope.getDeliveryTag(),?false);該方法會(huì)丟棄掉隊(duì)列中的這條消息
          ?????????*?channel.basicReject(envelope.getDeliveryTag(),?true);該方法會(huì)把消息重新放回隊(duì)列
          ?????????*?一般系統(tǒng)會(huì)設(shè)定一個(gè)重試次數(shù),如果超過(guò)重試次數(shù),則會(huì)丟棄消息,反之則會(huì)把消息再放入隊(duì)列
          ?????????*/
          ????}

          ????public?void?handleConsumeOk(String?consumerTag)?{

          ????}

          ????public?void?handleCancelOk(String?consumerTag)?{

          ????}

          ????public?void?handleCancel(String?consumerTag)?throws?IOException?{

          ????}

          ????public?void?handleShutdownSignal(String?consumerTag,?ShutdownSignalException?sig)?{

          ????}

          ????public?void?handleRecoverOk(String?consumerTag)?{

          ????}

          }

          通知系統(tǒng)訂閱消息

          package?com.robot.rabbitmq;

          import?com.rabbitmq.client.AMQP;
          import?com.rabbitmq.client.Channel;
          import?com.rabbitmq.client.Connection;
          import?com.rabbitmq.client.ConnectionFactory;
          import?com.rabbitmq.client.Consumer;
          import?com.rabbitmq.client.Envelope;
          import?com.rabbitmq.client.ShutdownSignalException;

          import?java.io.IOException;
          import?java.util.concurrent.TimeoutException;

          /**
          ?*?@author:?會(huì)跳舞的機(jī)器人
          ?*?@date:?2017/10/13?16:20
          ?*?@description:?rabbitmq消費(fèi)者,模擬下單成功后給用戶發(fā)送通知
          ?*/
          public?class?NotifyConsumer?implements?Consumer?{
          ????private?Connection?connection;
          ????private?Channel?channel;

          ????//?交換器名稱
          ????private?static?final?String?EXCHANGE?=?"EXCHANGE.ORDER_CREATE";
          ????//?通知用戶下單成功通知隊(duì)列名稱
          ????private?static?final?String?QUEUENAME?=?"QUEUE.ORDER_NOTIFY";


          ????public?void?consume()?{
          ????????//?初始化rabbitmq連接信息
          ????????ConnectionFactory?connectionFactory?=?new?ConnectionFactory();
          ????????connectionFactory.setHost("192.168.12.44");
          ????????connectionFactory.setPort(56720);
          ????????connectionFactory.setUsername("baibei");
          ????????connectionFactory.setPassword("baibei");
          ????????try?{
          ????????????connection?=?connectionFactory.newConnection();
          ????????????channel?=?connection.createChannel();
          ????????????//?聲明交換器
          ????????????channel.exchangeDeclare(EXCHANGE,?"topic",?true);
          ????????????//?聲明隊(duì)列
          ????????????channel.queueDeclare(QUEUENAME,?true,?false,?false,?null);
          ????????????//?交換器與隊(duì)列綁定并設(shè)置routingKey
          ????????????channel.queueBind(QUEUENAME,?EXCHANGE,?"order_create");
          ????????????//?消費(fèi)消息,callback是該類,關(guān)閉自動(dòng)確認(rèn)消息,在完成業(yè)務(wù)邏輯后手動(dòng)確認(rèn)確認(rèn)
          ????????????channel.basicConsume(QUEUENAME,?false,?this);
          ????????}?catch?(IOException?e)?{
          ????????????e.printStackTrace();
          ????????}?catch?(TimeoutException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
          ????????String?msg?=?new?String(body,?"UTF-8");
          ????????System.out.println("《通知系統(tǒng)》收到訂單消息:"?+?msg?+?",開(kāi)始給用戶發(fā)送通知......");
          ????????//?手動(dòng)確認(rèn)消息
          ????????channel.basicAck(envelope.getDeliveryTag(),?false);

          ????????/**
          ?????????*?channel.basicReject(envelope.getDeliveryTag(),?false);該方法會(huì)丟棄掉隊(duì)列中的這條消息
          ?????????*?channel.basicReject(envelope.getDeliveryTag(),?true);該方法會(huì)把消息重新放回隊(duì)列
          ?????????*?一般系統(tǒng)會(huì)設(shè)定一個(gè)重試次數(shù),如果超過(guò)重試次數(shù),則會(huì)丟棄消息,反之則會(huì)把消息再放入隊(duì)列
          ?????????*/
          ????}

          ????public?void?handleConsumeOk(String?consumerTag)?{

          ????}

          ????public?void?handleCancelOk(String?consumerTag)?{

          ????}

          ????public?void?handleCancel(String?consumerTag)?throws?IOException?{

          ????}

          ????public?void?handleShutdownSignal(String?consumerTag,?ShutdownSignalException?sig)?{

          ????}

          ????public?void?handleRecoverOk(String?consumerTag)?{

          ????}
          }

          測(cè)試

          package?com.robot.rabbitmq;

          /**
          ?*?@author:?會(huì)跳舞的機(jī)器人
          ?*?@date:?2017/10/13?16:27
          ?*?@description:
          ?*/
          public?class?Test?{
          ????public?static?void?main(String[]?args)?{

          ????????IncreaseScoreConsumer?increaseScoreConsumer?=?new?IncreaseScoreConsumer();
          ????????increaseScoreConsumer.consume();

          ????????NotifyConsumer?notifyConsumer?=?new?NotifyConsumer();
          ????????notifyConsumer.consume();

          ????????OrderCreator?orderCreator?=?new?OrderCreator();
          ????????for?(int?i?=?0;?i?????????????orderCreator.createOrder();
          ????????}
          ????}
          }

          輸出:

          下單成功,開(kāi)始發(fā)送rabbitmq消息
          《積分系統(tǒng)》收到訂單消息:create order success,給用戶增加積分......
          《通知系統(tǒng)》收到訂單消息:create order success,開(kāi)始給用戶發(fā)送通知......
          下單成功,開(kāi)始發(fā)送rabbitmq消息
          《積分系統(tǒng)》收到訂單消息:create order success,給用戶增加積分......
          《通知系統(tǒng)》收到訂單消息:create order success,開(kāi)始給用戶發(fā)送通知......
          下單成功,開(kāi)始發(fā)送rabbitmq消息
          《積分系統(tǒng)》收到訂單消息:create order success,給用戶增加積分......
          《通知系統(tǒng)》收到訂單消息:create order success,開(kāi)始給用戶發(fā)送通知......

          推薦閱讀:

          “kill -9”一時(shí)爽,秋后算賬淚兩行

          IntelliJ IDEA 2020.2 正式發(fā)布,真香!

          5T技術(shù)資源大放送!包括但不限于:C/C++,Linux,Python,Java,PHP,人工智能,單片機(jī),樹莓派,等等。在公眾號(hào)內(nèi)回復(fù)「2048」,即可免費(fèi)獲?。?!

          微信掃描二維碼,關(guān)注我的公眾號(hào)

          寫留言

          朕已閱?

          瀏覽 39
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www啪啪 | 国产精品男人操女人 | 北条麻妃国产九九九 | 伊人狼人在线 | 成人免费黄片视频 |