面試必備!Kafka 怎么順序消費?
點擊關注公眾號,回復“2T”獲取2TB學習資源!
互聯(lián)網(wǎng)架構師后臺回復 2T 有特別禮包
前言
本文針對解決Kafka不同Topic之間存在一定的數(shù)據(jù)關聯(lián)時的順序消費問題。
1、問題引入
2、解決思路
現(xiàn)有Topic-insert和Topic-update,數(shù)據(jù)唯一標識為id,對于id=1的數(shù)據(jù)而言,要保證Topic-insert消費在前,Topic-update消費在后。想成為架構師,這份架構師圖譜建議看看,少走彎路。
兩個Topic的消費為不同線程處理,所以為了保證在同一時間內(nèi)的同一數(shù)據(jù)標識的消息僅有一個業(yè)務邏輯在處理,需要對業(yè)務添加鎖操作。
使用synchronized進行加鎖的話,會影響無關聯(lián)的insert和update的數(shù)據(jù)消費能力,如id=1的insert和id=2的update,在synchronized的情況下,無法并發(fā)處理,這是沒有必要的,我們需要的是對于id=1的insert和id=1的update在同一時間只有一個在處理,所以使用細粒度鎖來完成加鎖的操作。
細粒度鎖實現(xiàn):https://blog.csdn.net/qq_38245668/article/details/105891161
PS:如果為分布式系統(tǒng),細粒度鎖需要使用分布式鎖的對應實現(xiàn)。
3、實現(xiàn)方案
kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");監(jiān)聽代碼示例:
@Component
@Slf4j
public class KafkaListenerDemo {
// 消費到的數(shù)據(jù)緩存
private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
// 數(shù)據(jù)存儲
private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
private WeakRefHashLock weakRefHashLock;
public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
this.weakRefHashLock = weakRefHashLock;
}
@KafkaListener(topics = "TOPIC_INSERT")
public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
// 模擬順序異常,也就是insert后消費,這里線程sleep
Thread.sleep(1000);
String id = record.value();
log.info("接收到insert :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
log.info("開始處理 {} 的insert", id);
// 模擬 insert 業(yè)務處理
Thread.sleep(1000);
// 從緩存中獲取 是否存在有update數(shù)據(jù)
if (UPDATE_DATA_MAP.containsKey(id)){
// 緩存數(shù)據(jù)存在,執(zhí)行update
doUpdate(id);
}
log.info("處理 {} 的insert 結束", id);
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = "TOPIC_UPDATE")
public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
String id = record.value();
log.info("接收到update :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
// 測試使用,不做數(shù)據(jù)庫的校驗
if (!DATA_MAP.containsKey(id)){
// 未找到對應數(shù)據(jù),證明消費順序異常,將當前數(shù)據(jù)加入緩存
log.info("消費順序異常,將update數(shù)據(jù) {} 加入緩存", id);
UPDATE_DATA_MAP.put(id, id);
}else {
doUpdate(id);
}
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
void doUpdate(String id) throws InterruptedException{
// 模擬 update
log.info("開始處理update::{}", id);
Thread.sleep(1000);
log.info("處理update::{} 結束", id);
}
}
接收到update ::1
消費順序異常,將update數(shù)據(jù) 1 加入緩存
接收到insert ::1
開始處理 1 的insert
開始處理update::1
處理update::1 結束
處理 1 的insert 結束
觀察日志,此方案可正常處理不同Topic再存在數(shù)據(jù)關聯(lián)的消費順序問題。
版權聲明:本文為CSDN博主「方片龍」的原創(chuàng)文章,原文鏈接:https://blog.csdn.net/qq_38245668/article/details/105900011
-End-
正文結束
1.心態(tài)崩了!稅前2萬4,到手1萬4,年終獎扣稅方式1月1日起施行~

