<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試必備!Kafka 怎么順序消費?

          共 3568字,需瀏覽 8分鐘

           ·

          2022-05-09 21:18



          前言


          本文針對解決Kafka不同Topic之間存在一定的數(shù)據(jù)關(guān)聯(lián)時的順序消費問題。

          如存在Topic-insert和Topic-update分別是對數(shù)據(jù)的插入和更新,當(dāng)insert和update操作為同一數(shù)據(jù)時,應(yīng)保證先insert再update。

          1、問題引入

          kafka的順序消費一直是一個難以解決的問題,kafka的消費策略是對于同Topic同Partition的消息可保證順序消費,其余無法保證。
          如果一個Topic只有一個Partition,那么這個Topic對應(yīng)consumer的消費必然是有序的。不同的Topic的任何情況下都無法保證consumer的消費順序和producer的發(fā)送順序一致。
          如果不同Topic之間存在數(shù)據(jù)關(guān)聯(lián)且對消費順序有要求,該如何處理?本文主要解決此問題。
          另外,Kafka 系列面試題和答案全部整理好了,微信搜索互聯(lián)網(wǎng)架構(gòu)師,在后臺發(fā)送:面試,可以在線閱讀。

          2、解決思路

          現(xiàn)有Topic-insert和Topic-update,數(shù)據(jù)唯一標(biāo)識為id,對于id=1的數(shù)據(jù)而言,要保證Topic-insert消費在前,Topic-update消費在后。想成為架構(gòu)師,這份架構(gòu)師圖譜建議看看,少走彎路。

          兩個Topic的消費為不同線程處理,所以為了保證在同一時間內(nèi)的同一數(shù)據(jù)標(biāo)識的消息僅有一個業(yè)務(wù)邏輯在處理,需要對業(yè)務(wù)添加鎖操作。

          使用synchronized進(jìn)行加鎖的話,會影響無關(guān)聯(lián)的insert和update的數(shù)據(jù)消費能力,如id=1的insert和id=2的update,在synchronized的情況下,無法并發(fā)處理,這是沒有必要的,我們需要的是對于id=1的insert和id=1的update在同一時間只有一個在處理,所以使用細(xì)粒度鎖來完成加鎖的操作。

          細(xì)粒度鎖實現(xiàn):https://blog.csdn.net/qq_38245668/article/details/105891161

          PS:如果為分布式系統(tǒng),細(xì)粒度鎖需要使用分布式鎖的對應(yīng)實現(xiàn)。

          在對insert和update加鎖之后,其實還是沒有解決消費順序的問題,只是確保了同一時間只有一個業(yè)務(wù)在處理。?對于消費順序異常的問題,也就是先消費了update再消費insert的情況。
          處理方式:消費到update數(shù)據(jù),校驗庫中是否存在當(dāng)前數(shù)據(jù)(也就是是否執(zhí)行insert),如果沒有,就將當(dāng)前update數(shù)據(jù)存入緩存,key為數(shù)據(jù)標(biāo)識id,在insert消費時檢查是否存在id對應(yīng)的update緩存,如果有,就證明當(dāng)前數(shù)據(jù)的消費順序異常,需執(zhí)行update操作,再將緩存數(shù)據(jù)移除。

          3、實現(xiàn)方案

          消息發(fā)送:

          kafkaTemplate.send("TOPIC_INSERT",?"1");
          kafkaTemplate.send("TOPIC_UPDATE",?"1");

          監(jiān)聽代碼示例:

          KafkaListenerDemo.java

          ?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????//?模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);????????String?id?=?record.value();????????log.info("接收到insert ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????log.info("開始處理?{}?的insert",?id);????????????//?模擬?insert?業(yè)務(wù)處理????????????Thread.sleep(1000);????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)????????????if?(UPDATE_DATA_MAP.containsKey(id)){????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update????????????????doUpdate(id);????????????}????????????log.info("處理?{}?的insert?結(jié)束",?id);????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????@KafkaListener(topics?=?"TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{????????String?id?=?record.value();????????log.info("接收到update ::?{}",?id);????????Lock?lock?=?weakRefHashLock.lock(id);????????lock.lock();????????try?{????????????//?測試使用,不做數(shù)據(jù)庫的校驗????????????if?(!DATA_MAP.containsKey(id)){????????????????//?未找到對應(yīng)數(shù)據(jù),證明消費順序異常,將當(dāng)前數(shù)據(jù)加入緩存????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);????????????????UPDATE_DATA_MAP.put(id,?id);????????????}else?{????????????????doUpdate(id);????????????}????????}finally?{????????????lock.unlock();????????}????????acknowledgment.acknowledge();????}????void?doUpdate(String?id)?throws?InterruptedException{????????//?模擬?update????????log.info("開始處理update::{}",?id);????????Thread.sleep(1000);????????log.info("處理update::{}?結(jié)束",?id);????}}" linktype="text" imgurl="" imgdata="null" data-itemshowtype="0" tab="innerlink" data-linktype="2" wah-hotarea="click" hasload="1" style="outline: 0px; -webkit-tap-highlight-color: rgba(0, 0, 0, 0); cursor: pointer; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">@Component
          @Slf4j
          public?class?KafkaListenerDemo?{

          ????//?消費到的數(shù)據(jù)緩存
          ????private?Map?UPDATE_DATA_MAP?=?new?ConcurrentHashMap<>();
          ????//?數(shù)據(jù)存儲
          ????private?Map?DATA_MAP?=?new?ConcurrentHashMap<>();
          ????private?WeakRefHashLock?weakRefHashLock;

          ????public?KafkaListenerDemo(WeakRefHashLock?weakRefHashLock)?{
          ????????this.weakRefHashLock?=?weakRefHashLock;
          ????}

          ????@KafkaListener(topics?=?"TOPIC_INSERT")
          ????public?void?insert(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{
          ????????//?模擬順序異常,也就是insert后消費,這里線程sleep
          ????????Thread.sleep(1000);

          ????????String?id?=?record.value();
          ????????log.info("接收到insert ::?{}",?id);
          ????????Lock?lock?=?weakRefHashLock.lock(id);
          ????????lock.lock();
          ????????try?{
          ????????????log.info("開始處理?{}?的insert",?id);
          ????????????//?模擬?insert?業(yè)務(wù)處理
          ????????????Thread.sleep(1000);
          ????????????//?從緩存中獲取?是否存在有update數(shù)據(jù)
          ????????????if?(UPDATE_DATA_MAP.containsKey(id)){
          ????????????????//?緩存數(shù)據(jù)存在,執(zhí)行update
          ????????????????doUpdate(id);
          ????????????}
          ????????????log.info("處理?{}?的insert?結(jié)束",?id);
          ????????}finally?{
          ????????????lock.unlock();
          ????????}
          ????????acknowledgment.acknowledge();
          ????}

          ????@KafkaListener(topics?=?"TOPIC_UPDATE")
          ????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment)?throws?InterruptedException{

          ????????String?id?=?record.value();
          ????????log.info("接收到update ::?{}",?id);
          ????????Lock?lock?=?weakRefHashLock.lock(id);
          ????????lock.lock();
          ????????try?{
          ????????????//?測試使用,不做數(shù)據(jù)庫的校驗
          ????????????if?(!DATA_MAP.containsKey(id)){
          ????????????????//?未找到對應(yīng)數(shù)據(jù),證明消費順序異常,將當(dāng)前數(shù)據(jù)加入緩存
          ????????????????log.info("消費順序異常,將update數(shù)據(jù)?{}?加入緩存",?id);
          ????????????????UPDATE_DATA_MAP.put(id,?id);
          ????????????}else?{
          ????????????????doUpdate(id);
          ????????????}
          ????????}finally?{
          ????????????lock.unlock();
          ????????}
          ????????acknowledgment.acknowledge();
          ????}

          ????void?doUpdate(String?id)?throws?InterruptedException{
          ????????//?模擬?update
          ????????log.info("開始處理update::{}",?id);
          ????????Thread.sleep(1000);
          ????????log.info("處理update::{}?結(jié)束",?id);
          ????}

          }

          日志(代碼中已模擬必現(xiàn)消費順序異常的場景):

          接收到update ::1
          消費順序異常,將update數(shù)據(jù)?1?加入緩存
          接收到insert ::1
          開始處理?1?的insert
          開始處理update::1
          處理update::1 結(jié)束
          處理?1?的insert?結(jié)束

          觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關(guān)聯(lián)的消費順序問題。

          版權(quán)聲明:本文為CSDN博主「方片龍」的原創(chuàng)文章,原文鏈接:https://blog.csdn.net/qq_38245668/article/details/105900011


          -End-


          全棧架構(gòu)社區(qū)交流群

          ?「全棧架構(gòu)社區(qū)」建立了讀者架構(gòu)師交流群,大家可以添加小編微信進(jìn)行加群。歡迎有想法、樂于分享的朋友們一起交流學(xué)習(xí)。

          掃描添加好友邀你進(jìn)架構(gòu)師群,加我時注明姓名+公司+職位】
          看完本文有收獲?請轉(zhuǎn)發(fā)分享給更多人
          瀏覽 44
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲qingse中文 | a毛片网站 | 亚洲AV无码变态另类在线播放 | xxxxx在线 | 亚洲精品在线视频播放平台 |