Kafka 怎么順序消費?面試必備!
版權(quán)聲明:本文為CSDN博主「方片龍」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_38245668/article/details/105900011
# 前言
本文針對解決Kafka不同Topic之間存在一定的數(shù)據(jù)關(guān)聯(lián)時的順序消費問題。
如存在Topic-insert和Topic-update分別是對數(shù)據(jù)的插入和更新,當(dāng)insert和update操作為同一數(shù)據(jù)時,應(yīng)保證先insert再update。
# 問題引入
kafka的順序消費一直是一個難以解決的問題,kafka的消費策略是對于同Topic同Partition的消息可保證順序消費,其余無法保證。
如果一個Topic只有一個Partition,那么這個Topic對應(yīng)consumer的消費必然是有序的。不同的Topic的任何情況下都無法保證consumer的消費順序和producer的發(fā)送順序一致。
如果不同Topic之間存在數(shù)據(jù)關(guān)聯(lián)且對消費順序有要求,該如何處理?本文主要解決此問題。
# 解決思路
現(xiàn)有Topic-insert和Topic-update,數(shù)據(jù)唯一標(biāo)識為id,對于id=1的數(shù)據(jù)而言,要保證Topic-insert消費在前,Topic-update消費在后。
兩個Topic的消費為不同線程處理,所以為了保證在同一時間內(nèi)的同一數(shù)據(jù)標(biāo)識的消息僅有一個業(yè)務(wù)邏輯在處理,需要對業(yè)務(wù)添加鎖操作。
使用synchronized進行加鎖的話,會影響無關(guān)聯(lián)的insert和update的數(shù)據(jù)消費能力,如id=1的insert和id=2的update,在synchronized的情況下,無法并發(fā)處理,這是沒有必要的,我們需要的是對于id=1的insert和id=1的update在同一時間只有一個在處理,所以使用細粒度鎖來完成加鎖的操作。
細粒度鎖實現(xiàn):https://blog.csdn.net/qq_38245668/article/details/105891161
PS:如果為分布式系統(tǒng),細粒度鎖需要使用分布式鎖的對應(yīng)實現(xiàn)。
在對insert和update加鎖之后,其實還是沒有解決消費順序的問題,只是確保了同一時間只有一個業(yè)務(wù)在處理。對于消費順序異常的問題,也就是先消費了update再消費insert的情況。
處理方式:消費到update數(shù)據(jù),校驗庫中是否存在當(dāng)前數(shù)據(jù)(也就是是否執(zhí)行insert),如果沒有,就將當(dāng)前update數(shù)據(jù)存入緩存,key為數(shù)據(jù)標(biāo)識id,在insert消費時檢查是否存在id對應(yīng)的update緩存,如果有,就證明當(dāng)前數(shù)據(jù)的消費順序異常,需執(zhí)行update操作,再將緩存數(shù)據(jù)移除。
# 實現(xiàn)方案
消息發(fā)送:
kafkaTemplate.send("TOPIC_INSERT", "1");kafkaTemplate.send("TOPIC_UPDATE", "1");
監(jiān)聽代碼示例:
KafkaListenerDemo.java
@Component@Slf4jpublic class KafkaListenerDemo {// 消費到的數(shù)據(jù)緩存private MapUPDATE_DATA_MAP = new ConcurrentHashMap<>(); // 數(shù)據(jù)存儲private MapDATA_MAP = new ConcurrentHashMap<>(); private WeakRefHashLock weakRefHashLock;public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {this.weakRefHashLock = weakRefHashLock;}@KafkaListener(topics = "TOPIC_INSERT")public void insert(ConsumerRecordrecord, Acknowledgment acknowledgment ) throws InterruptedException{// 模擬順序異常,也就是insert后消費,這里線程sleep????????Thread.sleep(1000);String id = record.value();log.info("接收到insert ::{}", id);Lock lock = weakRefHashLock.lock(id);lock.lock();try {log.info("開始處理 {} 的insert", id);// 模擬 insert 業(yè)務(wù)處理Thread.sleep(1000);// 從緩存中獲取 是否存在有update數(shù)據(jù)if (UPDATE_DATA_MAP.containsKey(id)){// 緩存數(shù)據(jù)存在,執(zhí)行updatedoUpdate(id);}log.info("處理 {} 的insert 結(jié)束", id);}finally {lock.unlock();}acknowledgment.acknowledge();}@KafkaListener(topics = "TOPIC_UPDATE")????public?void?update(ConsumerRecord?record,?Acknowledgment?acknowledgment )?throws?InterruptedException{String id = record.value();log.info("接收到update ::{}", id);Lock lock = weakRefHashLock.lock(id);lock.lock();try {// 測試使用,不做數(shù)據(jù)庫的校驗if (!DATA_MAP.containsKey(id)){// 未找到對應(yīng)數(shù)據(jù),證明消費順序異常,將當(dāng)前數(shù)據(jù)加入緩存log.info("消費順序異常,將update數(shù)據(jù) {} 加入緩存", id);UPDATE_DATA_MAP.put(id, id);}else {doUpdate(id);}}finally {lock.unlock();}acknowledgment.acknowledge();}void doUpdate(String id) throws InterruptedException{// 模擬 updatelog.info("開始處理update::{}", id);Thread.sleep(1000);log.info("處理update::{} 結(jié)束", id);}}
日志(代碼中已模擬必現(xiàn)消費順序異常的場景):
接收到update ::1消費順序異常,將update數(shù)據(jù) 1 加入緩存接收到insert ::1開始處理 1 的insert開始處理update::1處理update::1 結(jié)束處理 1 的insert 結(jié)束
觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關(guān)聯(lián)的消費順序問題。
