<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試必備!Kafka 怎么順序消費?

          共 6898字,需瀏覽 14分鐘

           ·

          2022-04-16 15:00

          點擊關注公眾號,回復“2T”獲取2TB學習資源!

          互聯(lián)網(wǎng)架構師后臺回復 2T 有特別禮包

          上一篇:互聯(lián)網(wǎng)后端技術棧大全,建議收藏!

          前言

          本文針對解決Kafka不同Topic之間存在一定的數(shù)據(jù)關聯(lián)時的順序消費問題。

          如存在Topic-insert和Topic-update分別是對數(shù)據(jù)的插入和更新,當insert和update操作為同一數(shù)據(jù)時,應保證先insert再update。

          1、問題引入

          kafka的順序消費一直是一個難以解決的問題,kafka的消費策略是對于同Topic同Partition的消息可保證順序消費,其余無法保證。
          如果一個Topic只有一個Partition,那么這個Topic對應consumer的消費必然是有序的。不同的Topic的任何情況下都無法保證consumer的消費順序和producer的發(fā)送順序一致。
          如果不同Topic之間存在數(shù)據(jù)關聯(lián)且對消費順序有要求,該如何處理?本文主要解決此問題。
          另外,Kafka 系列面試題和答案全部整理好了,微信搜索互聯(lián)網(wǎng)架構師,在后臺發(fā)送:面試,可以在線閱讀。

          2、解決思路

          現(xiàn)有Topic-insert和Topic-update,數(shù)據(jù)唯一標識為id,對于id=1的數(shù)據(jù)而言,要保證Topic-insert消費在前,Topic-update消費在后。想成為架構師,這份架構師圖譜建議看看,少走彎路。

          兩個Topic的消費為不同線程處理,所以為了保證在同一時間內(nèi)的同一數(shù)據(jù)標識的消息僅有一個業(yè)務邏輯在處理,需要對業(yè)務添加鎖操作。

          使用synchronized進行加鎖的話,會影響無關聯(lián)的insert和update的數(shù)據(jù)消費能力,如id=1的insert和id=2的update,在synchronized的情況下,無法并發(fā)處理,這是沒有必要的,我們需要的是對于id=1的insert和id=1的update在同一時間只有一個在處理,所以使用細粒度鎖來完成加鎖的操作。

          細粒度鎖實現(xiàn):https://blog.csdn.net/qq_38245668/article/details/105891161

          PS:如果為分布式系統(tǒng),細粒度鎖需要使用分布式鎖的對應實現(xiàn)。

          在對insert和update加鎖之后,其實還是沒有解決消費順序的問題,只是確保了同一時間只有一個業(yè)務在處理。 對于消費順序異常的問題,也就是先消費了update再消費insert的情況。
          處理方式:消費到update數(shù)據(jù),校驗庫中是否存在當前數(shù)據(jù)(也就是是否執(zhí)行insert),如果沒有,就將當前update數(shù)據(jù)存入緩存,key為數(shù)據(jù)標識id,在insert消費時檢查是否存在id對應的update緩存,如果有,就證明當前數(shù)據(jù)的消費順序異常,需執(zhí)行update操作,再將緩存數(shù)據(jù)移除。

          3、實現(xiàn)方案

          消息發(fā)送:

          kafkaTemplate.send("TOPIC_INSERT""1");
          kafkaTemplate.send("TOPIC_UPDATE""1");

          監(jiān)聽代碼示例:

          KafkaListenerDemo.java

          @Component
          @Slf4j
          public class KafkaListenerDemo {

              // 消費到的數(shù)據(jù)緩存
              private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
              // 數(shù)據(jù)存儲
              private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
              private WeakRefHashLock weakRefHashLock;

              public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
                  this.weakRefHashLock = weakRefHashLock;
              }

              @KafkaListener(topics = "TOPIC_INSERT")
              public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
                  // 模擬順序異常,也就是insert后消費,這里線程sleep
                  Thread.sleep(1000);

                  String id = record.value();
                  log.info("接收到insert :: {}", id);
                  Lock lock = weakRefHashLock.lock(id);
                  lock.lock();
                  try {
                      log.info("開始處理 {} 的insert", id);
                      // 模擬 insert 業(yè)務處理
                      Thread.sleep(1000);
                      // 從緩存中獲取 是否存在有update數(shù)據(jù)
                      if (UPDATE_DATA_MAP.containsKey(id)){
                          // 緩存數(shù)據(jù)存在,執(zhí)行update
                          doUpdate(id);
                      }
                      log.info("處理 {} 的insert 結束", id);
                  }finally {
                      lock.unlock();
                  }
                  acknowledgment.acknowledge();
              }

              @KafkaListener(topics = "TOPIC_UPDATE")
              public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{

                  String id = record.value();
                  log.info("接收到update :: {}", id);
                  Lock lock = weakRefHashLock.lock(id);
                  lock.lock();
                  try {
                      // 測試使用,不做數(shù)據(jù)庫的校驗
                      if (!DATA_MAP.containsKey(id)){
                          // 未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存
                          log.info("消費順序異常,將update數(shù)據(jù) {} 加入緩存", id);
                          UPDATE_DATA_MAP.put(id, id);
                      }else {
                          doUpdate(id);
                      }
                  }finally {
                      lock.unlock();
                  }
                  acknowledgment.acknowledge();
              }

              void doUpdate(String id) throws InterruptedException{
                  // 模擬 update
                  log.info("開始處理update::{}", id);
                  Thread.sleep(1000);
                  log.info("處理update::{} 結束", id);
              }

          }

          日志(代碼中已模擬必現(xiàn)消費順序異常的場景):

          接收到update ::1
          消費順序異常,將update數(shù)據(jù) 1 加入緩存
          接收到insert ::1
          開始處理 1 的insert
          開始處理update::1
          處理update::1 結束
          處理 1 的insert 結束

          觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。

          版權聲明:本文為CSDN博主「方片龍」的原創(chuàng)文章,原文鏈接:https://blog.csdn.net/qq_38245668/article/details/105900011

          -End-


          最后,關注公眾號互聯(lián)網(wǎng)架構師,在后臺回復:2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全


          正文結束


          推薦閱讀 ↓↓↓

          1.心態(tài)崩了!稅前2萬4,到手1萬4,年終獎扣稅方式1月1日起施行~

          2.深圳一普通中學老師工資單曝光,秒殺程序員,網(wǎng)友:敢問是哪個學校畢業(yè)的?

          3.從零開始搭建創(chuàng)業(yè)公司后臺技術棧

          4.程序員一般可以從什么平臺接私活?

          5.清華大學:2021 元宇宙研究報告!

          6.為什么國內(nèi) 996 干不過國外的 955呢?

          7.這封“領導痛批95后下屬”的郵件,句句扎心!

          8.15張圖看懂瞎忙和高效的區(qū)別!

          瀏覽 34
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大香蕉AV艺人网 | 一卡二卡三卡免费视频 | 91精品国内手机在线高清 | 天天做爱网站 | аⅴ资源新版在线天堂 |