<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ 消費(fèi)消息

          共 7037字,需瀏覽 15分鐘

           ·

          2023-05-18 16:38

          a91964c15dacad9ad140f314b491170d.webp



          RabbitMQ系列》: 1 .? RabbitMQ 與 AMQP 協(xié)議
          2 .? RabbitMQ 消息屬性詳解 3. Rab bitMQ 可靠性與性能的平衡


          對(duì)比 Basic.Get 和 Basic.Consume

          RabbitMQ 實(shí)現(xiàn)了兩個(gè)不同的 AMQP RPC 命令來(lái)獲取隊(duì)列中的消息:Basic.GetBasic.Consume

          Basic.Get 是一個(gè)輪詢(xún)模型,而 Basic.Consume 是一個(gè)推送模型。Basic.Get 不是從服務(wù)器獲取消息的理想方法。

          Basic.Get

          當(dāng)應(yīng)用程序使用 Basic.Get 請(qǐng)求來(lái)獲取消息時(shí),每次接收消息就必須發(fā)送一個(gè)新的請(qǐng)求,即使隊(duì)列中存在多個(gè)消息。

          如果在發(fā)出 Basic.Get RPC 請(qǐng)求時(shí)有一條消息可用,RabbitMQ 將返回 Basic.GetOk 以及消息本身。如下圖:

          f7cc7c613dac99d1550be939a43c509b.webpBasic.Get_1

          如果隊(duì)列中沒(méi)有待處理的消息,則 RabbitMQ 使用 Basic.GetEmpty 進(jìn)行響應(yīng),如下圖:

          6a494f4d445f1d7a7e742f683fdf6fa0.webpBasic.Get_2

          當(dāng)使用 Basic.Get 時(shí),應(yīng)用程序需要評(píng)估來(lái)自 RabbitMQRPC 響應(yīng)以確定是否收到了消息。這里的“評(píng)估”是指需要應(yīng)用程序主動(dòng)判斷,需要有代碼實(shí)現(xiàn)這部分邏輯。

          因?yàn)槭褂?Basic.Get 會(huì)導(dǎo)致每條消息都會(huì)產(chǎn)生與 RabbitMQ 同步通信的開(kāi)銷(xiāo),這一過(guò)程由發(fā)送請(qǐng)求幀的客戶(hù)端應(yīng)用程序和發(fā)送應(yīng)答的 RabbitMQ 組成。在簡(jiǎn)單的消息速度測(cè)試中,使用 Basic.Consume 至少是使用 Basic.Get 的兩倍。

          避免使用 Basic.Get 的一個(gè)潛在的不太明顯的原因是它會(huì)影響吞吐量,由于 Basic.Get 的臨時(shí)性,RabbitMQ 不能以任何方式優(yōu)化投遞過(guò)程,因?yàn)樗肋h(yuǎn)不知道應(yīng)用程序何時(shí)會(huì)請(qǐng)求消息。

          Basic.Consume

          使用 Basic.Consume RPC 命令來(lái)消費(fèi)消息,在消費(fèi)者可用時(shí),RabbitMQ 以異步的方式向消費(fèi)者發(fā)送消息。這通常被稱(chēng)為發(fā)布——訂閱模式。

          使用 Basic.Consume 消費(fèi)消息,當(dāng)一個(gè)客戶(hù)端發(fā)出 Basic.Consume 時(shí),一旦有消息可用時(shí) RabbitMQ 就會(huì)進(jìn)行發(fā)送,直到客戶(hù)端發(fā)出一個(gè) Basic.Consume 為止。如下圖:

          a9e2a4983923b636c36b340485253e16.webpBasic.Consume

          消費(fèi)者標(biāo)簽

          當(dāng)應(yīng)用程序發(fā)出 Basic.Consume 時(shí)會(huì)創(chuàng)建一個(gè)唯一的字符串,用來(lái)標(biāo)識(shí)通過(guò)已建立的信道與 RabbitMQ 進(jìn)行通信的應(yīng)用程序。這個(gè)字符串被稱(chēng)為消費(fèi)者標(biāo)簽(Consumer Tag),RabbitMQ 每次都會(huì)把該字符串與消息一起發(fā)送給應(yīng)用程序。

          如果一個(gè)應(yīng)用程序同時(shí)從多個(gè)隊(duì)列中消費(fèi)消息,消費(fèi)者標(biāo)簽就非常有用。因?yàn)槊總€(gè)收到的消息都在它的方法幀中包含該消息所投遞的目標(biāo)消費(fèi)者標(biāo)簽。如果應(yīng)用程序需要對(duì)從不同隊(duì)列接收到的消息執(zhí)行不同的操作,則可以使用 Basic.Consume 請(qǐng)求中的消費(fèi)者標(biāo)簽來(lái)確定該如何處理消息。在大多數(shù)情況下,客戶(hù)端已經(jīng)對(duì)消費(fèi)者標(biāo)簽做了封裝,所以不必?fù)?dān)心它。

          通過(guò)發(fā)送一個(gè) Basic.Cancel RPC 命令,消費(fèi)者標(biāo)簽可以用來(lái)取消從 RabbitMQ 獲取消息。

          在同步 Basic.Get 和異步 Basic.Consume 之間進(jìn)行選擇是在編寫(xiě)消費(fèi)者應(yīng)用程序時(shí)需要做的幾個(gè)決策之一。與發(fā)布消息時(shí)涉及的權(quán)衡一樣,你為應(yīng)用程序所做的選擇可能會(huì)直接影響消息的可靠投遞和性能。

          優(yōu)化消費(fèi)者性能

          當(dāng)發(fā)布消息時(shí),對(duì)消息的消費(fèi)在吞吐量與可靠性之間存在一種平衡。RabbitMQ 也提供了一些選項(xiàng),在弱化消息投遞保證的同時(shí)提高消息投遞的吞吐量。如下圖:

          f7956175a5e35b027984a3b4b8a8e47c.webp消費(fèi)者性能優(yōu)化維度
          使用 no-ack 模式實(shí)現(xiàn)更快的吞吐量

          在消息消費(fèi)時(shí),應(yīng)用程序發(fā)送一個(gè) Basic.Consume RPC 請(qǐng)求,與該請(qǐng)求一起發(fā)送的還有一個(gè) no-ack 標(biāo)志。當(dāng)這個(gè)標(biāo)志被啟用時(shí),它會(huì)告訴 RabbitMQ 消費(fèi)者在接收到消息時(shí)不會(huì)進(jìn)行確認(rèn),RabbitMQ 只管盡快發(fā)送它們。但這也是發(fā)送消息最不可靠的方式。

          重點(diǎn)是要清除,在消費(fèi)者應(yīng)用程序之前有多個(gè)數(shù)據(jù)緩沖區(qū)接收消息。如下圖:

          0ade2c5e328f8203ef57bae3cbc4b701.webp消費(fèi)者之前存在多個(gè)數(shù)據(jù)緩沖區(qū)

          當(dāng) RabbitMQ 通過(guò)打開(kāi)的連接發(fā)送消息時(shí),它使用 TCP 套接字連接與客戶(hù)端信息通信。如果這個(gè)連接是打開(kāi)且可寫(xiě)的,那么 RabbitMQ 假定一切都處于正常工作狀態(tài)并且成功投遞了消息。如果當(dāng) RabbitMQ 嘗試寫(xiě)入套接字以投遞消息時(shí)出現(xiàn)了網(wǎng)絡(luò)問(wèn)題,操作系統(tǒng)將觸發(fā)套接字錯(cuò)誤從而讓 RabbitMQ 知道出現(xiàn)了問(wèn)題。如果沒(méi)有發(fā)送錯(cuò)誤,RabbitMQ 將假定消息投遞成功。

          通過(guò) Basic.Ack RPC 響應(yīng)發(fā)送的消息確認(rèn)是客戶(hù)端讓 RabbitMQ 知道已成功接收消息的一種方法,這也是大多數(shù)情況下處理消息的方式。但如果關(guān)閉消息確認(rèn),那么當(dāng)有新的可用消息時(shí),RabbitMQ 將會(huì)發(fā)送該消息而不用等待。實(shí)際上,只要有新的消息,RabbitMQ 將會(huì)持續(xù)向消費(fèi)者發(fā)送它們直到套接字緩沖區(qū)被填滿(mǎn)為止。

          在 Linux 中增加接收套接字緩沖區(qū)

          要增加 Linux 操作系統(tǒng)中接收套接字緩沖區(qū)的數(shù)量,我們應(yīng)該增加 net.core.rmem_defaultnet.core.rmem_max 值(默認(rèn)是 128 KB)。對(duì)于大多數(shù)環(huán)境來(lái)說(shuō),16 MB16777216) 應(yīng)該足夠了。大多數(shù) Linux 發(fā)行版都可以在 /etc/sysctl.conf 中更改此值,也可以通過(guò)以下命令來(lái)手動(dòng)設(shè)置:

                  
                  echo?16777216>/proc/sys/net/core/rmem_default
          echo?16777216>/proc/sys/net/core/rmem_max

          值得注意的是,當(dāng)操作系統(tǒng)的套接字接收緩沖區(qū)中存放了大量的消息時(shí),如果消費(fèi)者應(yīng)用程序發(fā)生崩潰,并且套接字關(guān)閉時(shí),RabbitMQ 認(rèn)為它已經(jīng)發(fā)送了這些消息,并且不會(huì)收到應(yīng)該從操作系統(tǒng)讀取多少消息的指示。應(yīng)用程序所面臨的風(fēng)險(xiǎn)取決于消息的大小和數(shù)量以及操作系統(tǒng)中套接字接收緩沖區(qū)的大小。

          通過(guò)服務(wù)質(zhì)量設(shè)置控制消費(fèi)者預(yù)取

          AMQP 規(guī)范要求信道具有服務(wù)質(zhì)量(Quality Of Service,QoS)設(shè)置,即在確認(rèn)消息接收之前,消費(fèi)者可以預(yù)先要求接收一定數(shù)量的消息。QoS 設(shè)置允許 RabbitMQ 通過(guò)為消費(fèi)者預(yù)先分配一定數(shù)量的消息來(lái)實(shí)現(xiàn)更高效的消息發(fā)送。

          no-ack=true 的消費(fèi)者不同,如果消費(fèi)者應(yīng)用程序在確認(rèn)消息之前崩潰,則在套接字關(guān)閉時(shí),所有預(yù)取的消息將返回到隊(duì)列。

          在協(xié)議級(jí)別上,可以在信道上發(fā)送 Basic.QoS RPC 請(qǐng)求來(lái)指定服務(wù)質(zhì)量。作為這個(gè) RPC 請(qǐng)求的一部分,可以指定 QoS 設(shè)置是針對(duì)其發(fā)送的信道還是針對(duì)連接上打開(kāi)的所有信道。Basic.QoS RPC 請(qǐng)求可以隨時(shí)發(fā)送,通常在用戶(hù)發(fā)出 Basic.Consume RPC 請(qǐng)求之前進(jìn)行發(fā)送。

          注意

          雖然 AMQP 規(guī)范要求 Basic.QoS 方法同時(shí)設(shè)置預(yù)取總量和預(yù)取大小,但如果設(shè)置了 no-ack 選項(xiàng),預(yù)取大小將被忽略。

          過(guò)度分配預(yù)取總量會(huì)對(duì)消息吞吐量有負(fù)面影響。

          一次確認(rèn)多個(gè)消息

          使用 QoS 設(shè)置的好處之一就是不需要用 Basic.ACK RPC 響應(yīng)來(lái)確認(rèn)收到的每條消息。相反,Basic.ACK RPC 響應(yīng)具有一個(gè)名為 multiple 的屬性,當(dāng)把它設(shè)置為 True 時(shí)就能讓 RabbitMQ 知道你的應(yīng)用程序想要確認(rèn)所有以前未確認(rèn)的消息。

          同時(shí)確認(rèn)多個(gè)消息可以使處理消息所需的網(wǎng)絡(luò)通信量最小化,從而提高消息吞吐量。如果成功地處理了一些消息,并且應(yīng)用程序在確認(rèn)它們之前就已經(jīng)死亡,則所有未確認(rèn)的消息將返回隊(duì)列以供其他消費(fèi)者進(jìn)行處理。

          消費(fèi)者使用事務(wù)

          事務(wù)處理允許消費(fèi)者應(yīng)用程序提交和回滾批量操作。事務(wù)可能會(huì)對(duì)消息吞吐量產(chǎn)生負(fù)面影響,但有一個(gè)例外,如果你使用 QoS 設(shè)置,那么在使用事務(wù)來(lái)批量確認(rèn)消息時(shí),實(shí)際上可能會(huì)看到略微的性能提升。

          無(wú)論是使用它們進(jìn)行批量消息確認(rèn)還是確保在消費(fèi)消息時(shí)可以回滾 RPC 響應(yīng),了解事務(wù)對(duì)性能的真是影響將幫助你在消息可靠投遞和消息吞吐量之間找到適當(dāng)?shù)钠胶狻?/p>

          注意,事務(wù)不適用于已禁用確認(rèn)的消費(fèi)者。

          拒絕消息

          當(dāng)消息本身或消息的處理過(guò)程出現(xiàn)問(wèn)題時(shí),RabbitMQ 提供了兩種將消息踢回代理服務(wù)器的機(jī)制:Basic.RejectBasic.Nack

          消費(fèi)者可以對(duì)消息進(jìn)行確認(rèn)、拒絕和否定確認(rèn)。Basic.Nack 允許一次拒絕多個(gè)消息,而 Basic.Reject 一次只允許拒絕一個(gè)消息。

          Basic.Reject

          Basic.Reject 是一個(gè) AMQP 指定的 RPC 響應(yīng),用于通知代理服務(wù)器無(wú)法對(duì)所投遞的消息進(jìn)行處理。像 Basic.Ack 一樣,它攜帶由 RabbitMQ 創(chuàng)建的投遞標(biāo)簽,用于唯一標(biāo)識(shí)消費(fèi)者與 RabbitMQ 進(jìn)行通信的信道上的消息。

          當(dāng)消費(fèi)者拒絕消息時(shí),可以指示 RabbitMQ 丟棄消息或使用 requeue 標(biāo)志重新發(fā)送消息。當(dāng)啟用 requeue 標(biāo)志時(shí),RabbitMQ 將把消息放回到隊(duì)列中并再次處理。redelivered 標(biāo)志,用于通知消息的下一個(gè)消費(fèi)者它以前已經(jīng)被投遞過(guò)。

          Basic.Nack

          Basic.Nacknegative acknowledgment(否定確認(rèn))的縮寫(xiě),并非 AMQP 規(guī)范所定義的行為,但是對(duì) Basic.Ack 多消息處理行為進(jìn)行了補(bǔ)充。

          死信交換器

          RabbitMQ 的死信交換器(Dead-Letter eXchange,DLX)功能是對(duì) AMQP 規(guī)范的擴(kuò)展,是一種可以拒絕已投遞消息的可選行為。

          盡管聽(tīng)起來(lái)像是一種特殊的交換器,但實(shí)際上死信交換器是一種正常的交換器。創(chuàng)建它時(shí)沒(méi)有特別的要求也不需要執(zhí)行特別的操作。使交換器成為死信交換器的唯一要做的事情是在創(chuàng)建隊(duì)列時(shí)聲明該交換器將被用作保存被拒絕的消息。一旦拒絕了一個(gè)不重新發(fā)送的消息,RabbitMQ 將把消息路由到隊(duì)列的 x-dead-letter-exchange 參數(shù)中指定的交換器。

          死信交換器與備用交換器

          過(guò)期或被拒絕的消息通過(guò)死信交換器進(jìn)行投遞,而備用交換器則路有那些無(wú)法由 RabbitMQ 路由的消息。

          除交換器外,死信功能還允許使用預(yù)先指定的值覆蓋路由鍵。這樣可以允許使用同一個(gè)交換器同時(shí)處理死信消息和非死信消息,但需要確保死信消息不被投遞到相同的隊(duì)列。設(shè)置預(yù)定義的路由鍵需要在聲明隊(duì)列時(shí)指定一個(gè)額外的參數(shù) x-dead-letter-routing-key

          根據(jù) AMQP 標(biāo)準(zhǔn),RabbitMQ 中的所有隊(duì)列設(shè)置都是不可變的,意味著,在隊(duì)列被聲明后它們不能被修改。為了改變隊(duì)列的死信交換器,必須刪除并重新聲明它。

          控制隊(duì)列

          消費(fèi)者應(yīng)用程序有很多不同的應(yīng)用場(chǎng)景。對(duì)于某些應(yīng)用程序,可以接受多個(gè)消費(fèi)者監(jiān)聽(tīng)同一個(gè)隊(duì)列,而對(duì)于其他一些消費(fèi)者,一個(gè)隊(duì)列應(yīng)該只有一個(gè)消費(fèi)者。

          RabbitMQ 在創(chuàng)建隊(duì)列時(shí)幾乎為任何應(yīng)用場(chǎng)景提供了足夠的靈活性,定義隊(duì)列時(shí),有多個(gè)設(shè)置可以確定隊(duì)列的行為:

          • 自動(dòng)刪除自己
          • 只允許一個(gè)消費(fèi)者進(jìn)行消費(fèi)
          • 自動(dòng)過(guò)期的消息
          • 保持有限數(shù)量的消息
          • 將舊消息推出堆棧

          按照 AMQP 規(guī)范,隊(duì)列的設(shè)置是不可變的,一旦聲明了一個(gè)隊(duì)列,就不能改變用來(lái)創(chuàng)建它的任何設(shè)置。要更改隊(duì)列設(shè)置 ,必須刪除并重新創(chuàng)建它。

          臨時(shí)隊(duì)列
          自動(dòng)刪除隊(duì)列

          自動(dòng)刪除的隊(duì)列可以被創(chuàng)建并且用來(lái)處理消息,一旦消費(fèi)者完成連接和檢索消息,在斷開(kāi)連接時(shí)隊(duì)列將被刪除。

          創(chuàng)建自動(dòng)刪除隊(duì)列非常簡(jiǎn)單,只需要在 Queue.Declare RPC 請(qǐng)求中將 auto_delete 標(biāo)志設(shè)置為 Ture 即可。

          需要注意的是,任意數(shù)量的消費(fèi)者都可以對(duì)自動(dòng)刪除隊(duì)列進(jìn)行消費(fèi),隊(duì)列只會(huì)在沒(méi)有消費(fèi)者監(jiān)聽(tīng)的時(shí)候自行刪除。

          只允許單個(gè)消費(fèi)者

          如果沒(méi)有在隊(duì)列上啟用 exclusive 設(shè)置,那么可以連接到隊(duì)列并消費(fèi)消息的消費(fèi)者數(shù)量沒(méi)有限制。并對(duì)能夠從隊(duì)列中接收消息的所有消費(fèi)者實(shí)施輪詢(xún)(round-robin)投遞行為。

          auto_delete 參數(shù)一樣,啟用隊(duì)列的獨(dú)占屬性需要在隊(duì)列創(chuàng)建時(shí)傳遞參數(shù) exclusive=Ture。聲明為 exclusive 的隊(duì)列只能被聲明時(shí)所指定的同一個(gè)連接和信道所消費(fèi),當(dāng)創(chuàng)建隊(duì)列的信道關(guān)閉時(shí),獨(dú)占隊(duì)列也將自動(dòng)被刪除。

          auto_delete 隊(duì)列不同,在信道關(guān)閉之前,可以根據(jù)需要多次使用和取消 exclusive 隊(duì)列的消費(fèi)者。exclusive 隊(duì)列自動(dòng)刪除行為的發(fā)送不會(huì)考慮是否已經(jīng)發(fā)出了一個(gè) Basic.Consume 請(qǐng)求,這與 auto-delete 隊(duì)列不同。

          自動(dòng)過(guò)期隊(duì)列

          創(chuàng)建一個(gè)自動(dòng)過(guò)期的隊(duì)列非常簡(jiǎn)單,在聲明隊(duì)列時(shí)使用 x-expires 參數(shù),該參數(shù)以毫秒為單位設(shè)置隊(duì)列的生存時(shí)間(Time To Live,TTL)。

          自動(dòng)過(guò)期隊(duì)列有一些嚴(yán)格的規(guī)定:

          • 隊(duì)列只有在沒(méi)有消費(fèi)者的情況下才會(huì)過(guò)期。
          • 隊(duì)列只有在 TTL 周期之內(nèi)沒(méi)有收到 Basic.Get 請(qǐng)求時(shí)才會(huì)到期。一旦一個(gè) Basic.Get 請(qǐng)求中已經(jīng)包含了一個(gè)具有過(guò)期值的隊(duì)列,那么過(guò)期設(shè)置無(wú)效,該隊(duì)列將不會(huì)被自動(dòng)刪除。
          • 與任何其他隊(duì)列一樣,不能重新聲明或更改 x-expires 的設(shè)置和參數(shù)。
          • RabbitMQ 不保證刪除過(guò)期隊(duì)列這一過(guò)程的時(shí)效性。
          永久隊(duì)列
          隊(duì)列持久性

          在聲明隊(duì)列時(shí)將 durable 標(biāo)志設(shè)置為 Ture,這樣在服務(wù)器重啟之后隊(duì)列仍然存在。

          當(dāng)消息發(fā)布時(shí)將 delivery-mode 屬性設(shè)置為 2 時(shí),消息就會(huì)存儲(chǔ)在磁盤(pán)上。相反,durable 標(biāo)志告訴 RabbitMQ 希望隊(duì)列被設(shè)置在服務(wù)器中,直到 Queue.Delete 請(qǐng)求被調(diào)用為止。

          隊(duì)列中消息自動(dòng)過(guò)期

          消息級(jí)別的 TTL 設(shè)置允許服務(wù)器端對(duì)消息的最大生存時(shí)間進(jìn)行限制。聲明隊(duì)列時(shí)同時(shí)指定死信交換器和 TTL 值將導(dǎo)致該隊(duì)列中的已到期的消息成為死信消息。

          與消息的過(guò)期時(shí)間屬性相反,x-message-ttl 隊(duì)列設(shè)置強(qiáng)制規(guī)定了隊(duì)列中所有消息的最大生存時(shí)間。

          最大長(zhǎng)度隊(duì)列

          RabbitMQ 3.1.0 開(kāi)始,可以在聲明隊(duì)列時(shí)指定最大長(zhǎng)度。如果在隊(duì)列上設(shè)置了 x-max-length 參數(shù),一旦達(dá)到最大值,RabbitMQ 會(huì)在添加新消息時(shí)刪除位于隊(duì)列前端的消息。

          如果使用死信交換器聲明隊(duì)列,則從隊(duì)列前端移除的消息可能成為死信。

          任意隊(duì)列設(shè)置

          隊(duì)列參數(shù)可用于設(shè)置高可用性隊(duì)列、死信交換器、消息過(guò)期時(shí)間、隊(duì)列過(guò)期時(shí)間和隊(duì)列最大長(zhǎng)度。AMQP 規(guī)范將隊(duì)列參數(shù)定義為一個(gè)表,其中參數(shù)值的語(yǔ)法和語(yǔ)義由服務(wù)器確定。RabbitMQ 保留了這些參數(shù),并忽略了傳入的任何其他參數(shù)。隊(duì)列的保留參數(shù)如下:

          c3ee188e4b6c035558b83abf21622a1d.webp保留參數(shù)

          參數(shù)是設(shè)置隊(duì)列級(jí)別監(jiān)控和閾值的有用方法。

          37add1b3d59127145dd131de1a3ef274.webp

          記得 轉(zhuǎn)發(fā) 在看 關(guān)注 哦!
          瀏覽 105
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产熟妇疯狂性做爰XXXⅩ网站 | 色精品视频 | 欧美成在线观看 | 美女网站黄a | 一卡二卡久久 |