<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          3分鐘白話RocketMQ系列—— 如何發(fā)送消息

          共 4322字,需瀏覽 9分鐘

           ·

          2023-08-10 17:35


          白話3分鐘,快速了解RocketMQ如何發(fā)送消息。

          看完如果不了解,歡迎來打我。

          我們知道RocketMQ主要分為消息 生產、存儲(消息堆積)、消費 三大塊領域。

          那接下來,我們白話一下,RocketMQ是如何發(fā)送消息的,揭秘消息生產全過程。

          注意,如果白話中不小心提到相關代碼配置與類名,請參考RocketMQ 4.9.4版本

          關鍵字摘要

          • 哪些消息類型?

          • 發(fā)給誰?

          • 怎么發(fā)?

          • 怎么知道發(fā)成功了還是失敗了?

          • 發(fā)失敗了怎么辦?

          Q1: RocketMQ有哪些消息類型?

          RocketMQ生產消息時,支持多種「消息類型」:

          • 普通消息:發(fā)送普通消息。

          SendResult send(final Message msg);
          • 普通有序消息:發(fā)送普通有序消息,通過指定「消息篩選器selector」,動態(tài)決定發(fā)送哪個隊列。

          SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg);
          • 嚴格有序消息:發(fā)送嚴格有序消息,通過指定隊列,保證嚴格有序。

          SendResult send(final Message msg, final MessageQueue mq);
          • 事務消息:實現分布式事務。(屬于分布式事務范疇,區(qū)別較大,本文不再展開討論,后面單獨寫一篇針對「事務消息」的分析)

          上面列舉的三種send方法,都是以同步發(fā)送模式為例。
          定時/延遲消息從發(fā)送方式角度來說,不算一種獨立的消息類型。

          Q2:RocketMQ怎么知道一條消息要發(fā)送給誰?

          一般我們要發(fā)送一條消息給RocketMQ,需要創(chuàng)建這樣一個消息體。

          Message msg = new Message( "TestTopic""Hello World".getBytes() );

          在這個消息體里面,我們只單純指定了要發(fā)送的Topic名字,以及要發(fā)送的消息內容。

          那么,RocketMQ-client怎么知道這條消息要發(fā)送到RocketMQ集群中的哪一個broker上呢?

          這里需要了解下RocketMQ中Topic的「路由注冊與發(fā)現機制」。

          RocketMQ基本架構

          Topic 路由注冊與發(fā)現:

          • Broker 每30秒向 NameServer 發(fā)送心跳包,里面包含Topic的路由信息,包括主題的讀寫隊列數和操作權限等。NameServer會保存這些路由信息,并記錄最后一次收到 Broker 心跳包的時間(NameServer每10秒根據記錄的時間戳清理已經失聯120秒以上的 Broker)。

          • 生產者每30秒獲取一次主題的路由信息。這意味著消息生產者不會立即知道有新的 Broker 加入或者被移除。

          Topic路由信息

          Topic的路由信息,包括了Topic的 隊列queue和broker的映射關系 ,那么如何利用這個Topic的路由信息呢?

          我們需要根據前面的不同「消息類型」進行分別討論:

          • 普通消息:默認采用輪詢機制,消息會依次發(fā)送到Topic的每個可用的 Broker 的某個隊列queue上,以實現負載均衡。

          • 普通有序消息:根據傳遞的 MessageQueueSelector 和消息體 msg 內容,計算可以投遞的隊列queue,然后發(fā)送消息。(可以類比分庫分表中的分表計算寫入的方式)

          • 嚴格有序消息:根據傳遞的 MessageQueue 信息,強制消息發(fā)送到對應隊列queue上。(可以類比分庫分表中,強制指定物理表寫入的方式)

          根據消息類型獲取到目標隊列queue后,就可以根據Topic路由信息發(fā)送消息到指定broker上了。

          Q3:怎么發(fā)送一條消息?

          從發(fā)送模式角度來說,RocketMQ有三種「消息發(fā)送模式」:

          • 同步發(fā)送:調用發(fā)送消息方法后,同步阻塞,直到返回SendResult。

          SendResult send(final Message msg);
          • 異步發(fā)送:調用發(fā)送消息方法后,立即返回,發(fā)送結果會通過開發(fā)者自己注冊的回調函數SendCallback進行處理。

          void send(final Message msg, final SendCallback sendCallback);
          • 單向發(fā)送:這種方法完全不關心發(fā)送后的返回結果。顯然,它具有最大吞吐量,但也存在消息丟失的潛在風險。

          void sendOneway(final Message msg);

          上面列舉的三種send方法,都是以「普通消息」為例。

          「消息類型」 和 「消息發(fā)送模式」 是 N*M 的關系,所以聰明的你一定已經想到了,存在9種不同組合(不包括事物消息),RocketMQ也是在接口中定義了9種不同方法。

          Q4: 發(fā)送后,怎么知道消息發(fā)成功了還是失敗了?

          前面介紹了三種「消息發(fā)送模式」,其中「單向發(fā)送」屬于不可靠發(fā)送,我們無法知道是否發(fā)送成功。

          而「同步發(fā)送」和「異步發(fā)送」都是可靠發(fā)送,我們能夠獲取發(fā)送狀態(tài),知道是否成功。

          在「同步發(fā)送」中,我們可以根據SendResult中的sendStatus屬性判斷是否發(fā)送成功。

          SendResult類屬性

          在「異步發(fā)送」中,我們可以自定義實現SendCallbackonSuccess()方法和onException()方法,來判斷消息是否發(fā)送成功。

          SendCallback接口定義

          Q5: 消息發(fā)送失敗了怎么辦?

          如果消息發(fā)送失敗了,RocketMQ-client默認有重試機制,以確保消息的高可用性。

          前面提到,生產者每30秒獲取一次主題的路由信息,所以即使某個 Broker 宕機,消息發(fā)送者可能無法立即察覺到它的宕機狀態(tài)。

          但是,當消息發(fā)送者向某個 Broker 發(fā)送消息后,如果返回異常,生產者會在接下來的一段時間內(例如5分鐘)避免再次選擇該 Broker 上的隊列來發(fā)送消息。這樣做的目的是規(guī)避可能發(fā)生故障的 Broker。

          當然了,用戶也能根據返回的異常,自己定義業(yè)務重試、補償機制。

          需要注意的是,不同「消息類型」和「消息發(fā)送模式」的RocketMQ-client默認重試機制不同。

          消息類型:

          • 普通消息:無順序性要求,異常時RocketMQ-client默認重試。

          • 普通有序消息:異常時RocketMQ-client默認不重試,可以用戶自己捕獲異常重試,并發(fā)送到其他隊列。

          • 嚴格有序消息:保證嚴格有序,異常時RocketMQ-client默認不重試,可以用戶自己捕獲異常重試。

          注意:有序消息異常時RocketMQ-client都是默認不重試

          消息發(fā)送模式:

          • 同步發(fā)送:配置retryTimesWhenSendFailed默認重試次數。

          • 異步發(fā)送:配置retryTimesWhenSendAsyncFailed默認重試次數。

          • 單向發(fā)送:無重試機制,存在丟失消息的風險。

          注意:單向發(fā)送模式異常時RocketMQ-client默認不重試

          總結

          • 有哪些消息類型:普通消息、有序消息、事務消息

          • 發(fā)給誰?:Topic路由信息注冊與發(fā)現機制、普通消息輪詢發(fā)送、有序消息指定selector或者queue發(fā)送

          • 怎么發(fā)?:同步發(fā)送、異步發(fā)送、單向發(fā)送

          • 怎么知道發(fā)成功了還是失敗了?:同步&異步都能夠獲取發(fā)送狀態(tài)(可靠發(fā)送)、單向發(fā)送不可靠

          • 發(fā)失敗了怎么辦?: 失敗重試機制

          3分鐘到了嗎?應該對RocketMQ如何生產消息有全面了解了吧。
          如果還想了解更多,歡迎關注下一期內容。






          往期熱門筆記合集推薦:


          原創(chuàng):阿丸筆記(微信公眾號:aone_note),歡迎 分享,轉載請保留出處。

          沒有留言功能的悲傷,掃描下方二維碼「加我」聊些有的沒的吧~

                                                                                        覺得不錯,就點個 再看 吧??



          瀏覽 122
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美成人电影一区二区 | 粉嫩99精品99久久久久久桃色 | 黄片无码在线免费观看 | 97色色欧美 | 成人视频网站在线观看 |