Kafka消費(fèi)者分區(qū)分配策略及自定義分配策略
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”資源“獲取更多資源

kafka消費(fèi)者如何分配分區(qū)以及分配分區(qū)策略和源碼解釋
我們知道kafka的主題中數(shù)據(jù)數(shù)據(jù)是按照分區(qū)的概念來的,一個(gè)主題可能分配了多個(gè)分區(qū),每個(gè)分區(qū)配置了復(fù)制系數(shù),為了可用性,在多個(gè)broker中進(jìn)行復(fù)制,一個(gè)分區(qū)在多個(gè)broker中選舉出一個(gè)副本首領(lǐng),消費(fèi)者只訪問這個(gè)分區(qū)副本首領(lǐng),這些在本章節(jié)不重要,本章節(jié)闡述一個(gè)消費(fèi)者如何選定一個(gè)主題中多個(gè)分區(qū)中的一個(gè)分區(qū),和kafka的分區(qū)分配策略核心源碼解析。
kafka中分區(qū)策略核心實(shí)現(xiàn)有兩種 一種是range范圍策略,一種是roudRobin輪詢策略,在構(gòu)建KafkaConsumer類的時(shí)候配置,看一下策略的關(guān)系就能自行配置, 配置key為partition.assignment.strategy的具體實(shí)現(xiàn),看下圖:

首先我們需要有多種假設(shè)來舉例
假設(shè)我們創(chuàng)建了一個(gè)主題,并且8個(gè)分區(qū)p0-p8,我們有3個(gè)消費(fèi)者c0-c2

先來說說第一種策略, range策略
上面已經(jīng)做好了一些假設(shè)
根據(jù)range策略,分區(qū)按照順序平鋪,消費(fèi)者按照順序平鋪
分區(qū)數(shù)量除以消費(fèi)者數(shù)量,這里是分區(qū)數(shù)量8除以消費(fèi)者數(shù)量3 等于 2 (N),再分區(qū)數(shù)量8對消費(fèi)數(shù)量3取余得到2 ( M ),kafka的range算法是前?M個(gè)消費(fèi)能得到N+1個(gè)分區(qū),剩余的消費(fèi)者分配到N個(gè)分區(qū)
具體算法:假設(shè)區(qū)分?jǐn)?shù)量為pCout,消費(fèi)者數(shù)量為cCount
n = pCout / cCount? ?8? / 3 = 2
m = pCount % cCount? 8 % 3 = 2
前m(2)個(gè)消費(fèi)者得到n+1(2+1)個(gè)分區(qū),剩余的消費(fèi)者分配到N(2)個(gè)分區(qū),最終結(jié)果如下圖

range策略是kafka默認(rèn)的一個(gè)分區(qū)分配的策略可以看看ConsumerConfig類的static塊,默認(rèn)配置的RangeAssignor?

想看一下分配分區(qū)的策略的入口可以參考KafkaConsumer類中的pollOnce方法進(jìn)去,里面調(diào)用的ensurePartitionAssignment方法,不過這里debug進(jìn)去看還是挺復(fù)雜的,有興趣的可以參考,篇幅講的不是這些重點(diǎn),具體入口可以看下圖

下面看一看range策略中核心源碼的實(shí)現(xiàn),具體查看RangeAssignor類
@Override public Map<String, List> assign(Map<String, Integer> partitionsPerTopic, Map<String, List<String>> subscriptions) { //獲取每個(gè)主題消費(fèi)者們 Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { //主題 String topic = topicEntry.getKey(); //這個(gè)主題的消費(fèi)們 List<String> consumersForTopic = topicEntry.getValue(); //主題的分區(qū)數(shù)量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; //對主題的消費(fèi)者進(jìn)行排序 Collections.sort(consumersForTopic); //主題數(shù)量除以主題消費(fèi)者數(shù)量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //主題數(shù)量對消費(fèi)者數(shù)量進(jìn)行取余 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //封裝主題和分區(qū)信息 List partitions = partitions(topic, numPartitionsForTopic); //下面就開始為每一個(gè)消費(fèi)者分配分區(qū),看到這里是不是會(huì)發(fā)現(xiàn) 消費(fèi)者分區(qū)再均衡,每次添加消費(fèi)者或者添加分區(qū)都會(huì)發(fā)生再均衡 //事件,不過這里不是重點(diǎn) for (int i = 0, n = consumersForTopic.size(); i < n; i++) { //消費(fèi)者分區(qū)起始位置 int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); //分配的分區(qū)數(shù)量, 從我們上面的假設(shè)的分區(qū)數(shù)量和消費(fèi)者數(shù)量可以得出這里的值 // int length = 2 + (i + 1 > 2 ? 0 : 1); //因?yàn)橛械臒o法整除和取余的,所以前面的2個(gè)消費(fèi)者這里會(huì)獲得3 的結(jié)果, 最后一個(gè)消費(fèi)者這里只能得到2 int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); //為每個(gè)消費(fèi)者分配分區(qū)信息 assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; }
下面講一講kafka自帶的第二種消費(fèi)者分配分區(qū)的策略
輪詢策略
還是按照上面的假設(shè)8個(gè)分區(qū)3個(gè)消費(fèi)者
8個(gè)分區(qū)按照順序平鋪
構(gòu)造消費(fèi)者環(huán) c0,c1,c2,c0,c1,c2.......
輪詢分配過程是? p0 分配給了 c0, p1 分配給了 c1, p2分配給了 c2, p3分配給了c0, p4分配給了 c1, p5分配給了c2, 一次類推,所有分區(qū)輪詢分配給一個(gè)消費(fèi)者環(huán),大概草圖如下

?上面草圖 多多理解 , 核心源碼如下
@Overridepublic Map> assign(Map partitionsPerTopic, Map> subscriptions) { Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList()); //講消費(fèi)者集合進(jìn)行排序,構(gòu)建一個(gè)消費(fèi)者環(huán), 內(nèi)部通過索引位置+1對總數(shù)取余的方式實(shí)現(xiàn)的環(huán)CircularIteratorassigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); //對所有主題和分區(qū)進(jìn)行排序, 假設(shè)集合中有多個(gè)主題/分區(qū)-分區(qū),最終排序結(jié)果為// t1/p0-p1-p2,t2/p0-p1,t3/p0-p1-p2for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {//當(dāng)前主題final String topic = partition.topic();//這里循環(huán)遍歷看看消費(fèi)者有沒有訂閱改topic,否則一直next到下一個(gè)消費(fèi)者,主要的作用是跳過//沒有訂閱該主題的消費(fèi)者while (!subscriptions.get(assigner.peek()).contains(topic))assigner.next();//未當(dāng)前消費(fèi)者添加分區(qū)信息assignment.get(assigner.next()).add(partition);}return assignment;}
通過上面的的案例我們是不是可以通過繼承AbstractPartitionAssignor抽象類,實(shí)現(xiàn)它的assign方法,來自定義消費(fèi)者分區(qū)分配策略,因?yàn)檫@里我們得到了一個(gè)所有相關(guān)主題和主題分區(qū)數(shù)量,所有主題對應(yīng)的消費(fèi)者,那么就可以在這里根據(jù)自己實(shí)際場景自定義一些分配策略。
歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??


