<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka消費(fèi)者分區(qū)分配策略及自定義分配策略

          共 4288字,需瀏覽 9分鐘

           ·

          2020-07-30 00:14

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

          回復(fù)”資源“獲取更多資源

          88734501d3baf88aaf447cb41a4077d2.webp

          24931816f40f20635b4cda2063adf68f.webp

          大數(shù)據(jù)技術(shù)與架構(gòu)點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強(qiáng)公眾號(hào)!

          7159c84402d1bf5668559ce8298dd934.webp

          暴走大數(shù)據(jù)點(diǎn)擊右側(cè)關(guān)注,暴走大數(shù)據(jù)!3282640065d337e4e6f2e9ce503bcce4.webp

          kafka消費(fèi)者如何分配分區(qū)以及分配分區(qū)策略和源碼解釋

          我們知道kafka的主題中數(shù)據(jù)數(shù)據(jù)是按照分區(qū)的概念來的,一個(gè)主題可能分配了多個(gè)分區(qū),每個(gè)分區(qū)配置了復(fù)制系數(shù),為了可用性,在多個(gè)broker中進(jìn)行復(fù)制,一個(gè)分區(qū)在多個(gè)broker中選舉出一個(gè)副本首領(lǐng),消費(fèi)者只訪問這個(gè)分區(qū)副本首領(lǐng),這些在本章節(jié)不重要,本章節(jié)闡述一個(gè)消費(fèi)者如何選定一個(gè)主題中多個(gè)分區(qū)中的一個(gè)分區(qū),和kafka的分區(qū)分配策略核心源碼解析。

          kafka中分區(qū)策略核心實(shí)現(xiàn)有兩種 一種是range范圍策略,一種是roudRobin輪詢策略,在構(gòu)建KafkaConsumer類的時(shí)候配置,看一下策略的關(guān)系就能自行配置, 配置key為partition.assignment.strategy的具體實(shí)現(xiàn),看下圖:

          d00b9ef8344273da48c3f96ed5f4f3e7.webp

          首先我們需要有多種假設(shè)來舉例

          假設(shè)我們創(chuàng)建了一個(gè)主題,并且8個(gè)分區(qū)p0-p8,我們有3個(gè)消費(fèi)者c0-c2

          c02d7390e4ec950ee8746ca3da382278.webp

          先來說說第一種策略, range策略

          上面已經(jīng)做好了一些假設(shè)

          根據(jù)range策略,分區(qū)按照順序平鋪,消費(fèi)者按照順序平鋪

          分區(qū)數(shù)量除以消費(fèi)者數(shù)量,這里是分區(qū)數(shù)量8除以消費(fèi)者數(shù)量3 等于 2 (N),再分區(qū)數(shù)量8對消費(fèi)數(shù)量3取余得到2 ( M ),kafka的range算法是前?M個(gè)消費(fèi)能得到N+1個(gè)分區(qū),剩余的消費(fèi)者分配到N個(gè)分區(qū)

          具體算法:假設(shè)區(qū)分?jǐn)?shù)量為pCout,消費(fèi)者數(shù)量為cCount

          n = pCout / cCount? ?8? / 3 = 2

          m = pCount % cCount? 8 % 3 = 2

          前m(2)個(gè)消費(fèi)者得到n+1(2+1)個(gè)分區(qū),剩余的消費(fèi)者分配到N(2)個(gè)分區(qū),最終結(jié)果如下圖

          5f8b13f7799cbdbffaa8c4dbe011d04d.webp

          range策略是kafka默認(rèn)的一個(gè)分區(qū)分配的策略可以看看ConsumerConfig類的static塊,默認(rèn)配置的RangeAssignor?

          b584fba8281d6405c587a63699631e6f.webp

          想看一下分配分區(qū)的策略的入口可以參考KafkaConsumer類中的pollOnce方法進(jìn)去,里面調(diào)用的ensurePartitionAssignment方法,不過這里debug進(jìn)去看還是挺復(fù)雜的,有興趣的可以參考,篇幅講的不是這些重點(diǎn),具體入口可以看下圖

          1b1549f53b69c79d5ec9f01ce110bd4b.webp

          下面看一看range策略中核心源碼的實(shí)現(xiàn),具體查看RangeAssignor類

          @Override    public Map<String, List> assign(Map<String, Integer> partitionsPerTopic,                                                    Map<String, List<String>> subscriptions) {        //獲取每個(gè)主題消費(fèi)者們    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);        Map<String, List> assignment = new HashMap<>();        for (String memberId : subscriptions.keySet())            assignment.put(memberId, new ArrayList());        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {          //主題            String topic = topicEntry.getKey();            //這個(gè)主題的消費(fèi)們            List<String> consumersForTopic = topicEntry.getValue();            //主題的分區(qū)數(shù)量            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);            if (numPartitionsForTopic == null)                continue;            //對主題的消費(fèi)者進(jìn)行排序            Collections.sort(consumersForTopic);            //主題數(shù)量除以主題消費(fèi)者數(shù)量            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();            //主題數(shù)量對消費(fèi)者數(shù)量進(jìn)行取余            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();            //封裝主題和分區(qū)信息            List partitions = partitions(topic, numPartitionsForTopic);            //下面就開始為每一個(gè)消費(fèi)者分配分區(qū),看到這里是不是會(huì)發(fā)現(xiàn) 消費(fèi)者分區(qū)再均衡,每次添加消費(fèi)者或者添加分區(qū)都會(huì)發(fā)生再均衡            //事件,不過這里不是重點(diǎn)            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {              //消費(fèi)者分區(qū)起始位置                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);                //分配的分區(qū)數(shù)量, 從我們上面的假設(shè)的分區(qū)數(shù)量和消費(fèi)者數(shù)量可以得出這里的值                // int length = 2 + (i + 1 > 2 ? 0 : 1);                //因?yàn)橛械臒o法整除和取余的,所以前面的2個(gè)消費(fèi)者這里會(huì)獲得3 的結(jié)果, 最后一個(gè)消費(fèi)者這里只能得到2                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);                //為每個(gè)消費(fèi)者分配分區(qū)信息                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));            }        }        return assignment;    }

          下面講一講kafka自帶的第二種消費(fèi)者分配分區(qū)的策略

          輪詢策略

          還是按照上面的假設(shè)8個(gè)分區(qū)3個(gè)消費(fèi)者

          8個(gè)分區(qū)按照順序平鋪

          構(gòu)造消費(fèi)者環(huán) c0,c1,c2,c0,c1,c2.......

          輪詢分配過程是? p0 分配給了 c0, p1 分配給了 c1, p2分配給了 c2, p3分配給了c0, p4分配給了 c1, p5分配給了c2, 一次類推,所有分區(qū)輪詢分配給一個(gè)消費(fèi)者環(huán),大概草圖如下

          e1c488e41ae8d7a335a5f8d4dfce10e9.webp

          ?上面草圖 多多理解 , 核心源碼如下

          @Override    public Map> assign(Map partitionsPerTopic,                                                    Map> subscriptions) {        Map> assignment = new HashMap<>();        for (String memberId : subscriptions.keySet())            assignment.put(memberId, new ArrayList());        //講消費(fèi)者集合進(jìn)行排序,構(gòu)建一個(gè)消費(fèi)者環(huán), 內(nèi)部通過索引位置+1對總數(shù)取余的方式實(shí)現(xiàn)的環(huán)        CircularIterator assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));        //對所有主題和分區(qū)進(jìn)行排序, 假設(shè)集合中有多個(gè)主題/分區(qū)-分區(qū),最終排序結(jié)果為        // t1/p0-p1-p2,t2/p0-p1,t3/p0-p1-p2        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {          //當(dāng)前主題            final String topic = partition.topic();            //這里循環(huán)遍歷看看消費(fèi)者有沒有訂閱改topic,否則一直next到下一個(gè)消費(fèi)者,主要的作用是跳過            //沒有訂閱該主題的消費(fèi)者            while (!subscriptions.get(assigner.peek()).contains(topic))                assigner.next();            //未當(dāng)前消費(fèi)者添加分區(qū)信息            assignment.get(assigner.next()).add(partition);        }        return assignment;    }

          通過上面的的案例我們是不是可以通過繼承AbstractPartitionAssignor抽象類,實(shí)現(xiàn)它的assign方法,來自定義消費(fèi)者分區(qū)分配策略,因?yàn)檫@里我們得到了一個(gè)所有相關(guān)主題和主題分區(qū)數(shù)量,所有主題對應(yīng)的消費(fèi)者,那么就可以在這里根據(jù)自己實(shí)際場景自定義一些分配策略。

          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連



          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??

          瀏覽 23
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  色婷婷中文在线观看 | 天堂俺去俺来也WWW色光网 | 婷婷五月天婷婷五月天婷婷五月天色 | 九九操逼| 国产欧美一区二区 |