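Rebalance strategies for Kafka consumer group consumption
I. Kafka consumption modes
The Kafka client offers two modes for consuming messages from Kafka: partition consumption and group consumption.
Partition consumption corresponds to our DirectKafkaInputDStream.
Group consumption corresponds to our KafkaInputDStream.
Relationship between the number of consumers and the number of partitions:
1) A single consumer can consume anywhere from one partition to all partitions.
2) With group consumption, all consumers in the same group together consume one complete copy of the data. Each partition can be consumed by only one consumer in the group, while one consumer can consume several partitions.
3) Within a consumer group, once the number of consumers exceeds the number of partitions, the surplus consumers sit idle: idle consumers = number of consumers - number of partitions.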

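II. Rebalance strategies for group consumption
When a consumer joins or leaves a group, a partition rebalance is triggered. The partition.assignment.strategy setting determines how partitions are assigned to consumers. There are two assignment strategies:
1. org.apache.kafka.clients.consumer.RangeAssignor
This is the default strategy. It works one topic at a time, redistributing all partitions of a single subscribed topic. The consumer rebalance algorithm is:
1) Sort all partitions of the target topic and store them in TP.
2) Sort all consumers in the consumer group lexicographically by name and store them in CG; the i-th consumer is denoted Ci.
3) N = size(TP) / size(CG) (integer division)
4) R = size(TP) % size(CG)
5) Start position of Ci's partitions = N*i + min(i, R)
6) Number of partitions assigned to Ci = N + (if (i + 1 > R) 0 else 1)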
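The six steps above can be sketched in a few lines of Java. This is only an illustration of the arithmetic, not the Kafka implementation: the class name RangeAssignSketch and the method assign are hypothetical, and consumers are represented simply by their position i in the sorted list CG.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating steps 3) to 6); not part of Kafka.
class RangeAssignSketch {
    /** Partitions of one topic that go to the consumer at position i of the sorted list CG. */
    public static List<Integer> assign(List<Integer> sortedPartitions, int consumerCount, int i) {
        int n = sortedPartitions.size() / consumerCount;   // N: base share per consumer
        int r = sortedPartitions.size() % consumerCount;   // R: consumers that get one extra
        int start = n * i + Math.min(i, r);                // step 5)
        int count = n + (i + 1 > r ? 0 : 1);               // step 6)
        return new ArrayList<>(sortedPartitions.subList(start, start + count));
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2); // one topic with three partitions
        for (int i = 0; i < 2; i++) {
            System.out.println("C" + i + " -> " + assign(partitions, 2, i));
        }
        // prints:
        // C0 -> [0, 1]
        // C1 -> [2]
    }
}
```

Because the first R consumers each take one extra partition, a consumer whose position is beyond the partition count ends up with an empty list.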
2. org.apache.kafka.clients.consumer.RoundRobinAssignor
This strategy assigns all partitions of all topics that the consumers consume. As before, a rebalance is triggered whenever a consumer joins or leaves. It has two requirements:
A) Every consumer instance must specify the same number of streams for each topic when it is instantiated.
B) Every consumer instance must subscribe to the same set of topics.
    // topic -> number of streams to create for that topic
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
Here the value mapped to each topic is the number of streams. In the Kafka source, the consume method of kafka.consumer.ZookeeperConsumerConnector builds that many KafkaStream instances from this parameter.
The assignment steps for this strategy are:
1) Sort all partitions of all topics by the hash of the string form of topic + partition.
2) Sort the consumers lexicographically.
3) Assign the partitions to the consumers in round-robin order.
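The three steps can be sketched as follows. This is a simplified illustration under stated assumptions rather than the Kafka code: the class RoundRobinSketch and its assign method are hypothetical, consumer threads are plain strings, and each partition is rendered as "[topic,partition]", the string form whose hash is used for sorting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the three round-robin steps; not part of Kafka.
class RoundRobinSketch {
    public static Map<String, List<String>> assign(List<String> threadIds,
                                                   Map<String, Integer> partitionCounts) {
        // Step 1: list every partition of every topic and sort by the hash of its string form.
        List<String> topicPartitions = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                topicPartitions.add("[" + e.getKey() + "," + p + "]");
            }
        }
        topicPartitions.sort(Comparator.comparingInt(String::hashCode));

        // Step 2: sort the consumer threads lexicographically.
        List<String> sortedThreads = new ArrayList<>(threadIds);
        Collections.sort(sortedThreads);

        // Step 3: hand out partitions one by one, cycling through the threads.
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String t : sortedThreads) {
            result.put(t, new ArrayList<>());
        }
        for (int i = 0; i < topicPartitions.size(); i++) {
            result.get(sortedThreads.get(i % sortedThreads.size())).add(topicPartitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("t0", 3);
        counts.put("t1", 3);
        System.out.println(assign(Arrays.asList("c0-0", "c1-0"), counts));
        // prints: {c0-0=[[t0,0], [t0,2], [t1,1]], c1-0=[[t0,1], [t1,0], [t1,2]]}
    }
}
```

For these short topic names the hash order happens to coincide with alphabetical order; with other names the interleaving of partitions can differ, which is the point of sorting by hash.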
3. A comparison example
Suppose there are two consumers (c0, c1) and two topics (t0, t1), each with three partitions p(0-2).
With RangeAssignor the result is:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
With RoundRobinAssignor the result is:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
III. Key concepts and ZooKeeper paths involved in this section's source code
1. ZooKeeper paths involved
A) The consumer directory; listing its children gives all consumers in the group
/consumers/group.id/ids/
B) The topic directory, from which the topic, partition, and replica information can be read
/brokers/topics/topicName
Value:
{"version":1,"partitions":{"0":[5,6],"2":[1,4],"27":[0,4],"1":[7,0]}}
In the value of partitions, each key is a partition ID and each value is a list of broker IDs.
C) The consumer thread that owns each partition
/consumers/groupId/owners/topic/partitionid
The value is the consumer thread ID, that is, a consumer ID obtained in A) with a numeric index appended.
2. Concepts involved
A) Consumer ID
    val consumerIdString = {
      var consumerUuid : String = null
      config.consumerId match {
        case Some(consumerId) // for testing only
        => consumerUuid = consumerId
        case None // generate unique consumerId automatically
        => val uuid = UUID.randomUUID()
          consumerUuid = "%s-%d-%s".format(
            InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
            uuid.getMostSignificantBits().toHexString.substring(0,8))
      }
      config.groupId + "_" + consumerUuid
    }
B) Consumer thread ID
It is built from the consumer ID by appending an increasing number, one for each stream the consumer creates for the given topic.
    for ((topic, nConsumers) <- topicCountMap) {
      val consumerSet = new mutable.HashSet[ConsumerThreadId]
      assert(nConsumers >= 1)
      for (i <- 0 until nConsumers)
        // append an index to the consumer ID to tell the threads apart;
        // the resulting IDs are visible under the owners directory
        consumerSet += ConsumerThreadId(consumerIdString, i)
      consumerThreadIdsPerTopicMap.put(topic, consumerSet)
    }
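As a rough illustration of the naming scheme, the sketch below appends a stream index to a consumer ID using the "%s-%d" pattern of ConsumerThreadId. The class ThreadIdSketch, its method, and the sample consumer ID are all made up for the example; a real consumer ID is derived from the group ID, host name, timestamp, and UUID as shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; mirrors the "consumerId-index" naming only.
class ThreadIdSketch {
    public static List<String> threadIds(String consumerId, int streamCount) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < streamCount; i++) {
            ids.add(String.format("%s-%d", consumerId, i)); // one thread ID per stream
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sample value in the groupId_host-timestamp-uuidPrefix shape (invented for illustration).
        String consumerId = "group1_host-1500000000000-1a2b3c4d";
        System.out.println(threadIds(consumerId, 2));
        // prints: [group1_host-1500000000000-1a2b3c4d-0, group1_host-1500000000000-1a2b3c4d-1]
    }
}
```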
ConsumerThreadId is rendered as:
    "%s-%d".format(consumer, threadId)
C) TopicAndPartition
Represents a partition together with its topic name. Pay particular attention to its toString method, which is used when partitions are compared.
    TopicAndPartition(topic: String, partition: Int)
    override def toString = "[%s,%d]".format(topic, partition)
IV. Source code analysis
1. AssignmentContext
Given the consumer group, the consumer, and the topic information, it reads the relevant data from ZooKeeper and parses it into the four data structures that both assignment strategies rely on. To follow the parsing, read the Kafka source alongside the ZooKeeper paths and node values described above.
    class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
      // topic -> set of ConsumerThreadId for this consumer:
      // one thread ID per stream requested for each topic
      val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
        val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
        myTopicCount.getConsumerThreadIdsPerTopic
      }

      // topic -> partition IDs, read from the value of /brokers/topics/topicName
      val partitionsForTopic: collection.Map[String, Seq[Int]] =
        ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)

      // topic -> consumer thread IDs, built from the children of /consumers/groupId/ids
      val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
        ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)

      // all children of /consumers/groupId/ids, sorted
      val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
    }
2. RangeAssignor
    class RangeAssignor() extends PartitionAssignor with Logging {
      def assign(ctx: AssignmentContext) = {
        val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
        for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
          val curConsumers = ctx.consumersForTopic(topic) // all consumer threads for this topic
          val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) // all partitions of this topic

          val nPartsPerConsumer = curPartitions.size / curConsumers.size
          val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

          info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
            " for topic " + topic + " with consumers: " + curConsumers)

          for (consumerThreadId <- consumerThreadIdSet) {
            // position of this consumer thread in the list of consumer threads
            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
            assert(myConsumerPosition >= 0)
            // index of the first partition for this thread
            val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
            // number of partitions this thread consumes
            val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

            /**
             * Range-partition the sorted partitions to consumers for better locality.
             * The first few consumers pick up an extra partition, if any.
             */
            if (nParts <= 0)
              warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
            else {
              // record the ownership of each partition in partitionOwnershipDecision
              for (i <- startPart until startPart + nParts) {
                val partition = curPartitions(i)
                info(consumerThreadId + " attempting to claim partition " + partition)
                // record the partition ownership decision
                partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
              }
            }
          }
        }
        partitionOwnershipDecision
      }
    }
3. RoundRobinAssignor
    class RoundRobinAssignor() extends PartitionAssignor with Logging {
      def assign(ctx: AssignmentContext) = {
        val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

        // check conditions (a) and (b); consumersForTopic maps topic -> List[ConsumerThreadId]
        val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
        // every topic must have the same set of consumer threads as the first topic
        ctx.consumersForTopic.foreach { case (topic, threadIds) =>
          val threadIdSet = threadIds.toSet
          require(threadIdSet == headThreadIdSet,
            "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
            "AND if the stream counts across topics are identical for a given consumer instance.\n" +
            "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
            "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
        }

        // circular iterator over the thread IDs, sorted lexicographically beforehand
        val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)

        info("Starting round-robin assignment with consumers " + ctx.consumers)

        // all TopicAndPartition values, sorted by the hash of their string form
        val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
          info("Consumer %s rebalancing the following partitions for topic %s: %s"
            .format(ctx.consumerId, topic, partitions))
          partitions.map(partition => {
            TopicAndPartition(topic, partition) // toString = "[%s,%d]".format(topic, partition)
          })
        }.toSeq.sortWith((topicPartition1, topicPartition2) => {
          /*
           * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
           * up on one consumer (if it has a high enough stream count).
           */
          // compare by hash value
          topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
        })

        // walk all partitions, keeping only those that fall to a thread of the current consumer
        allTopicPartitions.foreach(topicPartition => {
          val threadId = threadAssignor.next()
          if (threadId.consumer == ctx.consumerId)
            partitionOwnershipDecision += (topicPartition -> threadId)
        })

        // return the result
        partitionOwnershipDecision
      }
    }
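The require check at the top of assign is what enforces requirements A) and B) of the round-robin strategy. A minimal Java sketch of the same idea, assuming thread IDs are plain strings and using a hypothetical class RoundRobinPreconditionSketch, might look like this:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; mirrors only the "same thread set for every topic" check.
class RoundRobinPreconditionSketch {
    /** True when every topic is consumed by exactly the same set of consumer threads. */
    public static boolean sameThreadsForAllTopics(Map<String, List<String>> consumersForTopic) {
        Set<String> head = null;
        for (List<String> threadIds : consumersForTopic.values()) {
            Set<String> current = new HashSet<>(threadIds);
            if (head == null) {
                head = current;            // remember the first topic's thread set
            } else if (!head.equals(current)) {
                return false;              // a topic with a different thread set breaks A) or B)
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> ok = new LinkedHashMap<>();
        ok.put("t0", Arrays.asList("c0-0", "c1-0"));
        ok.put("t1", Arrays.asList("c1-0", "c0-0"));
        System.out.println(sameThreadsForAllTopics(ok));   // prints: true

        Map<String, List<String>> bad = new LinkedHashMap<>(ok);
        bad.put("t2", Arrays.asList("c0-0"));              // c1 does not subscribe to t2
        System.out.println(sameThreadsForAllTopics(bad));  // prints: false
    }
}
```

Where this sketch returns false, the Kafka code shown above fails the require call instead of returning a flag.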
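V. Summary
This article explained the two strategies that group consumption uses to assign partitions to consumer threads. Together with the two earlier articles, "Kafka Source Code Series: Performance Analysis of the Consumer High-Level API" and "Kafka Source Code Series: Source Walkthrough of the SimpleConsumer Consumption Process", it should give you a thorough understanding of how Kafka's Java consumer client is implemented and of its performance strengths and weaknesses.
The group and partition models have nothing to do with the Kafka cluster itself; the difference lies in how the Java client is implemented. In production, choose whichever model suits your needs. If traffic is modest and performance requirements are not extreme, group consumption is recommended: running several consumers in the same group effectively gives you failover among the consumers of that group.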
