<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          說說 RabbiMQ 的應(yīng)答模式

          共 4649字,需瀏覽 10分鐘

           ·

          2021-01-14 22:17

          RabbiMQ 我們都很熟悉了,是很常用的一個(gè)開源消息隊(duì)列。搞懂 RabbiMQ 的應(yīng)答模式對我們排查錯(cuò)誤很有幫助,也能避免一些坑。本文說說 RabbiMQ 的應(yīng)答模式。

          生產(chǎn)者發(fā)出一條消息給 RabbiMQ ,RabbiMQ 將消息推送給消費(fèi)者,消費(fèi)者處理完消息后告訴 RabbiMQ,我已經(jīng)接收到消息并處理了,RabbiMQ 收到通知后會(huì)將消息從隊(duì)列中刪除。消費(fèi)者通知 MQ 的這個(gè)過程就是消息的應(yīng)答。在 RabbiMQ 中有兩種應(yīng)答模式:自動(dòng)應(yīng)答和手動(dòng)應(yīng)答。

          版本

          • dotNET Core :3.1
          • RabbitMQ:3.8.2
          • RabbitMQ.Client:6.2.1

          自動(dòng)應(yīng)答

          當(dāng) RabbiMQ 開啟了消息的自動(dòng)應(yīng)答,一旦 RabbiMQ 將消息分發(fā)給了消費(fèi)者,就會(huì)將消息從內(nèi)存中刪除。這種情況下,如果正在執(zhí)行的消費(fèi)者掛掉,就會(huì)丟失正在處理的消息。

          生產(chǎn)者代碼

          static?void?Main(string[]?args)
          {
          ????ConnectionFactory?factory?=?new?ConnectionFactory
          ????{
          ????????UserName?=?"oec2003",
          ????????Password?=?"000000",
          ????????HostName?=?"10.211.55.6"
          ????};

          ????using?(var?connection?=?factory.CreateConnection())
          ????using?(var?channel?=?connection.CreateModel())
          ????{
          ????????Console.WriteLine("RabbitMQ連接成功,請輸入消息,輸入exit退出");
          ????????channel.QueueDeclare("oec2003",?false,?false,?false,?null);
          ????????string?input;
          ????????do
          ????????{
          ????????????input?=?Console.ReadLine();
          ????????????var?body?=?Encoding.UTF8.GetBytes(input);
          ????????????channel.BasicPublish("",?"oec2003",?null,?body);
          ????????}
          ????????while?(input.Trim().ToLower()?!=?"exit");
          ????}
          }

          消費(fèi)者代碼

          static?void?Main(string[]?args)
          {
          ????ConnectionFactory?factory?=?new?ConnectionFactory
          ????{
          ????????UserName?=?"oec2003",
          ????????Password?=?"000000",
          ????????HostName?=?"10.211.55.6"
          ????};
          ????using?(var?connection?=?factory.CreateConnection())
          ????using?(var?channel?=?connection.CreateModel())
          ????{
          ????????Console.WriteLine("消費(fèi)者開始監(jiān)聽......");
          ????????channel.QueueDeclare("oec2003",?false,?false,?false,?null);
          ????????EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);
          ????????consumer.Received?+=?(ch,?ea)?=>
          ????????{
          ????????????string?message?=?Encoding.Default.GetString(ea.Body.ToArray());
          ????????????Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");
          ????????????System.Threading.Thread.Sleep(10000);
          ????????};
          ????????channel.BasicConsume("oec2003",?true,?consumer);
          ????????Console.ReadKey();
          ????}
          }
          • channel.BasicConsume 方法的第二個(gè)參數(shù)設(shè)置為 true 表示自動(dòng)應(yīng)答;
          • 開啟自動(dòng)應(yīng)答后,消息是生產(chǎn)者發(fā)布后,當(dāng)有消費(fèi)者連接上后,所有的消息都會(huì)被自動(dòng)確認(rèn),并且從內(nèi)存中刪除,這時(shí)如果消費(fèi)者進(jìn)程掛掉,沒有處理的消息會(huì)丟失,正在處理中的消息也不會(huì)被重新投遞;
          • 自動(dòng)應(yīng)答的好處是消息隊(duì)列不會(huì)處于堵塞狀態(tài),但代價(jià)有點(diǎn)大,生產(chǎn)環(huán)境中還是不建議使用。

          手動(dòng)應(yīng)答

          手動(dòng)應(yīng)答,當(dāng)消費(fèi)者接收到消息處理完后,需要發(fā)送一個(gè)回執(zhí),告訴 RabbiMQ 服務(wù)端,這時(shí) RabbiMQ 才會(huì)將該消息刪除。

          生產(chǎn)者的代碼和上面的一樣,消費(fèi)者代碼需要做相關(guān)調(diào)整,如下:

          static?void?Main(string[]?args)
          {
          ????ConnectionFactory?factory?=?new?ConnectionFactory
          ????{
          ????????UserName?=?"oec2003",
          ????????Password?=?"000000",
          ????????HostName?=?"10.211.55.6"
          ????};
          ????using?(var?connection?=?factory.CreateConnection())
          ????using?(var?channel?=?connection.CreateModel())
          ????{
          ????????Console.WriteLine("消費(fèi)者開始監(jiān)聽......");
          ????????EventingBasicConsumer?consumer?=?new?EventingBasicConsumer(channel);
          ????????consumer.Received?+=?(ch,?ea)?=>
          ????????{
          ????????????string?message?=?Encoding.Default.GetString(ea.Body.ToArray());
          ????????????Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");
          ???????????
          ????????????channel.BasicAck(ea.DeliveryTag,?false);
          ????????};
          ????????channel.BasicConsume("oec2003",?false,?consumer);
          ????????Console.ReadKey();
          ????}
          }
          • channel.BasicConsume 方法的第二個(gè)參數(shù)設(shè)置為 false ,表示手動(dòng)應(yīng)答模式;
          • 在處理完消息后調(diào)用 channel.BasicAck(ea.DeliveryTag, false); 來進(jìn)行應(yīng)答,告訴 RabbiMQ 消息已經(jīng)收到,RabbiMQ 收到這個(gè)回執(zhí)后,才會(huì)刪除消息。

          可能遇到的問題

          流量控制問題

          在手動(dòng)模式下,生產(chǎn)者發(fā)送消息后消息會(huì)從 Ready 進(jìn)入到 Unacked 中,當(dāng)消費(fèi)者進(jìn)行應(yīng)答之后消息從 Unacked 中刪除。

          如果消息的產(chǎn)生速度遠(yuǎn)遠(yuǎn)大于消費(fèi)者的處理速度,這時(shí)消息就會(huì)都在消費(fèi)者處進(jìn)行積壓了。我們會(huì)看到 Unacked 中的數(shù)量會(huì)越來越大,這樣消費(fèi)者的壓力就會(huì)越來越大,這時(shí)就需要使用 Qos 來進(jìn)行限流。

          Qos

          在消費(fèi)者中使用 channel.BasicQos(0, 2, false); 來進(jìn)行 Qos 的設(shè)置,如下圖:

          BasicQos 方法有三個(gè)參數(shù):

          • prefetchSize:批量獲取消息的總大小,0為不限制;
          • prefetchCount:每次處理消息的個(gè)數(shù),比如 prefetchCount 設(shè)置為 2 ,那么處于 Unacked 狀態(tài)的消息最多就 2 條,當(dāng)其中一條進(jìn)行了得到了應(yīng)答后,才會(huì)從 Ready 中轉(zhuǎn)入一條到 Unacked
          • global:設(shè)置為 true 表示對 channel 進(jìn)行控制,否則對每個(gè)消費(fèi)者進(jìn)行限制,一個(gè) channel 可以有多個(gè)消費(fèi)者

          為什么使用 Qos :

          • 提高服務(wù)穩(wěn)定性,因?yàn)橛?prefetchCount 參數(shù)的控制,不會(huì)有海量的數(shù)據(jù)涌進(jìn)來導(dǎo)致消費(fèi)者服務(wù)掛掉;
          • 提高吞吐量,當(dāng)隊(duì)列有多個(gè)消費(fèi)者時(shí),每個(gè)消費(fèi)者的能力不一樣,我們可以通過 prefetchCount 參數(shù)來合理安排每個(gè)消費(fèi)者的處理能力,不會(huì)出現(xiàn)有的空閑,有的積壓。

          prefetchCount 是一個(gè)非常關(guān)鍵的參數(shù),當(dāng)消費(fèi)者處理消息時(shí),出現(xiàn)一些異常情況,導(dǎo)致無法進(jìn)行 Ack 應(yīng)答,沒有應(yīng)答的數(shù)量大于等于 prefetchCount 時(shí),隊(duì)列就會(huì)發(fā)生堵塞。所以我們一定要確保消息的處理能夠被異常捕獲,并在 finally 中進(jìn)行 Ack 應(yīng)答,代碼如下:

          try
          {
          ????string?message?=?Encoding.Default.GetString(ea.Body.ToArray());
          ????Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd?HH:mm:ss")}:收到消息:?{message}");
          ????if?(message?==?"error")
          ????{
          ????????throw?new?Exception("mq?error");
          ????}
          ????else?if?(message?==?"sleep")
          ????{
          ????????System.Threading.Thread.Sleep(60000);
          ????}
          }
          catch?(Exception)
          {
          ????//處理異常
          }
          finally
          {
          ????channel.BasicAck(ea.DeliveryTag,?false);
          }

          一旦隊(duì)列堵塞了,一種處理方式就是斷掉客戶端,這樣,處在 Unacked 中的消息會(huì)重新回到 Ready 中,會(huì)重新進(jìn)行投遞進(jìn)行消費(fèi)。

          總結(jié)

          1、自動(dòng)應(yīng)答模式需要慎用,特別是生產(chǎn)環(huán)境;

          2、不開啟 Qos ,消費(fèi)者可能會(huì)面臨很大壓力,但消息不會(huì)堵塞(測試過 500 個(gè)未進(jìn)行 Ack 沒有造成堵塞),現(xiàn)在不確定在沒有 Qos 的情況下,有沒有默認(rèn)的最大 prefetchCount ;

          3、開啟 Qos ,prefetchCount 的值很關(guān)鍵,并且需要做好異常處理,防止堵塞。

          希望本文對您有所幫助!

          瀏覽 45
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操逼系列 | 日日夜夜爱| 免费爱爱网址 | 三级在线视频播放 | 九九性爱视频 |