萬字詳解數(shù)據(jù)中心的百萬級消息服務(wù)實戰(zhàn)架構(gòu)之美關(guān)注共 10904字,需瀏覽 22分鐘 ·2022-01-17 02:21 —?1?—背景利用RabbitMQ集群橫向擴(kuò)展能力,均衡流量壓力,讓消息集群的秒級服務(wù)能力達(dá)到百萬,Google曾做過此類實驗;有貨在某些推送場景下也做了類似嘗試,在此對此前實踐經(jīng)驗以及踩得坑做些總結(jié)工作。RabbitMQ概述RabbitMQ是基于AMQP協(xié)議實現(xiàn)的消息中間件的一種,常用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,表現(xiàn)為易用、可擴(kuò)展、高可用等特點,最初活躍在金融系統(tǒng)中。消息中間件的主要作用是讓服務(wù)組件之間能夠解耦,消息生產(chǎn)者無需關(guān)注消息消費者的存在與行為。1.Message:由Producer發(fā)出,經(jīng)過Exchange路由到相應(yīng)的Queue,然后Consumer從Queue中取走消費;2.Queue:存儲消息的容器,消息存儲在隊列里,直到有消費者連接隊列并取走為止;3.綁定(Binding):將Queue與Exchange之間按規(guī)則建立映射關(guān)系,類似建立網(wǎng)絡(luò)路由表,通過Binding規(guī)定了Exchange如何將消息路由到某個隊列中;4.交換機(Exchange):Exchage就是路由器,每個消息都有一個路由Key的屬性,交換機中有一些隊列的Binding(路由規(guī)則),交換機有多種類型,如topic、direct、fanout;5.Broker(服務(wù)器):接受客戶端連接,實現(xiàn)AMQP消息隊列和路由功能的進(jìn)程;6.虛擬主機(virtual-host):一個虛擬主機有一組交換機,隊列和Binding,用戶只能在虛擬主機的范圍內(nèi)進(jìn)行權(quán)限控制,每一個服務(wù)器都有一個默認(rèn)的虛擬主機(/);7.連接(Connection):客戶端與broker之間的Tcp連接;8.信道(Channel):比連接更小的單位,創(chuàng)建連接后需要在其內(nèi)創(chuàng)建信道發(fā)送消息,一個連接內(nèi)可以有多個信道,這樣設(shè)計是為了減少tcp連接,客戶端線程盡量共用連接,不共用Channel;RabbitMQ Brokers是一個或多個Erlang節(jié)點的邏輯分組,每個節(jié)點運行RabbitMQ應(yīng)用程序并共享用戶,虛擬主機,隊列,交換,綁定和運行時參數(shù)。有時我們將節(jié)點的集合稱為集群。在所有節(jié)點上復(fù)制RabbitMQ代理的操作所需的所有數(shù)據(jù)/狀態(tài)。一個例外是消息隊列,它們默認(rèn)駐留在一個節(jié)點上,盡管它們是可見的,并且可以從所有節(jié)點訪問。要跨集群中的節(jié)點復(fù)制隊列,需要配置Mirror特性。集群又可以分為兩種,普通模式(默認(rèn)模式)以兩個節(jié)點(A、B)為例來進(jìn)行說明。對于Queue來說,消息實體只存在于其中一個節(jié)點A(或者B),A和B兩個節(jié)點僅有相同的元數(shù)據(jù),即隊列的結(jié)構(gòu)。當(dāng)消息進(jìn)入A節(jié)點的Queue后,Consumer從B節(jié)點消費時,RabbitMQ會臨時在A、B間進(jìn)行消息傳輸,把從A中的消息實體取出并經(jīng)過B發(fā)送給Consumer。所以Consumer應(yīng)盡量連接每一個節(jié)點,從中取消息。即對于同一個邏輯隊列,要在多個節(jié)點建立物理Queue。否則無論Consumer連A或B,出口總在A,會產(chǎn)生瓶頸。當(dāng)A節(jié)點故障后,B節(jié)點無法取到A節(jié)點中還未消費的消息實體。如果做了消息持久化,那么得等A節(jié)點恢復(fù),然后才可被消費。如果沒有持久化的話,就會產(chǎn)生消息丟失的現(xiàn)象。把需要的隊列做成鏡像隊列,隊列存在與多個節(jié)點屬于RabbitMQ的HA方案。該模式解決了普通模式中的問題,其實質(zhì)和普通模式不同之處在于,消息實體會主動在鏡像節(jié)點間同步,而不是在客戶端取數(shù)據(jù)時臨時拉取。該模式帶來的副作用也很明顯,除了降低系統(tǒng)性能外,如果鏡像隊列數(shù)量過多,加之大量的消息進(jìn)入,集群內(nèi)部的網(wǎng)絡(luò)帶寬將會被這種同步通訊大大消耗掉。所以在對可靠性要求較高的場合中適用。—?2?—百萬級消息服務(wù)上文講述了RabbitMQ的一些基礎(chǔ)概念,接下來首先分析Google的測試思想,然后介紹下我們在此基礎(chǔ)上的一些其他想法,借此了解下如何構(gòu)建能夠支持百萬級消息并發(fā)的RabbitMQ服務(wù)。Google共使用了32臺8核30G內(nèi)存的虛擬機,構(gòu)建了相對來說比較龐大的rabbitmq集群,各虛擬機的作用分配如下:30 RabbitMQ RAM節(jié)點(正常RAM節(jié)點,RabbitMQ元數(shù)據(jù)和定義僅保存在RAM中);1 RabbitMQDisc節(jié)點(元數(shù)據(jù)持久化節(jié)點,其中RabbitMQ代理元數(shù)據(jù)和定義也保留在光盤上);1 RabbitMQ Stats節(jié)點(統(tǒng)計信息節(jié)點,運行RabbitMQ管理插件,不帶任何隊列);在這種高負(fù)載的生產(chǎn)(1345531msgs/pers)消費(1413840 msgs/pers)壓力下,RabbitMQ僅有2343條消息暫時在其等待發(fā)送的隊列中累積,在這樣的負(fù)載下,RabbitMQ節(jié)點也沒有顯示內(nèi)存壓力,或者需要基于資源限制的啟動流控機制。使用RabbitMQ的許多用戶現(xiàn)在大多集群規(guī)模大致為3-7個RabbitMQ節(jié)點組成的群集,從該類集群中就可以獲得極好的結(jié)果。鑒于這一基礎(chǔ),在Google的實驗中使用的30節(jié)點RabbitMQ集群與當(dāng)前實踐中常見的相比是相當(dāng)大的。當(dāng)然大數(shù)據(jù)的增長,物聯(lián)網(wǎng),實時分析等應(yīng)用可能會增加未來許多RabbitMQ集群的規(guī)模。在這樣的大型集群中,系統(tǒng)的設(shè)計,構(gòu)建和擴(kuò)展都有一些要點。即使對于較小的集群也是如此,首先,RabbitMQ的消息傳遞工作的基本并行單位是隊列。除了適用于某些高可用性配置的部分異常之外,RabbitMQ隊列由單個Erlang進(jìn)程(輕量級線程抽象)支持,通過謹(jǐn)慎地分配消息的生產(chǎn)者,相對于他們的消息最終到達(dá)的隊列,可以解決單個隊列所構(gòu)成的潛在瓶頸。Pivotal RabbitMQ教程演示了支持各種場景和路由方案的消息架構(gòu)的構(gòu)建。Google使用了非?;镜睦?。當(dāng)然除了教程中涵蓋的場景之外,RabbitMQ還存在更多的可能性,包括使用一致的哈希交換類型進(jìn)行動態(tài)負(fù)載平衡場景。其次,重要的是要注意個別節(jié)點的職責(zé),尤其在負(fù)載非常高的集群中??梢栽谌杭械娜魏喂?jié)點上啟用或禁用RabbitMQ管理插件。RabbitMQ管理插件提供上述基于Web的管理UI,以及相應(yīng)的基于HTTP的管理API,還可以作為統(tǒng)計其他集群節(jié)點報告性能指標(biāo)。在大型集群中,許多節(jié)點都是報告度量,目前統(tǒng)計數(shù)據(jù)庫都可能成為瓶頸。因此,Google在實驗過程中,單獨創(chuàng)建了一個信息統(tǒng)計節(jié)點,并將其從負(fù)載均衡器的后端服務(wù)器列表中排除掉,從而消息生產(chǎn)與消費不會經(jīng)過該節(jié)點,統(tǒng)計信息與生產(chǎn)消費也就不會發(fā)生競爭資源的情況。在AWS上使用同等規(guī)模與配置的環(huán)境,驗證了Google提供的測試結(jié)果后,又做了一些別的嘗試,如使用RabbitMQ Sharding插件、Consistent-hash Sharding Exchange來更加靈活的動態(tài)均衡隊列壓力,同樣可以達(dá)到此類性能。RabbitMQ Sharding插件下面介紹下如何使Sharding插件,3.6.0以及以后的RabbitMQ版本啟用Sharding插件,使用命令:rabbitmq-pluginsenable?rabbitmq_sharding在大規(guī)模集群上,配置節(jié)點多分片隊列,可以有效分?jǐn)倖侮犃械男阅芷款i。這個插件能夠讓分片隊列自動擴(kuò)展,如果您添加更多的節(jié)點到您的RabbitMQ群集,那么該插件將自動在新節(jié)點中創(chuàng)建更多的分片。假設(shè)集群初始僅有一個節(jié)點A,配置每個節(jié)點分布4個分片隊列,現(xiàn)在將節(jié)點B加入了節(jié)點A所在群集。插件將自動在節(jié)點b中創(chuàng)建4個隊列,并將它們連接到分片分區(qū)。已經(jīng)傳遞的消息將不會被重新平衡,但新到達(dá)的消息將被分區(qū)到新的隊列。默認(rèn)情況下RabbitMQ的交換機以”all or nothing”方式工作,即:如果路由key與綁定到交換機的一組隊列匹配,則RabbitMQ將將消息路由到該集合中的所有隊列。因此,為了使這個插件能正常工作,我們需要將消息路由到一個交換機來分配消息,讓消息最多被分配到一個隊列。該插件提供了一種新的Exchange類型“x-modulus-hash”,它將使用傳統(tǒng)的哈希技術(shù)應(yīng)用于跨隊列分區(qū)消息。“x-modulus-hash”交換機將對用于發(fā)布消息的Routing-Key進(jìn)行hash,然后將hash值mod N來選擇路由消息的隊列,其中N是綁定到交換機的隊列數(shù)。此交換將完全忽略用于將隊列綁定到交換機的Routing-Key。如果只需要消息分區(qū),而不是由此插件提供的自動隊列創(chuàng)建,那么只需使用一致的哈希Exchange,這個后面介紹。安裝插件后,您可以通過設(shè)置與交換名稱匹配的策略來定義交換分片。例如,如果我們有一個稱為shard.images的交換,我們可以定義以下策略來分片:set_policy images-swarding "^images"這將為集群中的每個節(jié)點創(chuàng)建2個分片隊列,并使用Routingkey:”hello”對這些隊列進(jìn)行綁定,隨后定義一個名為images的exchange。隨后,插件會自動在每個節(jié)點上創(chuàng)建2個分片隊列,名為“sharding:images-*”。在上面的例子中,我們在定義策略時使用路由key為“hello”。這意味著用于分片的底層交換機將使用上面指定的hello路由key將分片隊列綁定到交換機。這意味著對于“Direct-Exchange”,使用路由密鑰hello發(fā)布的息將被路由到所有的分片隊列。如果您決定使用“Fanout-exchange”進(jìn)行分片,則在綁定期間使用的“hello”路由key將被交換機忽略。如果使用“x-modulus-hash”交換,則路由key也將被忽略。因此,根據(jù)您使用的交換機,路由策略定義在路由消息時會產(chǎn)生影響。Consistent-sharding Exchange在某些情況下,你可能希望發(fā)送到交換機的消息是一致和均勻地分布在多個不同的隊列。在上面的插件中如果隊列數(shù)量發(fā)生變化,則不難確保新的拓?fù)浣Y(jié)構(gòu)仍然在不同隊列之間均勻分配消息,此時就可以借助Consistent-sharding類型Exchange,與Sharding插件的主要區(qū)別是,該類Exchange不能自動創(chuàng)建分片隊列,需要手動創(chuàng)建并配置Binding關(guān)系,且支持一致性hash。在作為交換類型的一致哈希的情況下,從所接收的每個消息的Routing-key進(jìn)行哈希計算后散列存儲。因此,具有相同Routing-Key的消息將具有計算的相同散列,將被路由到相同的隊列。將隊列綁定到一致性哈希的Exchange時,綁定key是一個數(shù)字字符串,表示希望該隊列在整個hash空間中占有的點數(shù)。具有相同Routing-key的所有消息將進(jìn)入相同的隊列。因此,如果希望隊列A接收路由消息是隊列B接收路由消息的兩倍,那么需要將隊列A綁定到Exchange的綁定Key(字符串的數(shù)字)設(shè)置為隊列B的綁定Key的2倍。當(dāng)然,只有當(dāng)你的路由Key均勻分布在散列空間中時才是這種情況。例如,如果在所有消息上僅使用兩個不同的路由Key,即使其他隊列在其綁定Key中具有較高的值,兩個密鑰也可能路由到同一個隊列。使用更大的路由Key集合,路由Key的統(tǒng)計分布接近綁定Key的設(shè)置的比率。在這種情況下,隨機Routing-key的消息最終將會均勻分布到兩個隊列中。RabbitMQ可靠性與可用性討論但是如何確保消息傳遞的可靠性以及如何配置高可用,很多人都一直存在疑惑,實踐才是檢驗真理的唯一標(biāo)準(zhǔn),所以基于有貨某些使用場景,也分析總結(jié)下經(jīng)驗與教訓(xùn),希望對大家有所幫助。場景1,如何保證消息的傳遞可靠,生產(chǎn)者與消費者互不感知,那么怎么確認(rèn)生產(chǎn)者已將消息投遞到RabbitMQ服務(wù)端,又如何確認(rèn)消費者已經(jīng)消費了該消息?這里需要使用的RabbtiMQ提供的生產(chǎn)者Confirm機制、消費者Ack機制來解決;使用標(biāo)準(zhǔn)AMQP 0-9-1,保證消息不丟失的唯一方法是使用事務(wù):使信道事務(wù)發(fā)布,發(fā)布消息,提交。在這種情況下,交易是不必要的重量級,并將吞吐量降低250倍。為了彌補這一點,引入了確認(rèn)機制。它模仿了協(xié)議中已經(jīng)存在的消費者確認(rèn)機制。要啟用確認(rèn),客戶端發(fā)送confirm.select方法。根據(jù)是否設(shè)置不等待,RabbitMQ Broker可以通過confirm.select-ok進(jìn)行回復(fù)。一旦在通道上使用了confirm.select方法,就被認(rèn)為處于確認(rèn)模式。事務(wù)通道不能進(jìn)入確認(rèn)模式,一旦通道處于確認(rèn)模式,則不能進(jìn)行事務(wù)處理。一旦通道處于確認(rèn)模式,代理和客戶端都會計數(shù)消息(從第一個confirm.select開始計數(shù))。然后Broker通過在同一個頻道上發(fā)送basic.ack來確認(rèn)消息。發(fā)送標(biāo)簽字段包含已確認(rèn)消息的序列號。Broker還可以在basic.ack中設(shè)置多個字段,表明所有小于該序列號的消息都已到達(dá)并被處理。何時確認(rèn)呢?對于無法路由的消息,一旦exchange驗證了消息不會被路由到任何隊列(返回一個空列表的隊列),Broker將發(fā)出確認(rèn)。如果消息的發(fā)送屬性指定了為強制性(mandatory),則basic.return將在basic.ack之前發(fā)送給客戶端。否定的確認(rèn)也是如此(basic.nack)。對于可以路由的消息,當(dāng)所有隊列接受消息時,發(fā)送basic.ack。對于路由到持久隊列的持久消息,這意味著已保存到磁盤。對于鏡像隊列,這意味著隊列的所有鏡像都已接受該消息。當(dāng)RabbitMQ交付消息給Consumer時,需要確認(rèn)Message已被投遞到Consumer。Acknowledgemenets作用,consumer通知server已收到消息或者成功消費消息,根據(jù)使用的確認(rèn)模式,RabbitMQ可以在發(fā)送(寫入TCP套接字)后或當(dāng)接收到顯式(“手動”)客戶端確認(rèn)信息時立即考慮成功傳遞的消息。手動發(fā)送的確認(rèn)可以是正面或負(fù)面的,并使用以下協(xié)議方法之一:basic.ack用于肯定確認(rèn);basic.nack用于否定確認(rèn)(注意:這是AMQP0-9-1的RabbitMQ擴(kuò)展);basic.reject用于否定確認(rèn),但與basic.nack相比有一個限制;肯定的確認(rèn)只是指示RabbitMQ記錄一個消息傳遞。與basic.reject的否定確認(rèn)具有相同的效果。差異主要在語義上:肯定確認(rèn)假設(shè)一條消息已成功處理,而其負(fù)面對應(yīng)方則表示傳送未被處理但仍應(yīng)被刪除,可以批量手動確認(rèn)以減少網(wǎng)絡(luò)流量。綜上所述,在1的位置需要開啟Channel的Confirm模式,接收RabbitMQ服務(wù)端發(fā)送的確認(rèn)消息已到達(dá)的Ack信息;在3的位置,消費者在成功消費或者業(yè)務(wù)處理失敗后,需要顯示告訴RabbitMQ服務(wù)端,消息已被消費成功或者失??;在某些類型的網(wǎng)絡(luò)故障中,數(shù)據(jù)包丟失可能意味著中斷的TCP連接需要較長時間才能夠被操作系統(tǒng)檢測到。AMQP 0.9.1提供心跳功能,以確保應(yīng)用程序?qū)蛹皶r發(fā)現(xiàn)連接中斷。在我們的部署架構(gòu)中,ELB與RabbitMQ之間就是通過此機制來判斷服務(wù)是否存活,是否提示生產(chǎn)者服務(wù)端已掛,異步等待confirm的消息直接進(jìn)入unconfirm的處理環(huán)節(jié)。另外為了避免在代理中丟失消息,我們需要應(yīng)對代理重新啟動,代理硬件故障,甚至破壞代理崩潰。為了確保重新啟動時消息和代理定義生效,我們需要確保它們在磁盤上持久化。AMQP標(biāo)準(zhǔn)具有交換,隊列和持久消息的耐久性概念,要求持久對象或持久消息將在重新啟動后生存。場景2,如何實現(xiàn)處理失敗后重試機制;某些情況下,業(yè)務(wù)在處理消息時可能會失敗,此時需要做的是重試,而不是直接丟棄;當(dāng)然重試也不能僅僅是直接重試,一旦有任務(wù)長時間失敗,會導(dǎo)致后面的消息無法被正常處理,此時可以借助死信機制轉(zhuǎn)發(fā)投遞到重試隊列后,隨后再嘗試重新處理該消息;那如何實現(xiàn)呢?下面介紹下具體操作;x-dead-letter-exchange:死信轉(zhuǎn)發(fā)的exchange;x-dead-letter-routing-key:死信轉(zhuǎn)發(fā)時的routing-key;該隊列綁定到名為“amp.topic”的topic類型exchange,接收routing-key為“yoho_test_retry”的消息;死信轉(zhuǎn)發(fā)到“amp.topic”的exchange,routing-key為“yoho_test_retry”(即工作隊列接收該主題消息);x-message-ttl:message在重試隊列中存活的時間,也就是延遲多久重試;該隊列綁定到“amp.topic”,接收routing-key為“retry.yoho_test_retry”的消息(即接收工作隊列的死信),這樣就可以實現(xiàn)重試隊列的機制了。場景3,如何實現(xiàn)定時任務(wù);定時任務(wù),這也是一種常見的需求,那如何在RabbitMQ中實現(xiàn)這個能力,可以讓某些任務(wù)延時執(zhí)行。其實同樣的也可以借助死信機制來實現(xiàn),如隊列A用于接收暫存Producer的消息,隊列B用于Consumer的消費,在隊列A中指定消息的ttl即生命周期時長,同時指定其死信交換機DLXs,一旦消息在隊列中存活時長超過ttl的設(shè)定值,那么消息會被轉(zhuǎn)發(fā)到DLXs,綁定隊列B到DLXs,即可接收到隊列A的死信;下面具體看下操作流程;從“amp.topic”的exchange中接收routing-key為“delay.yoho_test_delay”主題的消息;死信轉(zhuǎn)到交換機“amp.topic”中,消息的routing-key為“delay.yoho_test_delay”(即工作隊列接收的延遲消息的隊列),消息在延遲隊列中存活時間ttl,同時該隊列綁定到“amp.topic”交換機,接收routing-key為“yoho_test_delay”注意的消息(即生產(chǎn)發(fā)送消息指定的topic);如此一來延遲隊列接收消息后,等待ttl時長,將消息轉(zhuǎn)發(fā)到工作隊列中,即可實現(xiàn)延遲隊列機制。場景4,如何跨中心共享消息有時跨中心業(yè)務(wù)需要共享消息,如緩存清理等,在業(yè)務(wù)代碼中分別向多個中心的RabbitMQ發(fā)布消費消息顯然不是一種比較好的解決方案,那還有什么好的方法呢,RabbitMQ為此提供了Federation插件來很好的解決此類問題;Federation插件是一個在不需要cluster,而在brokers之間傳輸消息的高性能插件,F(xiàn)ederation插件可以在brokers或者cluster之間傳輸消息,連接的雙方可以使用不同的users和virtual hosts,或者雙方的rabbitmq和erlang的版本不一致,F(xiàn)ederation插件使用AMQP協(xié)議通訊,可以接受不連續(xù)的傳輸。Federation插件允許你配置一個exchanges Federation或者queues Federation。一個exchange/queues federation允許你從一個或者多個upstream接受信息(就是遠(yuǎn)程的exchange/queues或者其他brokers),一個federation exchange可以路由消息到一個本地queue中,一個federation queue可以使一個本地的consumer接受從upstream queue過來的消息。下面分別看下如何啟用,創(chuàng)建以及兩者的區(qū)別:(1)啟動Federation插件命令:rabbitmq-pluginsenable rabbitmq_federation(2)啟動web配置插件:rabbitmq-pluginsenable rabbitmq_federation_managementFederation exchanges,可以看成downstream從upstream主動拉取消息,但并不是拉取所有消息,必須是在downstream上已經(jīng)明確定義Bindings關(guān)系的exchange,也就是有實際的物理queue來接收消息,才會從upstream拉取消息到downstream。使用AMQP協(xié)議實施代理間通信。Downstream會將綁定關(guān)系組合在一起,綁定/解除綁定命令將發(fā)送到upstream交換機。因此,F(xiàn)ederation交換機只接收具有訂閱的消息。任何upstream exchange接收到的消息都可能被downstream中Federation exchange接收到,但直接發(fā)送給Federation Exchange的消息是不能upstream中所綁定的exchange接收到的。Federation queues與Federationexchange最大的區(qū)別共享消息的機制不同,F(xiàn)ederation隊列僅在本地消耗消息時檢索消息,消費者需要消息,并且Upstream隊列具有未被消費的消息。這樣做的目的是確保消息僅在需要時,才會在聯(lián)合隊列之間傳輸。先看下創(chuàng)建過程,如下創(chuàng)建一個名為“fqueue_test”的federationFederation隊列可以被聲明為任何其他隊列。為了使RabbitMQ能夠識別出隊列需要聯(lián)合,還有哪些節(jié)點消息應(yīng)該被消耗,Downstream(消費)節(jié)點需要進(jìn)行配置。通過聲明策略來完成配置。策略是隊列名稱匹配的模式。匹配隊列將聯(lián)合。Federation隊列只能屬于一個策略。如果多個策略與隊列名稱匹配,則應(yīng)用優(yōu)先級最高的策略。當(dāng)兩個策略具有相同的優(yōu)先級時,隨機選擇匹配的策略。Federation隊列可以作為另一個Federation隊列的“上游”,甚至可以形成“循環(huán)”,例如,隊列A將隊列B聲明為上游,隊列B將隊列A聲明為上游。允許更復(fù)雜的多重連接的安排。Federation隊列將使用AMQP連接到其所有上游隊列。在聲明或配置聯(lián)合隊列時,每個上游隊列都將列出用于建立鏈接的連接屬性。每個單獨的隊列分別應(yīng)用其參數(shù),例如,如果在聯(lián)合隊列上設(shè)置x-max-length,則該隊列的長度將受限制(可能會在其已滿時丟棄消息),但與其聯(lián)合的其他隊列將不受影響。特別要注意的是,當(dāng)每個隊列或每個消息的TTL被使用時,當(dāng)一個消息被傳送到另一個隊列時,它的定時器將被重置。與Federation交換機不同,在Federation隊列之間可以轉(zhuǎn)發(fā)消息的次數(shù)沒有限制。在一組相互聯(lián)合的隊列中,消息將移動到空閑消費容量的位置。因此,如果閑置消費容量繼續(xù)移動,消息將繼續(xù)移動。上述就是Federation的使用方法以及注意點,當(dāng)然與其他插件的配合,可以衍生出多種使用方法。場景5,如何保證消息隊列的高可用,這樣的場景很多比如核心業(yè)務(wù)的訂單服務(wù)、erp服務(wù)等等。默認(rèn)情況下,RabbitMQ群集中的隊列位于單個節(jié)點(首次被聲明的節(jié)點上),而Exchanges和Bindings可以認(rèn)為在所有節(jié)點上存在,可以選擇在cluster中跨節(jié)點節(jié)點之間配置為鏡像隊列。每個鏡像隊列由一個master和一個或多個slave組成,如果master因為某些原因失效,則將從slave中選擇一個提升為master。發(fā)布到隊列的消息將復(fù)制到所有鏡像。消費者連接到主機,無論它們連接到哪個節(jié)點,鏡像會丟棄已在主設(shè)備上確認(rèn)的消息。隊列鏡像因此增強了可用性,但不跨節(jié)點分配負(fù)載(所有參與節(jié)點都執(zhí)行所有工作)。隊列已被配置為鏡像,master節(jié)點位于server5,slave節(jié)點位于server6,此時,隨意關(guān)閉任意一臺RabbitMQ節(jié)點,該隊列都可以正常對外提供服務(wù)。A先掛,B后掛,master轉(zhuǎn)移B,此時需先拉起B(yǎng),后拉起A,可恢復(fù)鏡像隊列;A、B同時掛,同時拉起,可恢復(fù)鏡像隊列;若新加入cluster節(jié)點,最好不要在生產(chǎn)環(huán)境手工同步,采用自然同步方式,對于沒有工作queue可以手工同步操作(同步操作時,queue是不可用的);當(dāng)所有slave都處在(與master)未同步狀態(tài)時,并且ha-promote-on-shutdown policy設(shè)置為when-syned(默認(rèn))時,如果master因為主動的原因停掉,比如是通過rabbitmqctl stop命令停止或者優(yōu)雅關(guān)閉OS,那么slave不會接管master,也就是說此時鏡像隊列不可用;但是如果master因為被動原因停掉,比如VM或者OS crash了,那么slave會接管master。這個配置項隱含的價值取向是優(yōu)先保證消息可靠不丟失,放棄可用性。如果ha-promote-on-shutdown policy設(shè)置為alway,那么不論master因為何種原因停止,slave都會接管master,優(yōu)先保證可用性。A、B都未掛,兩者網(wǎng)絡(luò)異常,各自為master,此時出現(xiàn)網(wǎng)絡(luò)分區(qū)沖突,必須手工介入保證消息不丟失,萬不可隨意重啟導(dǎo)致數(shù)據(jù)丟失(不論是否持久化),將一臺數(shù)據(jù)量較小的從cluster中剔除,消費完成后再重啟恢復(fù)鏡像;或者將其中一臺從集群中剔除后,加入另外一臺slave,再消費完成剔除的節(jié)點中數(shù)據(jù);(會出現(xiàn)重復(fù)消費,此時需要客戶端做冪等處理保證唯一一次消費)當(dāng)然在高可用的場景下,隊列的性能會受到一定的影響,此時可以借助上面提到的Sharding機制(根據(jù)場景選擇x-modulus-hash還是consistent-hash),解決單隊列的性能瓶頸,在高可用、高并發(fā)下尋求一個動態(tài)的平衡;上圖為鏡像場景的壓測結(jié)果,對比普通集群,鏡像對性能的影響很明顯,消息持久化也拉低了集群的性能,適當(dāng)增加Prefetch可以提高集群性能。性能與高可靠、高可用,魚和熊掌不可兼得,所以欲提升RabbitMQ集群或單節(jié)點服務(wù)的性能,犧牲可靠性(根據(jù)場景來),在消費能力范圍內(nèi),盡量提高prefetch的數(shù)量,其次就是簡單粗暴型(加機器(隊列實際存儲節(jié)點性能未榨干,建議隊列均衡分配到各節(jié)點)、加配置)。Spring AMQP提供了一個API,可輕松訪問AMQP消息代理。像往常一樣,Spring模板作為技術(shù)細(xì)節(jié)的抽象。對于AMQP,AmqpTemplate可以做到這一點。Spring-amqp項目擁有所有必要的通用接口(例如AmqpTemplate)和API類,而具體的實現(xiàn)則依賴spring-rabbitmq,Spring-rabbitmq依賴于RabbitMQ amqp-client的通用Java API??蛻舳藨?yīng)用程序僅依靠spring-amqp來實現(xiàn)松耦合。能夠從一個AMQP代理切換到另一個AMQP代理,而不會在代碼中進(jìn)行任何重大更改。作者:有貨技術(shù)來源:www.jianshu.com/p/ddca1548d0a1 瀏覽 50點贊 評論 收藏 分享 手機掃一掃分享分享 舉報 評論圖片表情視頻評價全部評論推薦 多數(shù)據(jù)中心的百萬級消息服務(wù)實戰(zhàn)Java架構(gòu)師社區(qū)0Kafka 萬億級消息實戰(zhàn)k8s技術(shù)圈0保姆級Git入門教程,萬字詳解Jack Cui0保姆級Git入門教程,萬字詳解沉默王二0保姆級Git入門教程,萬字詳解SAMshare0Mysql百萬級數(shù)據(jù)遷移實戰(zhàn)筆記卡二條的技術(shù)圈0Mysql百萬級數(shù)據(jù)遷移實戰(zhàn)筆記碼農(nóng)編程進(jìn)階筆記0【資源】超詳細(xì)Docker實戰(zhàn)教程,萬字詳解!程序員面試吧0UCMQHTTP協(xié)議級消息隊列服務(wù)組件UCMQ是一款輕量的HTTP協(xié)議級消息隊列服務(wù)組件,項目的最初原型來自“張宴”的HTTPSQS。基本特性:支持標(biāo)準(zhǔn)的HTTP協(xié)議(GET/POST方法),支持長連接(keep-alive);請求響應(yīng)非UCMQHTTP協(xié)議級消息隊列服務(wù)組件UCMQHTTP協(xié)議級消息隊列服務(wù)組件0點贊 評論 收藏 分享 手機掃一掃分享分享 舉報
RabbitMQ概述RabbitMQ是基于AMQP協(xié)議實現(xiàn)的消息中間件的一種,常用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,表現(xiàn)為易用、可擴(kuò)展、高可用等特點,最初活躍在金融系統(tǒng)中。消息中間件的主要作用是讓服務(wù)組件之間能夠解耦,消息生產(chǎn)者無需關(guān)注消息消費者的存在與行為。1.Message:由Producer發(fā)出,經(jīng)過Exchange路由到相應(yīng)的Queue,然后Consumer從Queue中取走消費;2.Queue:存儲消息的容器,消息存儲在隊列里,直到有消費者連接隊列并取走為止;3.綁定(Binding):將Queue與Exchange之間按規(guī)則建立映射關(guān)系,類似建立網(wǎng)絡(luò)路由表,通過Binding規(guī)定了Exchange如何將消息路由到某個隊列中;4.交換機(Exchange):Exchage就是路由器,每個消息都有一個路由Key的屬性,交換機中有一些隊列的Binding(路由規(guī)則),交換機有多種類型,如topic、direct、fanout;5.Broker(服務(wù)器):接受客戶端連接,實現(xiàn)AMQP消息隊列和路由功能的進(jìn)程;6.虛擬主機(virtual-host):一個虛擬主機有一組交換機,隊列和Binding,用戶只能在虛擬主機的范圍內(nèi)進(jìn)行權(quán)限控制,每一個服務(wù)器都有一個默認(rèn)的虛擬主機(/);7.連接(Connection):客戶端與broker之間的Tcp連接;8.信道(Channel):比連接更小的單位,創(chuàng)建連接后需要在其內(nèi)創(chuàng)建信道發(fā)送消息,一個連接內(nèi)可以有多個信道,這樣設(shè)計是為了減少tcp連接,客戶端線程盡量共用連接,不共用Channel;RabbitMQ Brokers是一個或多個Erlang節(jié)點的邏輯分組,每個節(jié)點運行RabbitMQ應(yīng)用程序并共享用戶,虛擬主機,隊列,交換,綁定和運行時參數(shù)。有時我們將節(jié)點的集合稱為集群。在所有節(jié)點上復(fù)制RabbitMQ代理的操作所需的所有數(shù)據(jù)/狀態(tài)。一個例外是消息隊列,它們默認(rèn)駐留在一個節(jié)點上,盡管它們是可見的,并且可以從所有節(jié)點訪問。要跨集群中的節(jié)點復(fù)制隊列,需要配置Mirror特性。集群又可以分為兩種,普通模式(默認(rèn)模式)以兩個節(jié)點(A、B)為例來進(jìn)行說明。對于Queue來說,消息實體只存在于其中一個節(jié)點A(或者B),A和B兩個節(jié)點僅有相同的元數(shù)據(jù),即隊列的結(jié)構(gòu)。當(dāng)消息進(jìn)入A節(jié)點的Queue后,Consumer從B節(jié)點消費時,RabbitMQ會臨時在A、B間進(jìn)行消息傳輸,把從A中的消息實體取出并經(jīng)過B發(fā)送給Consumer。所以Consumer應(yīng)盡量連接每一個節(jié)點,從中取消息。即對于同一個邏輯隊列,要在多個節(jié)點建立物理Queue。否則無論Consumer連A或B,出口總在A,會產(chǎn)生瓶頸。當(dāng)A節(jié)點故障后,B節(jié)點無法取到A節(jié)點中還未消費的消息實體。如果做了消息持久化,那么得等A節(jié)點恢復(fù),然后才可被消費。如果沒有持久化的話,就會產(chǎn)生消息丟失的現(xiàn)象。把需要的隊列做成鏡像隊列,隊列存在與多個節(jié)點屬于RabbitMQ的HA方案。該模式解決了普通模式中的問題,其實質(zhì)和普通模式不同之處在于,消息實體會主動在鏡像節(jié)點間同步,而不是在客戶端取數(shù)據(jù)時臨時拉取。該模式帶來的副作用也很明顯,除了降低系統(tǒng)性能外,如果鏡像隊列數(shù)量過多,加之大量的消息進(jìn)入,集群內(nèi)部的網(wǎng)絡(luò)帶寬將會被這種同步通訊大大消耗掉。所以在對可靠性要求較高的場合中適用。—?2?—百萬級消息服務(wù)上文講述了RabbitMQ的一些基礎(chǔ)概念,接下來首先分析Google的測試思想,然后介紹下我們在此基礎(chǔ)上的一些其他想法,借此了解下如何構(gòu)建能夠支持百萬級消息并發(fā)的RabbitMQ服務(wù)。Google共使用了32臺8核30G內(nèi)存的虛擬機,構(gòu)建了相對來說比較龐大的rabbitmq集群,各虛擬機的作用分配如下:30 RabbitMQ RAM節(jié)點(正常RAM節(jié)點,RabbitMQ元數(shù)據(jù)和定義僅保存在RAM中);1 RabbitMQDisc節(jié)點(元數(shù)據(jù)持久化節(jié)點,其中RabbitMQ代理元數(shù)據(jù)和定義也保留在光盤上);1 RabbitMQ Stats節(jié)點(統(tǒng)計信息節(jié)點,運行RabbitMQ管理插件,不帶任何隊列);在這種高負(fù)載的生產(chǎn)(1345531msgs/pers)消費(1413840 msgs/pers)壓力下,RabbitMQ僅有2343條消息暫時在其等待發(fā)送的隊列中累積,在這樣的負(fù)載下,RabbitMQ節(jié)點也沒有顯示內(nèi)存壓力,或者需要基于資源限制的啟動流控機制。使用RabbitMQ的許多用戶現(xiàn)在大多集群規(guī)模大致為3-7個RabbitMQ節(jié)點組成的群集,從該類集群中就可以獲得極好的結(jié)果。鑒于這一基礎(chǔ),在Google的實驗中使用的30節(jié)點RabbitMQ集群與當(dāng)前實踐中常見的相比是相當(dāng)大的。當(dāng)然大數(shù)據(jù)的增長,物聯(lián)網(wǎng),實時分析等應(yīng)用可能會增加未來許多RabbitMQ集群的規(guī)模。在這樣的大型集群中,系統(tǒng)的設(shè)計,構(gòu)建和擴(kuò)展都有一些要點。即使對于較小的集群也是如此,首先,RabbitMQ的消息傳遞工作的基本并行單位是隊列。除了適用于某些高可用性配置的部分異常之外,RabbitMQ隊列由單個Erlang進(jìn)程(輕量級線程抽象)支持,通過謹(jǐn)慎地分配消息的生產(chǎn)者,相對于他們的消息最終到達(dá)的隊列,可以解決單個隊列所構(gòu)成的潛在瓶頸。Pivotal RabbitMQ教程演示了支持各種場景和路由方案的消息架構(gòu)的構(gòu)建。Google使用了非?;镜睦?。當(dāng)然除了教程中涵蓋的場景之外,RabbitMQ還存在更多的可能性,包括使用一致的哈希交換類型進(jìn)行動態(tài)負(fù)載平衡場景。其次,重要的是要注意個別節(jié)點的職責(zé),尤其在負(fù)載非常高的集群中??梢栽谌杭械娜魏喂?jié)點上啟用或禁用RabbitMQ管理插件。RabbitMQ管理插件提供上述基于Web的管理UI,以及相應(yīng)的基于HTTP的管理API,還可以作為統(tǒng)計其他集群節(jié)點報告性能指標(biāo)。在大型集群中,許多節(jié)點都是報告度量,目前統(tǒng)計數(shù)據(jù)庫都可能成為瓶頸。因此,Google在實驗過程中,單獨創(chuàng)建了一個信息統(tǒng)計節(jié)點,并將其從負(fù)載均衡器的后端服務(wù)器列表中排除掉,從而消息生產(chǎn)與消費不會經(jīng)過該節(jié)點,統(tǒng)計信息與生產(chǎn)消費也就不會發(fā)生競爭資源的情況。在AWS上使用同等規(guī)模與配置的環(huán)境,驗證了Google提供的測試結(jié)果后,又做了一些別的嘗試,如使用RabbitMQ Sharding插件、Consistent-hash Sharding Exchange來更加靈活的動態(tài)均衡隊列壓力,同樣可以達(dá)到此類性能。RabbitMQ Sharding插件下面介紹下如何使Sharding插件,3.6.0以及以后的RabbitMQ版本啟用Sharding插件,使用命令:rabbitmq-pluginsenable?rabbitmq_sharding在大規(guī)模集群上,配置節(jié)點多分片隊列,可以有效分?jǐn)倖侮犃械男阅芷款i。這個插件能夠讓分片隊列自動擴(kuò)展,如果您添加更多的節(jié)點到您的RabbitMQ群集,那么該插件將自動在新節(jié)點中創(chuàng)建更多的分片。假設(shè)集群初始僅有一個節(jié)點A,配置每個節(jié)點分布4個分片隊列,現(xiàn)在將節(jié)點B加入了節(jié)點A所在群集。插件將自動在節(jié)點b中創(chuàng)建4個隊列,并將它們連接到分片分區(qū)。已經(jīng)傳遞的消息將不會被重新平衡,但新到達(dá)的消息將被分區(qū)到新的隊列。默認(rèn)情況下RabbitMQ的交換機以”all or nothing”方式工作,即:如果路由key與綁定到交換機的一組隊列匹配,則RabbitMQ將將消息路由到該集合中的所有隊列。因此,為了使這個插件能正常工作,我們需要將消息路由到一個交換機來分配消息,讓消息最多被分配到一個隊列。該插件提供了一種新的Exchange類型“x-modulus-hash”,它將使用傳統(tǒng)的哈希技術(shù)應(yīng)用于跨隊列分區(qū)消息。“x-modulus-hash”交換機將對用于發(fā)布消息的Routing-Key進(jìn)行hash,然后將hash值mod N來選擇路由消息的隊列,其中N是綁定到交換機的隊列數(shù)。此交換將完全忽略用于將隊列綁定到交換機的Routing-Key。如果只需要消息分區(qū),而不是由此插件提供的自動隊列創(chuàng)建,那么只需使用一致的哈希Exchange,這個后面介紹。安裝插件后,您可以通過設(shè)置與交換名稱匹配的策略來定義交換分片。例如,如果我們有一個稱為shard.images的交換,我們可以定義以下策略來分片:set_policy images-swarding "^images"這將為集群中的每個節(jié)點創(chuàng)建2個分片隊列,并使用Routingkey:”hello”對這些隊列進(jìn)行綁定,隨后定義一個名為images的exchange。隨后,插件會自動在每個節(jié)點上創(chuàng)建2個分片隊列,名為“sharding:images-*”。在上面的例子中,我們在定義策略時使用路由key為“hello”。這意味著用于分片的底層交換機將使用上面指定的hello路由key將分片隊列綁定到交換機。這意味著對于“Direct-Exchange”,使用路由密鑰hello發(fā)布的息將被路由到所有的分片隊列。如果您決定使用“Fanout-exchange”進(jìn)行分片,則在綁定期間使用的“hello”路由key將被交換機忽略。如果使用“x-modulus-hash”交換,則路由key也將被忽略。因此,根據(jù)您使用的交換機,路由策略定義在路由消息時會產(chǎn)生影響。Consistent-sharding Exchange在某些情況下,你可能希望發(fā)送到交換機的消息是一致和均勻地分布在多個不同的隊列。在上面的插件中如果隊列數(shù)量發(fā)生變化,則不難確保新的拓?fù)浣Y(jié)構(gòu)仍然在不同隊列之間均勻分配消息,此時就可以借助Consistent-sharding類型Exchange,與Sharding插件的主要區(qū)別是,該類Exchange不能自動創(chuàng)建分片隊列,需要手動創(chuàng)建并配置Binding關(guān)系,且支持一致性hash。在作為交換類型的一致哈希的情況下,從所接收的每個消息的Routing-key進(jìn)行哈希計算后散列存儲。因此,具有相同Routing-Key的消息將具有計算的相同散列,將被路由到相同的隊列。將隊列綁定到一致性哈希的Exchange時,綁定key是一個數(shù)字字符串,表示希望該隊列在整個hash空間中占有的點數(shù)。具有相同Routing-key的所有消息將進(jìn)入相同的隊列。因此,如果希望隊列A接收路由消息是隊列B接收路由消息的兩倍,那么需要將隊列A綁定到Exchange的綁定Key(字符串的數(shù)字)設(shè)置為隊列B的綁定Key的2倍。當(dāng)然,只有當(dāng)你的路由Key均勻分布在散列空間中時才是這種情況。例如,如果在所有消息上僅使用兩個不同的路由Key,即使其他隊列在其綁定Key中具有較高的值,兩個密鑰也可能路由到同一個隊列。使用更大的路由Key集合,路由Key的統(tǒng)計分布接近綁定Key的設(shè)置的比率。在這種情況下,隨機Routing-key的消息最終將會均勻分布到兩個隊列中。RabbitMQ可靠性與可用性討論但是如何確保消息傳遞的可靠性以及如何配置高可用,很多人都一直存在疑惑,實踐才是檢驗真理的唯一標(biāo)準(zhǔn),所以基于有貨某些使用場景,也分析總結(jié)下經(jīng)驗與教訓(xùn),希望對大家有所幫助。場景1,如何保證消息的傳遞可靠,生產(chǎn)者與消費者互不感知,那么怎么確認(rèn)生產(chǎn)者已將消息投遞到RabbitMQ服務(wù)端,又如何確認(rèn)消費者已經(jīng)消費了該消息?這里需要使用的RabbtiMQ提供的生產(chǎn)者Confirm機制、消費者Ack機制來解決;使用標(biāo)準(zhǔn)AMQP 0-9-1,保證消息不丟失的唯一方法是使用事務(wù):使信道事務(wù)發(fā)布,發(fā)布消息,提交。在這種情況下,交易是不必要的重量級,并將吞吐量降低250倍。為了彌補這一點,引入了確認(rèn)機制。它模仿了協(xié)議中已經(jīng)存在的消費者確認(rèn)機制。要啟用確認(rèn),客戶端發(fā)送confirm.select方法。根據(jù)是否設(shè)置不等待,RabbitMQ Broker可以通過confirm.select-ok進(jìn)行回復(fù)。一旦在通道上使用了confirm.select方法,就被認(rèn)為處于確認(rèn)模式。事務(wù)通道不能進(jìn)入確認(rèn)模式,一旦通道處于確認(rèn)模式,則不能進(jìn)行事務(wù)處理。一旦通道處于確認(rèn)模式,代理和客戶端都會計數(shù)消息(從第一個confirm.select開始計數(shù))。然后Broker通過在同一個頻道上發(fā)送basic.ack來確認(rèn)消息。發(fā)送標(biāo)簽字段包含已確認(rèn)消息的序列號。Broker還可以在basic.ack中設(shè)置多個字段,表明所有小于該序列號的消息都已到達(dá)并被處理。何時確認(rèn)呢?對于無法路由的消息,一旦exchange驗證了消息不會被路由到任何隊列(返回一個空列表的隊列),Broker將發(fā)出確認(rèn)。如果消息的發(fā)送屬性指定了為強制性(mandatory),則basic.return將在basic.ack之前發(fā)送給客戶端。否定的確認(rèn)也是如此(basic.nack)。對于可以路由的消息,當(dāng)所有隊列接受消息時,發(fā)送basic.ack。對于路由到持久隊列的持久消息,這意味著已保存到磁盤。對于鏡像隊列,這意味著隊列的所有鏡像都已接受該消息。當(dāng)RabbitMQ交付消息給Consumer時,需要確認(rèn)Message已被投遞到Consumer。Acknowledgemenets作用,consumer通知server已收到消息或者成功消費消息,根據(jù)使用的確認(rèn)模式,RabbitMQ可以在發(fā)送(寫入TCP套接字)后或當(dāng)接收到顯式(“手動”)客戶端確認(rèn)信息時立即考慮成功傳遞的消息。手動發(fā)送的確認(rèn)可以是正面或負(fù)面的,并使用以下協(xié)議方法之一:basic.ack用于肯定確認(rèn);basic.nack用于否定確認(rèn)(注意:這是AMQP0-9-1的RabbitMQ擴(kuò)展);basic.reject用于否定確認(rèn),但與basic.nack相比有一個限制;肯定的確認(rèn)只是指示RabbitMQ記錄一個消息傳遞。與basic.reject的否定確認(rèn)具有相同的效果。差異主要在語義上:肯定確認(rèn)假設(shè)一條消息已成功處理,而其負(fù)面對應(yīng)方則表示傳送未被處理但仍應(yīng)被刪除,可以批量手動確認(rèn)以減少網(wǎng)絡(luò)流量。綜上所述,在1的位置需要開啟Channel的Confirm模式,接收RabbitMQ服務(wù)端發(fā)送的確認(rèn)消息已到達(dá)的Ack信息;在3的位置,消費者在成功消費或者業(yè)務(wù)處理失敗后,需要顯示告訴RabbitMQ服務(wù)端,消息已被消費成功或者失??;在某些類型的網(wǎng)絡(luò)故障中,數(shù)據(jù)包丟失可能意味著中斷的TCP連接需要較長時間才能夠被操作系統(tǒng)檢測到。AMQP 0.9.1提供心跳功能,以確保應(yīng)用程序?qū)蛹皶r發(fā)現(xiàn)連接中斷。在我們的部署架構(gòu)中,ELB與RabbitMQ之間就是通過此機制來判斷服務(wù)是否存活,是否提示生產(chǎn)者服務(wù)端已掛,異步等待confirm的消息直接進(jìn)入unconfirm的處理環(huán)節(jié)。另外為了避免在代理中丟失消息,我們需要應(yīng)對代理重新啟動,代理硬件故障,甚至破壞代理崩潰。為了確保重新啟動時消息和代理定義生效,我們需要確保它們在磁盤上持久化。AMQP標(biāo)準(zhǔn)具有交換,隊列和持久消息的耐久性概念,要求持久對象或持久消息將在重新啟動后生存。場景2,如何實現(xiàn)處理失敗后重試機制;某些情況下,業(yè)務(wù)在處理消息時可能會失敗,此時需要做的是重試,而不是直接丟棄;當(dāng)然重試也不能僅僅是直接重試,一旦有任務(wù)長時間失敗,會導(dǎo)致后面的消息無法被正常處理,此時可以借助死信機制轉(zhuǎn)發(fā)投遞到重試隊列后,隨后再嘗試重新處理該消息;那如何實現(xiàn)呢?下面介紹下具體操作;x-dead-letter-exchange:死信轉(zhuǎn)發(fā)的exchange;x-dead-letter-routing-key:死信轉(zhuǎn)發(fā)時的routing-key;該隊列綁定到名為“amp.topic”的topic類型exchange,接收routing-key為“yoho_test_retry”的消息;死信轉(zhuǎn)發(fā)到“amp.topic”的exchange,routing-key為“yoho_test_retry”(即工作隊列接收該主題消息);x-message-ttl:message在重試隊列中存活的時間,也就是延遲多久重試;該隊列綁定到“amp.topic”,接收routing-key為“retry.yoho_test_retry”的消息(即接收工作隊列的死信),這樣就可以實現(xiàn)重試隊列的機制了。場景3,如何實現(xiàn)定時任務(wù);定時任務(wù),這也是一種常見的需求,那如何在RabbitMQ中實現(xiàn)這個能力,可以讓某些任務(wù)延時執(zhí)行。其實同樣的也可以借助死信機制來實現(xiàn),如隊列A用于接收暫存Producer的消息,隊列B用于Consumer的消費,在隊列A中指定消息的ttl即生命周期時長,同時指定其死信交換機DLXs,一旦消息在隊列中存活時長超過ttl的設(shè)定值,那么消息會被轉(zhuǎn)發(fā)到DLXs,綁定隊列B到DLXs,即可接收到隊列A的死信;下面具體看下操作流程;從“amp.topic”的exchange中接收routing-key為“delay.yoho_test_delay”主題的消息;死信轉(zhuǎn)到交換機“amp.topic”中,消息的routing-key為“delay.yoho_test_delay”(即工作隊列接收的延遲消息的隊列),消息在延遲隊列中存活時間ttl,同時該隊列綁定到“amp.topic”交換機,接收routing-key為“yoho_test_delay”注意的消息(即生產(chǎn)發(fā)送消息指定的topic);如此一來延遲隊列接收消息后,等待ttl時長,將消息轉(zhuǎn)發(fā)到工作隊列中,即可實現(xiàn)延遲隊列機制。場景4,如何跨中心共享消息有時跨中心業(yè)務(wù)需要共享消息,如緩存清理等,在業(yè)務(wù)代碼中分別向多個中心的RabbitMQ發(fā)布消費消息顯然不是一種比較好的解決方案,那還有什么好的方法呢,RabbitMQ為此提供了Federation插件來很好的解決此類問題;Federation插件是一個在不需要cluster,而在brokers之間傳輸消息的高性能插件,F(xiàn)ederation插件可以在brokers或者cluster之間傳輸消息,連接的雙方可以使用不同的users和virtual hosts,或者雙方的rabbitmq和erlang的版本不一致,F(xiàn)ederation插件使用AMQP協(xié)議通訊,可以接受不連續(xù)的傳輸。Federation插件允許你配置一個exchanges Federation或者queues Federation。一個exchange/queues federation允許你從一個或者多個upstream接受信息(就是遠(yuǎn)程的exchange/queues或者其他brokers),一個federation exchange可以路由消息到一個本地queue中,一個federation queue可以使一個本地的consumer接受從upstream queue過來的消息。下面分別看下如何啟用,創(chuàng)建以及兩者的區(qū)別:(1)啟動Federation插件命令:rabbitmq-pluginsenable rabbitmq_federation(2)啟動web配置插件:rabbitmq-pluginsenable rabbitmq_federation_managementFederation exchanges,可以看成downstream從upstream主動拉取消息,但并不是拉取所有消息,必須是在downstream上已經(jīng)明確定義Bindings關(guān)系的exchange,也就是有實際的物理queue來接收消息,才會從upstream拉取消息到downstream。使用AMQP協(xié)議實施代理間通信。Downstream會將綁定關(guān)系組合在一起,綁定/解除綁定命令將發(fā)送到upstream交換機。因此,F(xiàn)ederation交換機只接收具有訂閱的消息。任何upstream exchange接收到的消息都可能被downstream中Federation exchange接收到,但直接發(fā)送給Federation Exchange的消息是不能upstream中所綁定的exchange接收到的。Federation queues與Federationexchange最大的區(qū)別共享消息的機制不同,F(xiàn)ederation隊列僅在本地消耗消息時檢索消息,消費者需要消息,并且Upstream隊列具有未被消費的消息。這樣做的目的是確保消息僅在需要時,才會在聯(lián)合隊列之間傳輸。先看下創(chuàng)建過程,如下創(chuàng)建一個名為“fqueue_test”的federationFederation隊列可以被聲明為任何其他隊列。為了使RabbitMQ能夠識別出隊列需要聯(lián)合,還有哪些節(jié)點消息應(yīng)該被消耗,Downstream(消費)節(jié)點需要進(jìn)行配置。通過聲明策略來完成配置。策略是隊列名稱匹配的模式。匹配隊列將聯(lián)合。Federation隊列只能屬于一個策略。如果多個策略與隊列名稱匹配,則應(yīng)用優(yōu)先級最高的策略。當(dāng)兩個策略具有相同的優(yōu)先級時,隨機選擇匹配的策略。Federation隊列可以作為另一個Federation隊列的“上游”,甚至可以形成“循環(huán)”,例如,隊列A將隊列B聲明為上游,隊列B將隊列A聲明為上游。允許更復(fù)雜的多重連接的安排。Federation隊列將使用AMQP連接到其所有上游隊列。在聲明或配置聯(lián)合隊列時,每個上游隊列都將列出用于建立鏈接的連接屬性。每個單獨的隊列分別應(yīng)用其參數(shù),例如,如果在聯(lián)合隊列上設(shè)置x-max-length,則該隊列的長度將受限制(可能會在其已滿時丟棄消息),但與其聯(lián)合的其他隊列將不受影響。特別要注意的是,當(dāng)每個隊列或每個消息的TTL被使用時,當(dāng)一個消息被傳送到另一個隊列時,它的定時器將被重置。與Federation交換機不同,在Federation隊列之間可以轉(zhuǎn)發(fā)消息的次數(shù)沒有限制。在一組相互聯(lián)合的隊列中,消息將移動到空閑消費容量的位置。因此,如果閑置消費容量繼續(xù)移動,消息將繼續(xù)移動。上述就是Federation的使用方法以及注意點,當(dāng)然與其他插件的配合,可以衍生出多種使用方法。場景5,如何保證消息隊列的高可用,這樣的場景很多比如核心業(yè)務(wù)的訂單服務(wù)、erp服務(wù)等等。默認(rèn)情況下,RabbitMQ群集中的隊列位于單個節(jié)點(首次被聲明的節(jié)點上),而Exchanges和Bindings可以認(rèn)為在所有節(jié)點上存在,可以選擇在cluster中跨節(jié)點節(jié)點之間配置為鏡像隊列。每個鏡像隊列由一個master和一個或多個slave組成,如果master因為某些原因失效,則將從slave中選擇一個提升為master。發(fā)布到隊列的消息將復(fù)制到所有鏡像。消費者連接到主機,無論它們連接到哪個節(jié)點,鏡像會丟棄已在主設(shè)備上確認(rèn)的消息。隊列鏡像因此增強了可用性,但不跨節(jié)點分配負(fù)載(所有參與節(jié)點都執(zhí)行所有工作)。隊列已被配置為鏡像,master節(jié)點位于server5,slave節(jié)點位于server6,此時,隨意關(guān)閉任意一臺RabbitMQ節(jié)點,該隊列都可以正常對外提供服務(wù)。A先掛,B后掛,master轉(zhuǎn)移B,此時需先拉起B(yǎng),后拉起A,可恢復(fù)鏡像隊列;A、B同時掛,同時拉起,可恢復(fù)鏡像隊列;若新加入cluster節(jié)點,最好不要在生產(chǎn)環(huán)境手工同步,采用自然同步方式,對于沒有工作queue可以手工同步操作(同步操作時,queue是不可用的);當(dāng)所有slave都處在(與master)未同步狀態(tài)時,并且ha-promote-on-shutdown policy設(shè)置為when-syned(默認(rèn))時,如果master因為主動的原因停掉,比如是通過rabbitmqctl stop命令停止或者優(yōu)雅關(guān)閉OS,那么slave不會接管master,也就是說此時鏡像隊列不可用;但是如果master因為被動原因停掉,比如VM或者OS crash了,那么slave會接管master。這個配置項隱含的價值取向是優(yōu)先保證消息可靠不丟失,放棄可用性。如果ha-promote-on-shutdown policy設(shè)置為alway,那么不論master因為何種原因停止,slave都會接管master,優(yōu)先保證可用性。A、B都未掛,兩者網(wǎng)絡(luò)異常,各自為master,此時出現(xiàn)網(wǎng)絡(luò)分區(qū)沖突,必須手工介入保證消息不丟失,萬不可隨意重啟導(dǎo)致數(shù)據(jù)丟失(不論是否持久化),將一臺數(shù)據(jù)量較小的從cluster中剔除,消費完成后再重啟恢復(fù)鏡像;或者將其中一臺從集群中剔除后,加入另外一臺slave,再消費完成剔除的節(jié)點中數(shù)據(jù);(會出現(xiàn)重復(fù)消費,此時需要客戶端做冪等處理保證唯一一次消費)當(dāng)然在高可用的場景下,隊列的性能會受到一定的影響,此時可以借助上面提到的Sharding機制(根據(jù)場景選擇x-modulus-hash還是consistent-hash),解決單隊列的性能瓶頸,在高可用、高并發(fā)下尋求一個動態(tài)的平衡;上圖為鏡像場景的壓測結(jié)果,對比普通集群,鏡像對性能的影響很明顯,消息持久化也拉低了集群的性能,適當(dāng)增加Prefetch可以提高集群性能。性能與高可靠、高可用,魚和熊掌不可兼得,所以欲提升RabbitMQ集群或單節(jié)點服務(wù)的性能,犧牲可靠性(根據(jù)場景來),在消費能力范圍內(nèi),盡量提高prefetch的數(shù)量,其次就是簡單粗暴型(加機器(隊列實際存儲節(jié)點性能未榨干,建議隊列均衡分配到各節(jié)點)、加配置)。Spring AMQP提供了一個API,可輕松訪問AMQP消息代理。像往常一樣,Spring模板作為技術(shù)細(xì)節(jié)的抽象。對于AMQP,AmqpTemplate可以做到這一點。Spring-amqp項目擁有所有必要的通用接口(例如AmqpTemplate)和API類,而具體的實現(xiàn)則依賴spring-rabbitmq,Spring-rabbitmq依賴于RabbitMQ amqp-client的通用Java API??蛻舳藨?yīng)用程序僅依靠spring-amqp來實現(xiàn)松耦合。能夠從一個AMQP代理切換到另一個AMQP代理,而不會在代碼中進(jìn)行任何重大更改。