<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          springcloud微服務架構開發(fā)實戰(zhàn):分布式消息總線

          共 2565字,需瀏覽 6分鐘

           ·

          2022-05-15 00:36


          消息總線的定義

          前面在1.4.2節(jié)中強調過,在微服務架構中,經(jīng)常會使用REST 服務或基于消息的通信機制。

          在3.6節(jié)中也詳細介紹了消息通信的實現(xiàn)方式。消息總線就是一種基于消息的通信機制。

          消息總線是一種通信工具,可以在機器之間互相傳輸消息、文件等,它扮演著—種消息路由的角色,擁有一套完備的路由機制來決定消息傳輸方向。發(fā)送端只需要向消息總線發(fā)出消息,而不用管消息被如何轉發(fā)。

          Spring Cloud Bus通過輕量消息代理連接各個分布的節(jié)點。管理和傳播所有分布式項目中的消息,本質是利用了MQ的廣播機制在分布式的系統(tǒng)中傳播消息,目前常用的有Kafka和RabbitMQ等。


          消息總線常見的設計模式

          在消息總線中,常見的設計模式有點對點模式及訂閱/發(fā)布模式。

          1.點對點(P2P)

          點對點模式包含三個角色。

          • 消息隊列( Queue )。

          • 生產(chǎn)者( Producer ) 。

          • 消費者(Consumer )。

          點對點模式中的每個消息都被發(fā)送到一個特定的隊列,消費者從隊列中獲取消息。隊列保留著消息,直到它們被消費或超時。圖16-1展示了點對點模式的運行流程圖。


          點對點模式具有以下特點。

          • 每個消息只有一個消費者,即消息一旦被消費,就不在消息隊列中了。

          • 生產(chǎn)者和消費者之間在時間上沒有依賴性,也就是說當生產(chǎn)者發(fā)送了消息之后,不管消費者有沒有正在運行,都不會影響到消息被發(fā)送到隊列。

          • 消費者在成功接收消息之后需向隊列應答成功,這樣消息隊列才能知道消息是否被成功消費。

          2.訂閱/發(fā)布(PublSub )

          訂閱/發(fā)布模式包含三個角色。

          • 主題(Topic )。

          • 發(fā)布者(Publisher )。

          • 訂閱者(Subscriber )。

          訂閱/發(fā)布模式中,多個發(fā)布者將消息發(fā)送到對應的主題,系統(tǒng)將這些消息傳遞給多個訂閱者。圖16-2展示了訂閱/發(fā)布模式的運行流程圖。


          訂閱/發(fā)布模式具有以下特點。

          • 每個消息可以有多個消費者。

          • 主題可以被認為是消息的傳輸中介,發(fā)布者發(fā)布消息到主題,訂閱者從主題訂閱消息。

          • 主題使得消息訂閱者和消息發(fā)布者保持互相獨立,不需要接觸即可保證消息的傳送。

          消息總線的意義

          在微服務架構中,經(jīng)常會使用REST服務作為服務間的通信機制。REST以其輕量、簡單、易理解而著稱,但這種通信機制也并非適合所有的場景。例如,在一些高并發(fā)、高可靠、實時的場景,則需要消息總線來幫忙。

          概括起來,消息總線具有以下幾個優(yōu)點。

          1.實時性高

          與REST 服務的“請求—響應”模式不同,消息總線的實時性非常高。使用了消息總線,生產(chǎn)者一方只要把消息往隊列里一扔,就可以立馬返回,響應用戶了。無須等待處理結果,實現(xiàn)了異步處理。

          同時,對于消費者而言,消費者對于消息的到達感知也非常及時。消費者會對消息總線進行監(jiān)聽,只要有消息進入隊列,就可以馬上得到通知。這種優(yōu)勢是REST 服務所不能具備的。在REST服務中,要想及時獲取到更新通知,就不得不進行輪詢。這往往非常低效。

          2生產(chǎn)者與消費者解耦

          在消息總線中,生產(chǎn)者負責將消息發(fā)送到隊列中,而消費者把消息從隊列中取出來。生產(chǎn)者無須等待消費者啟動,消費者也無須關心生產(chǎn)者是否已經(jīng)處于就緒狀態(tài)。所以,這種模式能很好地實現(xiàn)生產(chǎn)者與消費者的解耦。

          然而,如果是在REST服務中,服務調用方必須等待服務的提供方準備好了才能調用,否則就會調用失敗。

          3.故障率低

          消息總線擁有對其他通信方式更高的成功率。一方面,生產(chǎn)者與消費者之間實現(xiàn)了解耦,所以,生產(chǎn)者與消費者之間不存在強關聯(lián)關系,即便是生產(chǎn)者或消費者任意一方掉線了,也不會影響消息最終的送達;另一方面,消息總線往往會結合數(shù)據(jù)庫來實現(xiàn)消息的持久化,并設置狀態(tài)標識。只有消息消費成功,才會去修改狀態(tài)標識。

          消息總線同時還承擔著緩沖區(qū)的作用。大量業(yè)務消息首先會進入消息隊列進行緩存,消息的消費者可以根據(jù)自己的處理能力來進行消費,所以不管消息的數(shù)據(jù)量有多少,都不會對消費者造成沖擊。

          消息總線常見的實現(xiàn)方式

          《分布式系統(tǒng)常用技術及案例分析》一書列舉了非常多的流行的、開源的分布式消息服務,如Apache ActiveMQ、RabbitMQ、Apache RocketMQ、Apache Kafka等。這些消息中間件都實現(xiàn)了點對點模式及訂閱/發(fā)布模式等常見的消息模式。

          以下例子演示的是使用ActiveMQ實現(xiàn)生產(chǎn)者—消費者的Java實現(xiàn)方式。

          生產(chǎn)者程序Producer.java:

          public class Producer{
          private static final Logger LOGGER=LoggerFactory.getLogger (Producer.
          class);
          private static final string BROKER_URE = ActiveMQConnection.DEFAULT_
          BROKER URL;
          private static final String SUBJECT= "waylau-queue";
          public static void main (String[] args) throws JMSException f
          //初始化連接工廠
          ConnectionFactory connectionFactory= new ActiveMQConnection
          Factory(BROKER_URL);
          //獲得連接
          Connection conn = connectionFactory.createConnection();
          //啟動連接
          conn.start(;
          //創(chuàng)建session,第一個參數(shù)表示會話是否在事務中執(zhí)行,第二個參數(shù)設定會話的應答模式
          Session session = conn.createSession(false, Session.AUTO_
          ACKNOWLEDGE);
          //創(chuàng)建隊列
          Destination dest = session.createQueue(SUBJECT);
          //createTopic方法用來創(chuàng)建Topic
          //session.createTopic ("TOPIC");
          //通過session 可以創(chuàng)建消息的生產(chǎn)者
          MessageProducer producer = session.createProducer(dest);
          for(int i=0;i<100;i++){
          //初始化一個MQ消息
          TextMessage message= session.createTextMessage ("Welcome to
          waylau.com"
          +i);
          //發(fā)送消息
          producer. send(message);
          LOGGER.info("send message {}",i);
          //關閉 MQ 連接
          conn.close();
          }
          }

          消費者程序Consumer.java:

          public class Consumer implements MessageListener {
          private static finalLogger LOGGER = LoggerFactory.getLogger
          (Consumer.class);
          private static final String BROKER_URL = ActiveMQConnection.DEFAULT
          BROKER URL;
          private static final string SUBJECT = "waylau-queue";
          public static void main(String[] args) throws JMSExceptionf
          //初始化 ConnectionFactory
          ConnectionFactory connectionFactory
          =new ActiveMOConnection
          Factory(BROKER_URL);
          //創(chuàng)建Mo連接
          Connection conn = connectionFactory.createConnection()
          ;
          //啟動連接
          conn .start(;
          //創(chuàng)建會話
          Session session= conn.createSession (false,Session.AUTO_
          ACKNOWLEDGE);
          //通過會話創(chuàng)建目標
          Destination dest = session.createQueue(SUBJECT);
          //創(chuàng)建 MO 消息的消費者
          MessageConsumer consumer = session.createConsumer(dest);
          //初始化 MessageListener
          consumer me=newConsumer();
          //給消費者設定監(jiān)聽對象
          consumer .setMessageListener (me);
          @override
          public void onMessage(Message message){
          TextMessage txtMessage =(TextMessage)message;
          try{
          LOGGER.info("get message " + txtMessage.getText());
          }catch (JMSException e) {
          LOGGER.error("error {}",e));
          }
          }

          執(zhí)行命令來啟動ActiveMQa:

          bin/activemg start

          生產(chǎn)者執(zhí)行如下命令:

          mvn clean compile exec:java -Dexec.mainClass=com.waylau.activemq.ProducerApp

          輸出如下。

          20:12:10.807 [ActiveMQ Task-1]INEO org.apache.activemq.transport.
          failover.FailoverTransport- Successfully connected to tcp://localhost:61616
          20:12:10.928[main] INFOcom.waylau.activemq.Producer- send message 0
          20:12:10.963 [main] INPO com.waylau.activemq.Producer- send message 1
          20:12:10.992 [main] INFO com.waylau.activemq.Producer - send message 2
          20:12:11.019[main] INFO com.waylau.activemq.Producer - send message 3
          20:12:11.036[main] INFOcom.waylau.activemq.Producer- send message 4
          20:12:11.058 [main] INFO com.waylau.activemq.producer -send message 5
          20:12:11.085[main] INFOcom.waylau.activemq.Producer - send message6
          20:12:11.113 [main] INFOcom.waylau.activemq.Producer - send message 7
          20:12:11.141[main] INFOcom.waylau.activemq.Producer - send message 8
          20:12:11.191 [main] INFO com.waylau.activemq.Producer- send message 9

          消費者執(zhí)行如下命令:

          mvn clean compile exec:java-Dexec.mainClass=com.waylau.activemq. ConsumerApp

          輸出如下。

          20:12:05.262[ActiveMQ Task-1] INFO org.apache.activemq.transport.
          failover.FailoverTransport- Successfully connected to tcp://localhost:
          61616
          20:12:10.875 [ActiveMQ Session Task-1] INEOcom.waylau.activemg.Consumer -
          get message welcome to waylau.com o
          20:12:10.939 [ActiveMQ Session Task-1]INFO com.waylau.activemq.Consumer-
          get message welcome to waylau.com 1
          20:12:10.965 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
          get message Welcome to waylau.com 2
          20:12:10.994 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -
          get message Welcome to waylau .com 3
          20:12:11.020 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-
          get message Welcome to waylau.com 4
          20:12:11.038 [ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
          get message Welcome to waylau.com 5
          20:12:11.059 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer -
          get message Welcome to waylau.com6
          20:12:11.086[ActiveMQ Session Task-1] INEO com.waylau.activemq. Consumer-
          get message welcome to waylau.com 7
          20:12:11.114[ActiveMQ Session Task-1] INFO com.waylau.activemq.Consumer-
          get message Welcome to waylau.com 8
          20:12:11.142 [ActiveMQ Session Task-1] INFO com.waylau.activemq. Consumer-
          get message Welcome to waylau.com 9

          上述例子的源碼,可以在
          https://github.com/waylau/distributed-systems-technologies-and-cas-es-analysis網(wǎng)址的samples目錄下找到。

          Spring Cloud Bus 實現(xiàn)消息總線

          Spring Cloud Bus通過輕量消息代理連接各個分布的節(jié)點,管理和傳播所有分布式項目中的消息,本質是利用了消息中間件的廣播機制在分布式的系統(tǒng)中傳播消息。

          目前Spring Cloud Bus所支持的常用的消息中間件有RabbitMQ和Kafka,使用時,只須添加
          spring-cloud-starter-bus-amqp或spring-cloud-starter-bus-kafka依賴即可。同時,需要確保相關的消息中間件連接配置正確。

          下面是使用RabbitMQ作為Spring Cloud Bus 的application.yml配置情況。

          spring:
          rabbitmg:
          host: mybroker .com
          port:5672
          username:user
          password:secret

          其中,spring.rabbitmq.host配置項用于指定RabbitMQ的主機位置。

          Spring Cloud Bus支持消息發(fā)送到所有已監(jiān)聽的節(jié)點,或者某個特定服務的所有節(jié)點。同時,Spring Cloud Bus提供了一些HTTP接口/bus/*,用于觸發(fā)Spring Cloud Bus內(nèi)部的事件。

          目前,Spring Cloud Bus主要有以下兩個接口實現(xiàn)。

          • ./bus/env:發(fā)送鍵值對去更新每個節(jié)點的Spring Environment。

          • ./bus/refresh:重新加載每一個應用的配置信息,類似于/refresh。

          所以,Spring Cloud Bus結合Spring Cloud Config 的使用,可以實現(xiàn)配置文件的自動更新。

          本篇文章內(nèi)容給大家講解的是分布式消息總線

          1. 下篇文章給大家講解的是SpringCloudBus 設計原理;

          2. 覺得文章不錯的朋友可以轉發(fā)此文關注小編;

          3. 感謝大家的支持!


          本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學習更多的話可以到微信公眾號里找我,我等你哦。

          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天爽天天爽天天爽天天爽天天爽 | 成人网站www污污污免费网站 | 黄片视频一区二区三区 | 在线观看黄色电影 | 青青草黄色成人视频 |