How does a Kafka producer decide which partition a message goes to? And how do consumers divide partitions among themselves?

Author | 廢物大師兄
1. Introduction
As we know, producers send messages to topics and consumers subscribe to topics (in the name of a consumer group). A topic is divided into partitions, and messages are stored in partitions, so in reality producers write to partitions and consumers read from partitions. This raises two questions: which partition does a producer deliver a message to, and how are partitions distributed among the consumer instances of a group? The rest of this article examines these two questions.
2. Setting the Number of Partitions for a Topic
The server.properties configuration file can specify a global partition-count setting (the num.partitions property). This is the default number of partitions for every topic, and it defaults to 1.

Each topic can also set its own partition count; if no partition count is specified when the topic is created, the value from server.properties is used.
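For reference, the relevant broker-side setting looks like this in server.properties:

```
# server.properties: broker-wide default partition count, used when a
# topic is created without an explicit partition setting
num.partitions=1
```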
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1
When creating a topic, the **--partitions** option specifies the number of partitions, as in the command above.
[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc       PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: abc      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: abc      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
3. Producers and Partitions
First, a question: is there any pattern to how a producer delivers messages to partitions? If so, how does it decide which partition a given message goes to?
3.1. The Default Partitioning Strategy
The Javadoc of org.apache.kafka.clients.producer.internals.DefaultPartitioner describes the default partitioning strategy:

- If a partition is specified in the record, use it
- If no partition is specified but a key is present, choose a partition based on a hash of the key
- If no partition or key is present, choose a partition in a round-robin fashion
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
The source code bears this out: a null key takes the round-robin branch, while a non-null key is hashed with murmur2 and taken modulo the partition count.
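To make the decision logic concrete, here is a minimal, self-contained sketch of the same two branches. The class and method names are invented for illustration; unlike the real DefaultPartitioner, it substitutes Arrays.hashCode for murmur2 and ignores partition availability.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the default strategy: keyed messages hash to a stable
// partition, keyless messages round-robin across partitions.
public class PartitionSketch {
    // starts at a random value, like the real per-topic counter
    private final AtomicInteger counter =
            new AtomicInteger(ThreadLocalRandom.current().nextInt());

    public int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // keyless: round-robin over the partition count
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // keyed: stable hash (the real client uses murmur2;
        // Arrays.hashCode stands in here purely for illustration)
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    // same idea as Kafka's Utils.toPositive: clear the sign bit
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch();
        byte[] key = "order-42".getBytes();
        // the same key always maps to the same partition
        System.out.println(sketch.partition(key, 3) == sketch.partition(key, 3)); // prints "true"
    }
}
```

The key-hash branch is what makes per-key ordering possible: all messages with the same key land in the same partition.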
4. Partitions and Consumers
Consumers subscribe to a topic in the name of a group. A topic has multiple partitions and a consumer group has multiple consumer instances, so what is the mapping between consumer instances and partitions?
In other words, which partitions is each consumer in the group responsible for, and how is that assignment determined?

At any given moment, a message can be consumed by only one consumer instance in the group.
When a consumer group subscribes to a topic, every partition of that topic will be consumed by some consumer in the group. In terms of ownership, each partition of the topic belongs to exactly one consumer in the group; two consumers in the same group never own the same partition.
So here is the question. If the partition count is greater than or equal to the number of consumer instances in the group, there is no problem: a consumer simply owns more than one partition (ideally the two counts are equal, so each consumer owns exactly one partition). But if there are more consumer instances than partitions, then under the default strategy (emphasized because you can also define a custom one) some consumers are redundant: they receive no messages and sit idle.
Conversely, what would go wrong if multiple consumers were responsible for the same partition?
Kafka was designed to guarantee message ordering within a partition: whatever order messages have in a partition is the order in which consumers see them. Achieving this requires, first, that consumers actively pull messages, and second, that each partition is owned by only one consumer. If two consumers C1 and C2 owned the same partition, both would read it concurrently. Since each consumer controls its own read offset, C1 might have read up to offset 2 while C2 has only read offset 1, and before C1 finishes processing, C2 might already be at offset 3. This is wasteful: much like multiple threads reading the same message, it causes duplicate processing and loses the ordering guarantee, which would be no better than push delivery.
4.1. Consumer Partition Assignment Strategies
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
To implement a custom assignment strategy, you can extend the AbstractPartitionAssignor class. Kafka ships with three implementations of it.
4.1.1. range
The range strategy is implemented by org.apache.kafka.clients.consumer.RangeAssignor.
This is the default assignment strategy.
The strategy can be selected via the partition.assignment.strategy parameter in the consumer configuration; its value is a list of fully qualified class names.
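As a small illustration, switching a consumer to round-robin assignment only requires setting that property (the helper class here is invented; only the property name and assignor class are from Kafka):

```java
import java.util.Properties;

// Sketch: selecting a non-default partition assignor via consumer config.
public class AssignStrategyConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // value is a list of fully qualified class names;
        // RangeAssignor is used when this property is absent
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```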
/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */
The range strategy works on a per-topic basis.
For each topic, the available partitions are laid out in numeric order and the consumers in lexicographic order. The number of partitions is then divided by the number of consumers to determine how many partitions each consumer gets. If the division is not even, the first few consumers each get one extra partition.
In short:
1. The range strategy is applied per topic (that is, "partitions" here means the partitions of one particular topic, and "consumers" means the consumer instances of the group subscribed to that topic)
2. First, the partitions are sorted in numeric order and the consumers in lexicographic order by name
3. Then the total partition count is divided by the total consumer count. If it divides evenly, everyone is happy and the split is equal; if not, the consumers at the front of the sorted order each take one extra partition
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions: t0p0, t0p1, t0p2, t1p0, t1p1, t1p2.
Based on this, the final assignment is:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
Why this result?
Because for topic t0 the assignment gives C0 partitions p0 and p1 and C1 partition p2; the same happens for topic t1, and combining the two yields the result above.
Reading the code helps to understand this:
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    // map from topic to the consumers subscribed to it
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());
    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();                      // the topic
        List<String> consumersForTopic = topicEntry.getValue();  // its consumers
        // partitionsPerTopic maps each topic to its partition count;
        // look up how many partitions this topic has
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;
        // sort consumers lexicographically
        Collections.sort(consumersForTopic);
        // partition count divided by consumer count
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        // the remainder is the number of extra partitions
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            // assign the range [start, start + length) to consumer i
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}
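The arithmetic in that loop can be exercised with a small self-contained sketch. RangeSketch and its assign signature are invented for illustration; it handles one topic at a time and returns partition numbers rather than TopicPartition objects.

```java
import java.util.*;

// Sketch of range assignment for a single topic: sort the consumers,
// divide the partitions evenly, and give the first
// (numPartitions % numConsumers) consumers one extra partition.
public class RangeSketch {
    public static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);                       // lexicographic order
        Map<String, List<Integer>> assignment = new TreeMap<>();
        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();      // leftover partitions
        for (int i = 0; i < sorted.size(); i++) {
            // same start/length arithmetic as RangeAssignor
            int start = perConsumer * i + Math.min(i, extra);
            int length = perConsumer + (i + 1 > extra ? 0 : 1);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + length; p++) parts.add(p);
            assignment.put(sorted.get(i), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // two consumers, a 3-partition topic: C0 takes the extra partition
        System.out.println(assign(3, Arrays.asList("C0", "C1"))); // prints "{C0=[0, 1], C1=[2]}"
    }
}
```

Running it per topic for the two-topic example above reproduces C0: [t0p0, t0p1, t1p0, t1p1] and C1: [t0p2, t1p2].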
4.1.2. roundrobin
The roundrobin strategy is implemented by org.apache.kafka.clients.consumer.RoundRobinAssignor.
/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * The assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */
The round-robin strategy lays out all the available partitions and all the available consumers.
The biggest difference from the range strategy is that it is not confined to a single topic.
If all consumer instances have identical subscriptions, this is the ideal case: partitions can be distributed uniformly.
For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions: t0p0, t0p1, t0p2, t1p0, t1p1, t1p2.
The final assignment is:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
When the consumers in a group subscribe to different topics, the assignment process still considers each consumer instance in round-robin fashion, but skips an instance if it is not subscribed to the partition's topic. Naturally, the assignment can then be unbalanced.
What does this mean? A consumer group is a logical concept: being in the same group means a partition can be consumed by only one instance at a time, or in other words, a partition is assigned to only one consumer in the group. But same group does not have to mean same subscription: consumers in one group may subscribe to different topics.
For example, suppose there are three topics t0, t1, and t2 with 1, 2, and 3 partitions respectively, so the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. There are three consumers C0, C1, and C2 in the same group: C0 subscribes to t0; C1 subscribes to t0 and t1; and C2 subscribes to t0, t1, and t2. Under round-robin assignment, C0 is responsible for t0p0, C1 for t1p0, and C2 for everything else.
Why is the final result
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
Because in round-robin order t0p0 goes to C0 and t1p0 goes to C1; t1p1 can then only go to C2, since C0 does not subscribe to t1 and C1 already took its turn; and since only C2 subscribes to t2, all of t2's partitions go to C2. Combining these gives the result above.
On reflection, in this particular case the result happens to be the same as what range assignment would give.
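The skip-unsubscribed behavior can be sketched in a few lines. RoundRobinSketch is invented for illustration (the real RoundRobinAssignor iterates TopicPartition objects with a circular iterator); this sketch encodes partitions as strings like "t0p0", assumes every topic has at least one subscriber, and uses naive string parsing.

```java
import java.util.*;

// Sketch of round-robin assignment with per-consumer subscriptions:
// walk the partitions in order, advancing a circular pointer over the
// sorted consumers and skipping consumers not subscribed to the topic.
public class RoundRobinSketch {
    public static Map<String, List<String>> assign(
            List<String> partitions,                  // e.g. "t1p0" (topic before 'p')
            Map<String, Set<String>> subscriptions) { // consumer -> subscribed topics
        List<String> consumers = new ArrayList<>(subscriptions.keySet());
        Collections.sort(consumers);                  // lexicographic order
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        int i = 0;
        for (String tp : partitions) {
            String topic = tp.substring(0, tp.indexOf('p'));
            // advance the circular pointer until we reach a subscriber
            while (!subscriptions.get(consumers.get(i % consumers.size())).contains(topic)) i++;
            assignment.get(consumers.get(i % consumers.size())).add(tp);
            i++;
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = new HashMap<>();
        subs.put("C0", new HashSet<>(Arrays.asList("t0")));
        subs.put("C1", new HashSet<>(Arrays.asList("t0", "t1")));
        subs.put("C2", new HashSet<>(Arrays.asList("t0", "t1", "t2")));
        List<String> parts = Arrays.asList("t0p0", "t1p0", "t1p1", "t2p0", "t2p1", "t2p2");
        // reproduces the imbalanced assignment from the Javadoc example
        System.out.println(assign(parts, subs)); // prints "{C0=[t0p0], C1=[t1p0], C2=[t1p1, t2p0, t2p1, t2p2]}"
    }
}
```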
5. Test Code
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cjs.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-demo</name>
    <description></description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
package com.cjs.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HelloProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        e.printStackTrace();
                    } else {
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
package com.cjs.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class HelloConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}