<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          通過 Pulsar 源碼徹底解決重復(fù)消費(fèi)問題

          共 4830字,需瀏覽 10分鐘

           ·

          2023-03-03 20:54

          背景

          最近真是和 Pulsar 杠上了,業(yè)務(wù)團(tuán)隊反饋說是線上有個應(yīng)用消息重復(fù)消費(fèi)。

          而且在測試環(huán)境是可以穩(wěn)定復(fù)現(xiàn)的,根據(jù)經(jīng)驗(yàn)來看一般能穩(wěn)定復(fù)現(xiàn)的都比較好解決。

          定位問題

          接著便是定位問題了,根據(jù)之前的經(jīng)驗(yàn)讓業(yè)務(wù)按照這幾種情況先排查一下:

          通過排查:1,2可以排除了。

          1. 沒有相關(guān)日志
          2. 存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。

          第三個也在消費(fèi)的入口和提交消息出計算了時間,最終發(fā)現(xiàn)都是在2s左右 ACK 的。

          偽代碼如下:

                  Consumer consumer = client.newConsumer()
                          .subscriptionType(SubscriptionType.Shared)
                          .enableRetry(true)
                          .topic(topic)
                          .ackTimeout(30, TimeUnit.SECONDS)
                          .subscriptionName("my-sub")
                          .messageListener(new MessageListener<byte[]>() {
                              @SneakyThrows
                              @Override
                              public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                                  log.info("msg_id{}",msg.getMessageId().toString());
                                  TimeUnit.SECONDS.sleep(2);
                                  consumer.acknowledge(msg);
                              }
                          })
                          .subscribe();

          那這就很奇怪了,因?yàn)榇a里配置的 ackTimeout 是 30s,理論上來說是不會存在超時導(dǎo)致消息重發(fā)的。

          為了排除是否是超時引起的,直接將業(yè)務(wù)代碼注釋掉了,等于是消息收到后立即就 ACK,經(jīng)過測試發(fā)現(xiàn)這樣確實(shí)就沒有重復(fù)消費(fèi)了。

          為了再次確認(rèn)是不是和 ackTimeout 有關(guān),直接將 .ackTimeout(30, TimeUnit.SECONDS) 注釋掉后測試,發(fā)現(xiàn)也沒有重復(fù)消費(fèi)了。

          確認(rèn)原因

          既然如此那一定是和這個配置有關(guān)了,但看代碼確實(shí)沒有超時,為了定位具體原因只有去看 client 的源碼了。

          這里簡單梳理下消息的消費(fèi)的流程:

          1. 根據(jù) .receiverQueueSize(1000) 的配置,默認(rèn)情況下 broker 會直接給客戶端推送 1000 條消息。
          2. 客戶端將這 1000 條消息保存到內(nèi)部隊列中。
          3. 如果使用同步消費(fèi) receive() 時,本質(zhì)上就是去 take 這個內(nèi)部隊列。
          4. 如果是使用的是 messageListener 異步消費(fèi)并配置 ackTimeout,每當(dāng)從隊列里獲得一條消息后便會把這條消息加入 UnAckedMessageTracker 內(nèi)部的一個時間輪中,定時檢測頂部是否存在消息,如果存在則會觸發(fā)重新投遞。4.1 加入時間輪后,異步調(diào)用我們自定義的事件,這個異步操作是提交到一個無界隊列中由單個線程依次排隊執(zhí)行(這點(diǎn)是這次問題的關(guān)鍵)
          5. 業(yè)務(wù) ACK 的時候會從時間輪中刪除消息,所以如果消息 ACK 的足夠快,在第四步就不會獲取到消息進(jìn)行重新投遞。

          整體流程如上圖,代碼細(xì)節(jié)如下圖:

          所以問題的根本原因就是寫入時間輪(UnAckedMessageTracker)開始倒計時的線程和回調(diào)業(yè)務(wù)邏輯的不是同一個線程。

          如果業(yè)務(wù)執(zhí)行耗時,等到消息從那個單線程的無界隊列中取出來的時候很有可能已經(jīng)過了 ackTimeou 的時間,從而導(dǎo)致了超時重發(fā)。

          也就是用戶所理解的 ackTimeout 周期(應(yīng)該進(jìn)入回調(diào)時候開始計時)和 SDK 實(shí)現(xiàn)的不一致造成的。

          之后我再次確認(rèn)同樣的代碼換為同步消費(fèi)是沒有問題的,不會導(dǎo)致重復(fù)消費(fèi):

          while (true) {
          Message msg = consumer.receive();
                      log.info(
                              "consumer Message received: " + new String(msg.getData()) + msg.getMessageId().toString());
                      TimeUnit.SECONDS.sleep(2);
                      consumer.acknowledge(msg); 
          }

          查看代碼后發(fā)現(xiàn)同步代碼的獲取消息和加入 UnAckedMessageTracker 時間輪是同步的,也就不會出現(xiàn)超時的問題。

          總結(jié)

          所以其實(shí) 是messageListener 異步消費(fèi)的 ackTimeout 的語義是有問題的,需要將加入 UnAckedMessageTracker 處移動到回調(diào)函數(shù)中同步調(diào)用。

          我查看了最新的 2.11.x 版本的代碼依然沒有修復(fù),正準(zhǔn)備提個 PR 切換到 master 時才發(fā)現(xiàn)已經(jīng)有相關(guān)的 PR 了,只是還沒有發(fā)版。

          修復(fù)的背景和思路也是類似的,具體參考:

          https://github.com/apache/pulsar/pull/18911

          其實(shí)業(yè)務(wù)中并不推薦使用 ackTimeout 這個配置了,不好預(yù)估時間從而導(dǎo)致超時,而且我相信大部分業(yè)務(wù)配置好 ackTImeout 后直到后續(xù)出問題的時候才想起來要改。所以干脆一開始就不要使用。

          在 go 版本的 SDK 中直接廢棄掉了這個參數(shù),推薦使用 nack API 替換。



          往期推薦

          一個詭異的 Pulsar InterruptedException 異常

          Istio 升級后踩的坑

          Pulsar負(fù)載均衡原理及優(yōu)化

          2022 年度總結(jié)

          對 Pulsar 集群的壓測與優(yōu)化

           

          點(diǎn)分享

          點(diǎn)收藏

          點(diǎn)點(diǎn)贊

          點(diǎn)在看

           

          瀏覽 64
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产麻豆成人免费视频 | 天天色色 | 中国白虎AV网站 | 久久中文网| 九九九九九九视频 |