
Kafka consumer group consumption: rebalance strategies


          2021-06-27 13:56

I. Kafka consumption modes

To consume messages, the Kafka client offers two modes: partition consumption and group consumption.

In Spark Streaming, partition consumption corresponds to DirectKafkaInputDStream.

Group consumption corresponds to KafkaInputDStream.

Relationship between the number of consumers and the number of partitions:

1) One consumer can consume anywhere from one partition up to all partitions.

2) In group consumption, all consumers in the same group together consume one complete copy of the data: each partition is consumed by only one consumer, while one consumer can consume several partitions.

3) Within one consumer group, once there are more consumers than partitions, some consumers sit idle; the number of idle consumers = number of consumers - number of partitions.
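As a quick check of 3), here is a minimal Scala sketch for a group that consumes a single topic. GroupSizing and its methods are hypothetical names used only for illustration.

```scala
// Hypothetical helper for ONE topic consumed by ONE group:
// each partition is owned by exactly one consumer.
object GroupSizing {
  // At most `partitions` consumers can own a partition.
  def busyConsumers(consumers: Int, partitions: Int): Int =
    math.min(consumers, partitions)

  // The rest sit idle: consumers - partitions, never negative.
  def idleConsumers(consumers: Int, partitions: Int): Int =
    math.max(0, consumers - partitions)

  def main(args: Array[String]): Unit = {
    println(idleConsumers(consumers = 5, partitions = 3)) // 2
    println(idleConsumers(consumers = 2, partitions = 3)) // 0
  }
}
```

Note the order of the subtraction: idle consumers only appear when consumers outnumber partitions.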

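II. Rebalance strategies for group consumption

When a consumer joins or leaves a group, a partition rebalance is triggered. The partition.assignment.strategy setting decides how partitions are assigned to consumers, and there are two strategies (in the ZooKeeper-based consumer whose source is analyzed in section IV, the setting takes the values range and roundrobin):

1. org.apache.kafka.clients.consumer.RangeAssignor

This is the default. It works one topic at a time: for each topic the consumer subscribes to, only that topic's partitions are distributed among the consumers. The consumer rebalance algorithm is:

1) Sort all partitions of the target topic and store them in TP.

2) Sort all consumers in the consumer group lexicographically by name and store them in CG; the i-th consumer (counting from 0) is Ci.

3) N = size(TP) / size(CG), using integer division.

4) R = size(TP) % size(CG).

5) Index of Ci's first partition = N*i + min(i, R).

6) Number of partitions Ci gets = N + (if (i + 1 > R) 0 else 1), so the first R consumers each get one extra partition.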
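The six steps above can be sketched for a single topic as follows. This is a minimal illustration, not Kafka's RangeAssignor: RangeSketch is a hypothetical name, partitions are plain Ints, and consumers are plain strings sorted in dictionary order.

```scala
// Minimal sketch of range steps 1) to 6) for ONE topic.
object RangeSketch {
  def assign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "need at least one consumer")
    val tp = partitions.sorted // 1) sorted partitions (TP)
    val cg = consumers.sorted  // 2) consumers in dictionary order (CG)
    val n = tp.size / cg.size  // 3) N, integer division
    val r = tp.size % cg.size  // 4) R
    cg.zipWithIndex.map { case (c, i) =>
      val start = n * i + math.min(i, r)        // 5) index of Ci's first partition
      val count = n + (if (i + 1 > r) 0 else 1) // 6) number of partitions for Ci
      c -> tp.slice(start, start + count)       // empty slice => idle consumer
    }.toMap
  }

  def main(args: Array[String]): Unit =
    assign(Seq(0, 1, 2), Seq("c1", "c0")).toSeq.sortBy(_._1).foreach {
      case (c, ps) => println(s"$c -> ${ps.mkString(", ")}")
    }
}
```

Running main prints `c0 -> 0, 1` and `c1 -> 2`. Applying the same split to each of two three-partition topics gives c0 four partitions and c1 two, as in the comparison example later in this section.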

2. org.apache.kafka.clients.consumer.RoundRobinAssignor

This strategy assigns all partitions of all the topics the consumers subscribe to, taken together. A rebalance is triggered whenever a consumer joins or leaves. It has two requirements:

A) When each consumer is instantiated, it must specify the same number of streams for every topic.

B) Every consumer instance must subscribe to the same topics.

The stream count is passed in when the streams are created:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1); // topic -> number of streams (autoboxed to Integer)
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

Here the value for each topic is the number of streams. In the Kafka source, the consume method of kafka.consumer.ZookeeperConsumerConnector builds that same number of KafkaStream instances from this parameter.
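Requirements A) and B) amount to one condition: every topic must see exactly the same set of consumer thread IDs. The sketch below checks that condition, assuming each consumer's topicCountMap is known and that a thread ID is the consumer ID followed by `-<stream index>`. The object and method names are hypothetical, not part of Kafka.

```scala
// Hypothetical check of round-robin requirements A) and B).
// Input: consumerId -> (topic -> stream count), i.e. each consumer's topicCountMap.
object RoundRobinPrecondition {
  // topic -> set of thread IDs, where a thread ID is "<consumerId>-<streamIndex>"
  def threadIdsPerTopic(subs: Map[String, Map[String, Int]]): Map[String, Set[String]] =
    subs.toSeq
      .flatMap { case (consumer, counts) =>
        counts.toSeq.flatMap { case (topic, n) =>
          (0 until n).map(i => topic -> "%s-%d".format(consumer, i))
        }
      }
      .groupBy(_._1)
      .map { case (topic, pairs) => topic -> pairs.map(_._2).toSet }

  // Allowed only if every topic sees exactly the same thread-ID set.
  def roundRobinAllowed(subs: Map[String, Map[String, Int]]): Boolean =
    threadIdsPerTopic(subs).values.toSet.size <= 1
}
```

For example, two consumers that both map t0 and t1 to 1 stream pass the check; giving one of them 2 streams for t1, or dropping t1 from one of them, makes it fail.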

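The assignment steps of this strategy are:

1) Sort all partitions of all topics by the hash code of the string formed from each topic and partition.

2) Sort the consumers in dictionary order.

3) Hand the partitions out to the consumers in round-robin order.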
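The three steps can be sketched as below, assuming consumer thread IDs are plain strings and that requirements A) and B) already hold. TP is a hypothetical stand-in for TopicAndPartition that keeps the same `[topic,partition]` string form. Unlike the Kafka code, which keeps only the current consumer's share, this sketch returns the assignment for every consumer.

```scala
// Minimal round-robin sketch: hash-sort all partitions of all topics,
// sort consumers in dictionary order, then deal partitions out in turn.
object RoundRobinSketch {
  // Hypothetical stand-in for TopicAndPartition, same toString format.
  final case class TP(topic: String, partition: Int) {
    override def toString: String = "[%s,%d]".format(topic, partition)
  }

  def assign(partitionsPerTopic: Map[String, Seq[Int]],
             consumers: Seq[String]): Map[String, Seq[TP]] = {
    require(consumers.nonEmpty, "need at least one consumer")
    // Step 1: all partitions of all topics, sorted by hash of "[topic,partition]"
    val all: Seq[TP] = partitionsPerTopic.toSeq
      .flatMap { case (topic, ps) => ps.map(p => TP(topic, p)) }
      .sortBy(_.toString.hashCode)
    // Step 2: consumers in dictionary order
    val sorted = consumers.sorted
    // Step 3: partition k goes to consumer k mod (number of consumers);
    // consumers that receive nothing do not appear in the result
    all.zipWithIndex
      .groupBy { case (_, k) => sorted(k % sorted.size) }
      .map { case (c, tps) => c -> tps.map(_._1) }
  }

  def main(args: Array[String]): Unit = {
    val topics = Map("t0" -> Seq(0, 1, 2), "t1" -> Seq(0, 1, 2))
    assign(topics, Seq("c0", "c1")).toSeq.sortBy(_._1).foreach {
      case (c, tps) => println(s"$c -> ${tps.mkString(", ")}")
    }
  }
}
```

For two consumers and two three-partition topics named t0 and t1, the hash codes of `[t0,0]` through `[t1,2]` happen to sort in plain topic/partition order, so main gives c0 `[t0,0], [t0,2], [t1,1]` and c1 `[t0,1], [t1,0], [t1,2]`. With other topic names the hash order, and therefore the assignment, can differ.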

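3. Comparison by example

For example, suppose there are two consumers (c0, c1) and two topics (t0, t1), each with three partitions (p0 to p2).

With RangeAssignor, the result is:

* c0: [t0p0, t0p1, t1p0, t1p1]

* c1: [t0p2, t1p2]

With RoundRobinAssignor, the result is:

* c0: [t0p0, t0p2, t1p1]

* c1: [t0p1, t1p0, t1p2]

RangeAssignor splits each topic on its own, so c0 gets the extra partition of both topics (4 partitions vs 2), while RoundRobinAssignor balances across topics (3 vs 3).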

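III. Key concepts and ZooKeeper directories involved in this section's source code

1. ZooKeeper directories involved in this section

A) The consumer directory; its child nodes are all the consumers in the group:

/consumers/group.id/ids/

B) The topic directory, which gives the topic's partitions and their replicas:

/brokers/topics/topicName

Value:

{"version":1,"partitions":{"0":[5,6],"2":[1,4],"27":[0,4],"1":[7,0]}}

In partitions, each key is a partition ID and each value is the list of IDs of the brokers holding that partition's replicas.

C) The consumer thread that owns each partition:

/consumers/groupId/owners/topic/partitionid

Its value is the consumer thread ID, i.e. a consumer ID obtained from A) with a thread number appended.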
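For quick reference, the three paths above can be built with small helpers like these. The function names are hypothetical, and group.id, topicName, groupId and partitionid in the listed paths are placeholders for actual values.

```scala
// Hypothetical helpers that build the ZooKeeper paths listed above.
object ZkPathsSketch {
  // A) children are the consumer IDs registered in the group
  def consumerIdsDir(group: String): String = s"/consumers/$group/ids"

  // B) value is the JSON map of partition -> replica broker IDs
  def topicPath(topic: String): String = s"/brokers/topics/$topic"

  // C) value is the ID of the consumer thread that owns this partition
  def ownerPath(group: String, topic: String, partition: Int): String =
    s"/consumers/$group/owners/$topic/$partition"
}
```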

2. Concepts involved

A) Consumer ID

It is the group ID, an underscore, and then either the configured consumerId (for testing only) or an automatically generated string of hostname, timestamp, and the first 8 hex digits of a random UUID:

val consumerIdString = {
  var consumerUuid : String = null
  config.consumerId match {
    case Some(consumerId) // for testing only
      => consumerUuid = consumerId
    case None // generate unique consumerId automatically
      => val uuid = UUID.randomUUID()
      consumerUuid = "%s-%d-%s".format(
        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
        uuid.getMostSignificantBits().toHexString.substring(0,8))
  }
  config.groupId + "_" + consumerUuid
}

B) Consumer thread ID

It is built on top of the consumer ID: for each topic, the consumer creates the configured number of streams, and each thread ID appends an increasing number (starting from 0) to the consumer ID.

for ((topic, nConsumers) <- topicCountMap) {
  val consumerSet = new mutable.HashSet[ConsumerThreadId]
  assert(nConsumers >= 1)
  for (i <- 0 until nConsumers)
    consumerSet += ConsumerThreadId(consumerIdString, i) // append a number to the consumer ID to tell threads apart; visible under the owners directory
  consumerThreadIdsPerTopicMap.put(topic, consumerSet)
}

The string form of a ConsumerThreadId is:

"%s-%d".format(consumer, threadId)

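C) TopicAndPartition

Identifies each partition by its topic name and partition number. Pay attention to its toString method, which is used when partitions are compared and sorted.

case class TopicAndPartition(topic: String, partition: Int) {
  override def toString = "[%s,%d]".format(topic, partition)
}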
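Because consumers are sorted in dictionary order, thread IDs end up being compared as strings. The sketch below assumes the ordering is defined on the `"%s-%d"` string form; ThreadIdSketch is a hypothetical stand-in, not Kafka's ConsumerThreadId. It shows one side effect of string ordering: thread 10 sorts before thread 2.

```scala
// Hypothetical stand-in for ConsumerThreadId, ordered by its string form
// "<consumerId>-<threadIndex>", i.e. dictionary order.
final case class ThreadIdSketch(consumer: String, threadId: Int)
    extends Ordered[ThreadIdSketch] {
  override def toString: String = "%s-%d".format(consumer, threadId)
  def compare(that: ThreadIdSketch): Int = toString.compareTo(that.toString)
}

object ThreadIdDemo {
  def main(args: Array[String]): Unit = {
    val ids = Seq(2, 10, 1).map(i => ThreadIdSketch("g_c0", i))
    // String order, not numeric order
    println(ids.sorted.mkString(", ")) // prints: g_c0-1, g_c0-10, g_c0-2
  }
}
```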

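IV. Source code analysis

1. AssignmentContext

Given a consumer group, a consumer, and topic information, it fetches the relevant data from ZooKeeper and parses it into the four data structures that both assignment strategies use. For the parsing details, read the Kafka source together with the ZooKeeper directories and node data formats described above.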

class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
  // (topic, set of ConsumerThreadId): for this consumer, one thread ID
  // per stream configured for each topic
  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
    myTopicCount.getConsumerThreadIdsPerTopic
  }

  // (topic, partitions): from the value of /brokers/topics/topicName
  val partitionsForTopic: collection.Map[String, Seq[Int]] =
    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)

  // (topic, ConsumerThreadIds of the group's current consumers): from the children of /consumers/groupId/ids
  val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
    ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)

  // all children of /consumers/groupId/ids, sorted
  val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
}

2. RangeAssignor

class RangeAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic) // all consumer threads for this topic
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) // all partitions of this topic

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId) // position of this thread in the list
        assert(myConsumerPosition >= 0)
        // index of this thread's first partition
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        // number of partitions this thread consumes
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         * Range-partition the sorted partitions to consumers for better locality.
         * The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }

    partitionOwnershipDecision
  }
}

3. RoundRobinAssignor

class RoundRobinAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    // check conditions (a) and (b); consumersForTopic maps topic -> List[ConsumerThreadId]
    val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
    // every topic must have exactly the same set of consumer threads as the first one
    ctx.consumersForTopic.foreach { case (topic, threadIds) =>
      val threadIdSet = threadIds.toSet
      require(threadIdSet == headThreadIdSet,
        "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
        "AND if the stream counts across topics are identical for a given consumer instance.\n" +
        "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
        "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
    }

    // circular iterator over the thread IDs, sorted in dictionary order first
    val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)

    info("Starting round-robin assignment with consumers " + ctx.consumers)

    // all TopicAndPartitions, sorted by the hash of their string form
    val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>
      info("Consumer %s rebalancing the following partitions for topic %s: %s"
        .format(ctx.consumerId, topic, partitions))
      partitions.map(partition => {
        TopicAndPartition(topic, partition) // toString = "[%s,%d]".format(topic, partition)
      })
    }.toSeq.sortWith((topicPartition1, topicPartition2) => {
      /*
       * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
       * up on one consumer (if it has a high enough stream count).
       */
      topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
    })

    // walk the full assignment but keep only the entries for this consumer's own threads
    allTopicPartitions.foreach(topicPartition => {
      val threadId = threadAssignor.next()
      if (threadId.consumer == ctx.consumerId)
        partitionOwnershipDecision += (topicPartition -> threadId)
    })

    // return the result
    partitionOwnershipDecision
  }
}

           

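V. Summary

This article explained the two strategies that group consumption uses to assign partitions to consumer threads. Together with the two previous articles,

<Kafka Source Code Series: Performance Analysis of the Consumer High-Level API> and <Kafka Source Code Series: Source Code Analysis of the SimpleConsumer Consumption Process>, you should now have a thorough understanding of how Kafka's Java consumer client is implemented and of its performance strengths and weaknesses.

The group and partition models are not really a property of the Kafka cluster; they differ only in how the Java client is implemented. In production, choose whichever consumption model fits your needs. If traffic is modest and there are no demanding performance requirements, group consumption is recommended: running several consumers in the same group then gives you failover among those consumers.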
