<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          要將RocketMQ中臺化,有點小激動

          共 8517字,需瀏覽 18分鐘

           ·

          2021-06-13 15:05


          本文字數(shù):4223

          預計閱讀時間:13分鐘


          一、RocketMQ簡介

          RocketMQ是一個高可用、高性能、高可靠的分布式消息隊列,相對于kafka更適合處理業(yè)務系統(tǒng)之間的消息。

          • 它具有很多特性,例如:

            • 發(fā)布訂閱
            • 順序、事務、定時消息
            • 消息堆積、重試,回溯等等
          • 它通過同步刷盤同步雙寫等技術(shù)手段來實現(xiàn)高可靠,保證如下情況消息不丟:

            • 可恢復性故障:broker或OS crash等
            • 不可恢復性故障:磁盤損壞等
          • 它采用多項技術(shù)優(yōu)化來滿足性能要求:

            • 順序IO
            • PageCache和mmap
            • 內(nèi)存預熱和鎖定
            • 異步提交和刷盤
            • 堆外內(nèi)存緩沖等等

          所以,它的本質(zhì)決定的其架構(gòu)一定是復雜的,參考RocketMQ官方架構(gòu)圖:

          這里不再介紹各個組件的含義,可以參考RocketMQ架構(gòu)設(shè)計。

          RocketMQ經(jīng)過阿里多年雙十一的檢驗,其穩(wěn)定性不言而喻。

          可作為搜狐視頻的消息中臺,還需要很長一段路要走,為什么這么說呢?

          二、運維之痛

          早在2014年我們就引入了RocketMQ作為消息中間件,其附帶了基本的命令行工具。

          但是命令行運維此等龐然大物會讓人感到力不從心,好在社區(qū)提供了一個web控制臺:RocketMQ-Console。

          在初期,簡單的控制臺已經(jīng)能滿足基本的需求。但是隨著各個業(yè)務逐漸接入,需求也紛至沓來。

          我們在RocketMQ-Console的修修補補已經(jīng)無法滿足了,主要體現(xiàn)在如下幾點:

          • 從業(yè)務方的角度:
            • 偏重運維,一般業(yè)務用戶不關(guān)心集群的數(shù)據(jù)和狀態(tài),無法聚焦。
            • 使用起來繁瑣,且直接操作集群,易誤操作。
            • 沒有監(jiān)控預警功能。
            • 無法滿足業(yè)務用戶的需求,包括但不限于:
              • 序列化
              • trace
              • 流控,隔離降級
              • 埋點統(tǒng)計監(jiān)控等等
            • 一些隱性問題無法解決。
          • 從管理員維度:
            • 無用戶概念,任何人都能直接操作集群,易誤操作且比較危險。
            • 無集群管理功能,日常更新或機器替換需要手動部署,非常耗時、麻煩且易出錯。
            • 無相關(guān)數(shù)據(jù)統(tǒng)計,監(jiān)控,預警等,往往有問題不能及時發(fā)現(xiàn)。

          另外,RocketMQ有一些潛在約定、使用規(guī)范、最佳實踐、bug或優(yōu)化等等,用文檔說明也無濟于事。

          所以與其寫文檔不如將經(jīng)驗和實踐轉(zhuǎn)換為產(chǎn)品,能夠更好的服務于業(yè)務及運維集群,于是MQCloud應運而生。

          三、MQCloud誕生

          先看一下MQCloud的定位:

          它是集客戶端SDK,監(jiān)控預警,集群運維于一體的一站式服務平臺。

          MQCloud的系統(tǒng)架構(gòu)如下:

          下面來分別說明一下MQCloud如何解決上面提到的痛點。

          業(yè)務端和運維端分離,使業(yè)務用戶只聚焦于業(yè)務數(shù)據(jù)。
          為了實現(xiàn)這個目的,引入了用戶,資源兩大維度。
          針對用戶和資源加以控制,使不同的用戶只聚焦于自己的數(shù)據(jù)。
            • 對于生產(chǎn)者來說,他關(guān)心的是topic配置,消息的發(fā)送數(shù)據(jù),誰在消費等等問題,這樣只對他展示相應的數(shù)據(jù)即可;
            • 對于消費者來說,只關(guān)心消費狀況,有沒有堆積,消費失敗等情況;
            • 對于管理員來說,可以進行部署,監(jiān)控,統(tǒng)一配置,審批等日常運維;
          清晰明了的操作
          通過對不同角色展示不同的視圖,使用戶可以進行的操作一目了然。
          規(guī)范和安全
          為了保障集群操作的安全性和規(guī)范性,所有的操作都會以申請單的形式進入后臺審批系統(tǒng),管理員來進行相關(guān)審批,安全性大大提升。
          多維的數(shù)據(jù)統(tǒng)計和監(jiān)控預警
          MQCloud核心功能之一就是監(jiān)控,要想做監(jiān)控,必須先做統(tǒng)計,為了更好的知道RocketMQ集群的運行狀況,MQCloud做了大量的統(tǒng)計工作,主要包括如下幾項:
            1. 每分鐘topic的生產(chǎn)流量:用于繪制topic生產(chǎn)流量圖及監(jiān)控預警。
            2. 每分鐘消費者流量:用于繪制消費流量圖及監(jiān)控預警。
            3. 每10分鐘topic生產(chǎn)流量:用于按照流量展示topic排序。
            4. 每分鐘broker生產(chǎn)、消費流量:用于繪制broker生產(chǎn)消費流量圖。
            5. 每分鐘broker集群生產(chǎn)、消費流量:用于繪制broker集群的生產(chǎn)流量圖。
            6. 每分鐘生產(chǎn)者百分位耗時、異常統(tǒng)計:以ip維度繪制每個生產(chǎn)者的耗時流量圖及監(jiān)控預警。
            7. 機器的cpu,內(nèi)存,io,網(wǎng)絡(luò)流量,網(wǎng)絡(luò)連接等統(tǒng)計:用于服務器的狀況圖和監(jiān)控預警。
          下面來分別介紹每項統(tǒng)計是如何收集的:
          每分鐘topic的生產(chǎn)流量
          此數(shù)據(jù)來自于RocketMQ broker端BrokerStatsManager,其提供了統(tǒng)計功能,統(tǒng)計項如下:
          1. TOPIC_PUT_NUMS:某topic消息生產(chǎn)條數(shù),向某個topic寫入消息成功才算

            寫入成功包括四種狀態(tài):PUT_OK,F(xiàn)LUSH_DISK_TIMEOUT,F(xiàn)LUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE

          2. TOPIC_PUT_SIZE:某topic消息生產(chǎn)大小,向某個topic寫入消息成功才算

          RocketMQ實現(xiàn)的統(tǒng)計邏輯較為精巧,這里做簡單描述,首先介紹幾個對象:
          1. StatsItemSet主要字段及方法如下:

            ConcurrentMap<String/* statsKey */, StatsItem> statsItemTable; // statsKey<->StatsItem
            // 針對某個數(shù)據(jù)項進行記錄
            public void addValue(final String statsKey, final int incValue, final int incTimes) {
                StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
                statsItem.getValue().addAndGet(incValue);
                statsItem.getTimes().addAndGet(incTimes);
            }
            // 獲取并創(chuàng)建StatsItem
            public StatsItem getAndCreateStatsItem(final String statsKey) {
                StatsItem statsItem = this.statsItemTable.get(statsKey);
                if (null == statsItem) {
                    statsItem = new StatsItem(this.statsName, statsKey);
                    this.statsItemTable.put(statsKey, statsItem);
                }
                return statsItem;
            }

          2. StatsItem主要字段及方法如下:

            AtomicLong value; // 統(tǒng)計數(shù)據(jù):比如消息條數(shù),消息大小
            AtomicLong times; // 次數(shù)
            LinkedList<CallSnapshot> csListMinute; // 每分鐘快照數(shù)據(jù)
            LinkedList<CallSnapshot> csListHour; // 每小時快照數(shù)據(jù)
            LinkedList<CallSnapshot> csListDay; // 每天快照數(shù)據(jù)
            // 分鐘采樣
            public void samplingInSeconds() {
                synchronized (csListMinute) {
                    csListMinute.add(new CallSnapshot(System.currentTimeMillis(), times.get(), value.get()));
                    if (csListMinute.size() > 7) {
                        csListMinute.removeFirst();
                    }
                }
            }
            // 小時采樣
            public void samplingInMinutes() {
              // ...代碼省略
            }
            // 天采樣
            public void samplingInHour() {
              // ...代碼省略
            }

          3. CallSnapshot主要字段如下:

            long times; // 次數(shù)快照
            long value; // 統(tǒng)計數(shù)據(jù)快照
            long timestamp; //快照時間戳

          上面三個對象如何配合進行數(shù)據(jù)統(tǒng)計呢?舉個例子,比如統(tǒng)計topic名字為test_topic的消息生產(chǎn)大?。?/span>
          只要進行類似如下調(diào)用即可:

          StatsItemSet.addValue("test_topic", 123125123, 1)

          即表示發(fā)送了1次消息到test_topic,消息大小為123125123。
          那如何進行數(shù)據(jù)采樣呢?StatsItemSet內(nèi)置了定時任務,比如其每10秒調(diào)用一次StatsItem.samplingInSeconds()。這樣StatsItem就會持有60秒的數(shù)據(jù),類似如下結(jié)構(gòu):
          那么,最后一個10秒的快照 - 第一個10秒的快照 = 當前60秒的數(shù)據(jù),根據(jù)時間戳差值可以得到耗時。
          類似,小時數(shù)據(jù)每10分鐘進行一次快照,類似如下結(jié)構(gòu):
          天數(shù)據(jù)每1小時進行一次快照,類似如下結(jié)構(gòu):
          MQCloud每分鐘遍歷查詢集群下所有broker來查詢RocketMQ統(tǒng)計好的分鐘數(shù)據(jù),然后進行存儲。
          每分鐘消費者流量
          與每分鐘topic的生產(chǎn)流量一樣,也采用RocketMQ統(tǒng)計好的數(shù)據(jù)。
          每10分鐘topic生產(chǎn)流量
          采用數(shù)據(jù)庫已經(jīng)統(tǒng)計好的每分鐘topic流量進行累加,統(tǒng)計出10分鐘流量。
          每分鐘broker生產(chǎn)、消費流量
          由于統(tǒng)計1.每分鐘topic的生產(chǎn)流量和2.每分鐘消費者流量時是跟broker交互獲取的,所以知道broker ip,故直接按照broker維度存儲一份數(shù)據(jù)即可。
          每分鐘broker集群生產(chǎn)、消費流量
          采用4.每分鐘broker生產(chǎn)、消費流量數(shù)據(jù),按照集群求和即可。
          每分鐘生產(chǎn)者百分位耗時、異常統(tǒng)計
          由于RocketMQ并沒有提供生產(chǎn)者的流量統(tǒng)計(只提供了topic,但是并不知道每個生產(chǎn)者的情況),所以MQCloud實現(xiàn)了對生產(chǎn)者數(shù)據(jù)進行統(tǒng)計(通過RocketMQ的回調(diào)鉤子實現(xiàn)):
          主要統(tǒng)計如下信息:
            1. 客戶端ip->broker ip
            2. 發(fā)送消息耗時
            3. 消息數(shù)量
            4. 發(fā)送異常
          統(tǒng)計完成后,定時發(fā)送到MQCloud進行存儲,并做實時監(jiān)控和展示。
          關(guān)于統(tǒng)計部分有一點說明,一般耗時統(tǒng)計有最大,最小和平均值,而通常99%(即99%的請求耗時都低于此數(shù)值)的請求的耗時情況才能反映真實響應情況。99%請求耗時統(tǒng)計最大的問題是如何控制內(nèi)存占用,因為需要對某段時間內(nèi)所有的耗時做排序后才能統(tǒng)計出這段時間的99%的耗時狀況。而對于流式數(shù)據(jù)做這樣的統(tǒng)計是有一些算法和數(shù)據(jù)結(jié)構(gòu)的,例如t-digest,但是MQCloud采用了非精確的但是較為簡單的分段統(tǒng)計的方法,具體如下:
          1. 創(chuàng)建一個按照最大耗時預哈希的時間跨度不同的耗時分段數(shù)組

            優(yōu)點:此種分段方法占用內(nèi)存是固定的,比如最大耗時如果為3500ms,那么只需要空間大小為96的數(shù)組即可

            缺點:分段精度需要提前設(shè)定好,且不可更改

            1. 第一段:耗時范圍0ms~10ms,時間跨度為1ms。

            2. 第二組:耗時范圍11ms~100ms,時間跨度5ms。

            3. 第三組:耗時范圍101ms~3500ms,時間跨度50ms。

            優(yōu)點:此種分段方法占用內(nèi)存是固定的,比如最大耗時如果為3500ms,那么只需要空間大小為96的數(shù)組即可

            缺點:分段精度需要提前設(shè)定好,且不可更

          2. 針對上面的分段數(shù)組,創(chuàng)建一個大小對應的AtomicLong的計數(shù)數(shù)組,支持并發(fā)統(tǒng)計:

          3. 耗時統(tǒng)計時,計算耗時對應的耗時分段數(shù)組下標,然后調(diào)用計數(shù)數(shù)組進行統(tǒng)計即可,參考下圖:

            這樣,從計數(shù)數(shù)組就可以得到實時耗時統(tǒng)計,類似如下:

            1. 例如某次耗時為18ms,首先找到它所屬的區(qū)間,即歸屬于[16~20]ms之間,對應的數(shù)組下標為12。
            2. 根據(jù)第一步找到的數(shù)組下標12,獲取對應的計數(shù)數(shù)組下標12。
            3. 獲取對應的計數(shù)器進行+1操作,即表示18ms發(fā)生了一次調(diào)用。

            這樣,從計數(shù)數(shù)組就可以得到實時耗時統(tǒng)計,類似如下:

          4. 然后定時采樣任務會每分鐘對計數(shù)數(shù)組進行快照,產(chǎn)生如下耗時數(shù)據(jù)

          5. 由于上面的耗時數(shù)據(jù)天然就是排好序的,可以很容易計算99%、90%、平均耗時等數(shù)據(jù)了。

          另外提一點,由于RocketMQ 4.4.0新增的trace功能也使用hook來實現(xiàn),與MQCloud的統(tǒng)計有沖突,MQCloud已經(jīng)做了兼容。
          Trace和統(tǒng)計是兩種維度,trace反映的是消息從生產(chǎn)->存儲->消費的流程,而MQCloud做的是針對生產(chǎn)者狀況的統(tǒng)計,有了這些統(tǒng)計數(shù)據(jù),才可以做到生產(chǎn)耗時情況展示,生產(chǎn)異常情況預警等功能。

          機器統(tǒng)計

          關(guān)于集群狀況收集主要采用了將nmon自動放置到/tmp目錄,定時采用ssh連接到機器執(zhí)行nmon命令,解析返回的數(shù)據(jù),然后進行存儲。

          上面這些工作就為監(jiān)控和預警奠定了堅實的數(shù)據(jù)基礎(chǔ)。

          單獨定制的客戶端

          針對客戶端的一些需求,mq-client在rocketmq-client的基礎(chǔ)上進行了開發(fā)定制:
          1. 多集群支持

            MQCloud儲存了生產(chǎn)者、消費者和集群的關(guān)系,通過路由適配,客戶端可以自動路由到目標集群上,使客戶端對多集群透明。

          2. trace

            通過搭建單獨的trace集群和定制客戶端,使trace數(shù)據(jù)能夠發(fā)往獨立的集群,防止影響主集群。

          3. 序列化

            通過集成不同的序列化機制,配合MQCloud,客戶端無需關(guān)心序列化問題。

            目前支持的序列化為protobuf和json,并且通過類型檢測支持在線修改序列化方式。

          4. 流控

            通過提供令牌桶和漏桶限流機制,自動開啟流控機制,防止消息洪峰沖垮業(yè)務端。

          5. 隔離降級

            使用hystrix提供隔離降級策略,使業(yè)務端在broker故障時可以避免拖累。

          6. 埋點監(jiān)控

            通過對客戶端數(shù)據(jù)進行統(tǒng)計,收集,在MQCloud里進行監(jiān)控,使客戶端任何風吹草動都能及時得知。

          7. 規(guī)范問題

            通過編碼保障,使某些約定,規(guī)范和最佳實踐得以實現(xiàn)。包括但不限于:

            • 命名規(guī)范
            • 消費組全局唯一,防止重復導致消費問題
            • 重試消息跳過
            • 安全關(guān)閉等等
            • 更完善的重試機制
          自動化運維
            1. 部署

              手動部署一臺broker實例沒什么問題,但是當實例變多時,手動部署極易出錯且耗時耗力。

              MQCloud提供了一套自動化部署機制,并支持配置模板功能,支持一鍵部署。

            2. 機器運維

              MQCloud提供了一整套機器的運維機制,包括上下線,機器狀況收集、監(jiān)控、預警等等,大大提升了生產(chǎn)力。

          安全性加固

          一、開啟管理員權(quán)限

          RocketMQ從4.4.0開始支持ACL,但是默認沒有開啟,也就是任何人使用管理工具或API就可以直接操縱線上集群。但是開啟ACL對現(xiàn)有業(yè)務影響太大,針對這種情況MQCloud進行專門定制。

          借鑒RocketMQ ACL機制,只針對RocketMQ管理員操作加固權(quán)限校驗:

          并且支持自定義和熱加載管理員請求碼,使得非法操作RocketMQ集群成為不可能,安全性大大提升。


          二、broker通信加固

          broker同步數(shù)據(jù)代碼由于沒有校驗,存在安全隱患,只要連接master監(jiān)聽的slave通信端口,發(fā)送數(shù)據(jù)大于8個字節(jié),就可能導致同步偏移量錯誤,代碼如下:

          if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
            long readOffset = this.byteBufferRead.getLong(pos - 8);
            this.processPostion = pos;
            HAConnection.this.slaveAckOffset = readOffset;
            if (HAConnection.this.slaveRequestOffset < 0) {
                HAConnection.this.slaveRequestOffset = readOffset;
                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
            }
            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
          }

          MQCloud通過驗證數(shù)據(jù)首包的策略,保障了通信的安全性。

          目前MQCloud運維規(guī)模如下:

          • 服務器:50臺+
          • 集群:5個+
          • topic:700個+
          • 生產(chǎn)消費消息量/日:4億條+
          • 生產(chǎn)消費消息大小/日:400G+

          MQCloud在充分考慮和吸收實際業(yè)務的需求后,以各個角色聚焦為核心,以全面監(jiān)控為目標,

          以滿足各業(yè)務端需求為己任,在不斷地發(fā)展和完善。

          在MQCloud逐漸成熟之后,秉承著服務于社區(qū)和吸收更多經(jīng)驗的理念,我們開放了源代碼。

          四、開源之路

          開放源代碼說不難也不難,說難也難。為什么這么說?

          不難就是因為代碼已經(jīng)有了,只是換個倉庫而已。

          而難點就是需要進行抽象設(shè)計,剝離不能開源的代碼(內(nèi)部模塊,代碼,地址等等)。

          經(jīng)過設(shè)計和拆分,MQCloud于18年開源了,從第一個版本release到現(xiàn)在已經(jīng)過去兩年了,

          期間隨著更新迭代大大小小一共release了20多個版本。

          其中不但包含功能更新、bug修復、wiki說明等,而且每個大版本都經(jīng)過詳細的測試和內(nèi)部的運行。

          之后很多小伙伴躍躍欲試,來試用它,并提出一些建議和意見,我們根據(jù)反饋來進一步完善它。

          我們將一直遵循我們的目標,堅定的走自己的開源之路:

          • 為業(yè)務提供可監(jiān)控,可預警,可滿足其各種需求的穩(wěn)定的MQ服務。
          • 積累MQ領(lǐng)域經(jīng)驗,將經(jīng)驗轉(zhuǎn)化為產(chǎn)品,更好的服務業(yè)務。

          后臺回復 學習資料 領(lǐng)取學習視頻


          如有收獲,點個在看,誠摯感謝

          瀏覽 40
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产欧美在线精品日韩 | 九九免费观看视频 | 8x8x国产一区二区三区精品痛苦 | 91美女被艹 | 香蕉视频视频禁止十八 |