
          Kafka Source Code Study: KafkaApis-LEADER_AND_ISR


          2021-06-09 10:47

          Author: 小兵

          Source: SegmentFault 思否 community


          Whenever the controller goes through a state change, it calls sendRequestsToBrokers to send a LeaderAndIsrRequest to the brokers. This article walks through the logic the Kafka broker uses to handle that request.

          LEADER_AND_ISR


          Overall flow

          case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

          When the broker receives a LEADER_AND_ISR request, it calls handleLeaderAndIsrRequest to process it. The original article shows the processing flow of this method in a diagram.

          Source code

          handleLeaderAndIsrRequest

          The logic of handleLeaderAndIsrRequest breaks down into the following parts:

          1. Build the callback onLeadershipChange, which calls the coordinator to handle partitions that newly became leader or follower on this broker.
          2. Check the request's authorization. If the check passes, call replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) for further processing (this is the main flow of the function); otherwise, return the error code Errors.CLUSTER_AUTHORIZATION_FAILED.code directly.
          def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
            // ensureTopicExists is only for client facing requests
            // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
            // stop serving data to clients for the topic being deleted
            val correlationId = request.header.correlationId
            val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

            try {
              def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
                // for each new leader or follower, call coordinator to handle consumer group migration.
                // this callback is invoked under the replica state change lock to ensure proper order of
                // leadership changes
                updatedLeaders.foreach { partition =>
                  if (partition.topic == Topic.GroupMetadataTopicName)
                    coordinator.handleGroupImmigration(partition.partitionId)
                }
                updatedFollowers.foreach { partition =>
                  if (partition.topic == Topic.GroupMetadataTopicName)
                    coordinator.handleGroupEmigration(partition.partitionId)
                }
              }

              val leaderAndIsrResponse =
                if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
                  val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
                  new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
                } else {
                  val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
                  new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
                }

              requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
            } catch {
              case e: KafkaStorageException =>
                fatal("Disk error during leadership change.", e)
                Runtime.getRuntime.halt(1)
            }
          }
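The two decisions made in this handler can be isolated in a small standalone model. The sketch below is only an illustration: PartitionRef, CoordinatorAction, coordinatorActions and unauthorizedResponse are hypothetical names, not Kafka classes, and error codes are represented as plain strings. It shows that only partitions of the group metadata topic (`__consumer_offsets`, the value of Topic.GroupMetadataTopicName) reach the coordinator, and that a failed authorization marks every partition in the request with the same error.

```scala
// Hypothetical, self-contained model of the handler's callback and its
// authorization-failure branch. None of these types come from Kafka.
object LeadershipCallbackSketch {
  // Value of Topic.GroupMetadataTopicName in Kafka.
  val GroupMetadataTopic = "__consumer_offsets"

  final case class PartitionRef(topic: String, partitionId: Int)

  sealed trait CoordinatorAction
  // New leader of an offsets partition: the coordinator takes over its groups.
  final case class Immigrate(partitionId: Int) extends CoordinatorAction
  // New follower of an offsets partition: the coordinator gives up its groups.
  final case class Emigrate(partitionId: Int) extends CoordinatorAction

  // Only partitions of the group metadata topic produce coordinator work.
  def coordinatorActions(newLeaders: Iterable[PartitionRef],
                         newFollowers: Iterable[PartitionRef]): List[CoordinatorAction] = {
    val immigrations: Iterable[CoordinatorAction] = newLeaders.collect {
      case PartitionRef(GroupMetadataTopic, id) => Immigrate(id)
    }
    val emigrations: Iterable[CoordinatorAction] = newFollowers.collect {
      case PartitionRef(GroupMetadataTopic, id) => Emigrate(id)
    }
    (immigrations ++ emigrations).toList
  }

  // When authorization fails, every partition in the request gets the same error.
  def unauthorizedResponse[K](partitions: Iterable[K]): Map[K, String] =
    partitions.map(_ -> "CLUSTER_AUTHORIZATION_FAILED").toMap
}
```

Calling coordinatorActions with a mix of ordinary topics and offsets partitions returns actions for the offsets partitions only, which is why leadership changes on regular topics never touch the coordinator.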

          becomeLeaderOrFollower

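          ReplicaManager.becomeLeaderOrFollower does the following work; the translated comments in the code mark where each step happens:

          1. Validate the controller epoch, then process only the topic-partitions whose leader epoch in the request is higher than the local one and that have a replica on this broker
          2. Call makeLeaders / makeFollowers to build the sets of new leader partitions and new follower partitions (this is the main logic, covered in detail in the following sections)
          3. If this is the first request received, start the thread that periodically checkpoints the HW
          4. Shut down idle fetcher threads
          5. Invoke the callback so that the coordinator handles the new leader partitions and follower partitions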
          def becomeLeaderOrFollower(correlationId: Int,
                                     leaderAndISRRequest: LeaderAndIsrRequest,
                                     metadataCache: MetadataCache,
                                     onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
            leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
              stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                .format(localBrokerId, stateInfo, correlationId,
                  leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
            }
            // Main logic: build the result to return
            replicaStateChangeLock synchronized {
              val responseMap = new mutable.HashMap[TopicPartition, Short]
              // If the controller epoch is stale, return Errors.STALE_CONTROLLER_EPOCH.code right away
              if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
                stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
                  "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
                  correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
                BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
              } else {
                val controllerId = leaderAndISRRequest.controllerId
                controllerEpoch = leaderAndISRRequest.controllerEpoch

                // First check partition's leader epoch
                // Validate every partition in the request. There are three cases:
                // 1. The leader epoch in the request is higher than the local one, but this broker is not in the
                //    assigned replica list: return Errors.UNKNOWN_TOPIC_OR_PARTITION.code
                // 2. The leader epoch in the request is higher than the local one and this broker is an assigned
                //    replica: the state is accepted
                // 3. The leader epoch in the request is not higher than the local one: return Errors.STALE_CONTROLLER_EPOCH.code
                val partitionState = new mutable.HashMap[Partition, PartitionState]()
                leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
                  val partition = getOrCreatePartition(topicPartition)
                  val partitionLeaderEpoch = partition.getLeaderEpoch
                  // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                  // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                  if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
                    if(stateInfo.replicas.contains(localBrokerId))
                      partitionState.put(partition, stateInfo)
                    else {
                      stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                        "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                        .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                          topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
                      responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                    }
                  } else {
                    // Otherwise record the error code in response
                    stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                      "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
                      .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                        topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
                    responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
                  }
                }

                // Handle leader and follower replicas: build partitionsBecomeLeader and partitionsBecomeFollower
                // for the callback (processed by the coordinator)
                val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
                  stateInfo.leader == localBrokerId
                }
                val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

                val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
                  // Main call
                  makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
                else
                  Set.empty[Partition]
                val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
                  // Main call
                  makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
                else
                  Set.empty[Partition]

                // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
                // have been completely populated before starting the checkpointing there by avoiding weird race conditions
                // After the first request is received, start the scheduler that periodically checkpoints the HW
                if (!hwThreadInitialized) {
                  startHighWaterMarksCheckPointThread()
                  hwThreadInitialized = true
                }
                // The metadata was updated above, so shut down fetcher threads that are no longer needed
                replicaFetcherManager.shutdownIdleFetcherThreads()
                // Invoke the callback
                onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
                BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
              }
            }
          }
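The validation in becomeLeaderOrFollower can be condensed into a standalone sketch. Everything here is an assumption made for illustration: RequestedState, Outcome, checkPartition and split are hypothetical names rather than Kafka APIs, and error codes are plain strings. The sketch mirrors the order of the checks in the listing: the request-level controller epoch gate first, then the per-partition leader epoch and replica membership checks, and finally the leader/follower split.

```scala
// Hypothetical, self-contained model of the checks in becomeLeaderOrFollower.
// None of these types are Kafka classes; error names are plain strings.
object EpochCheckSketch {
  final case class RequestedState(leader: Int, leaderEpoch: Int, replicas: Set[Int])

  sealed trait Outcome
  case object Accepted extends Outcome
  final case class Rejected(error: String) extends Outcome

  // Request-level gate: a controller epoch older than the known one is stale.
  // An equal epoch is still accepted.
  def controllerIsStale(requestControllerEpoch: Int, knownControllerEpoch: Int): Boolean =
    requestControllerEpoch < knownControllerEpoch

  // Partition-level gate: the requested leader epoch must be strictly higher
  // than the local one, and the local broker must be an assigned replica.
  def checkPartition(localBrokerId: Int, localLeaderEpoch: Int, state: RequestedState): Outcome =
    if (localLeaderEpoch >= state.leaderEpoch) Rejected("STALE_CONTROLLER_EPOCH")
    else if (!state.replicas.contains(localBrokerId)) Rejected("UNKNOWN_TOPIC_OR_PARTITION")
    else Accepted

  // Accepted partitions are split by whether the local broker is the new leader:
  // the first map goes to makeLeaders, the second to makeFollowers.
  def split[K](localBrokerId: Int,
               accepted: Map[K, RequestedState]): (Map[K, RequestedState], Map[K, RequestedState]) =
    accepted.partition { case (_, s) => s.leader == localBrokerId }
}
```

Note the asymmetry the sketch makes explicit: the controller epoch check uses a strict less-than, while a leader epoch equal to the local one is already rejected.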

          makeLeaders

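          Handles partitions for which this broker newly becomes leader:
          1. Stop the fetcher threads for these partitions
          2. Update the metadata of these partitions so that the local replica is the leader
          3. Build the set of partitions that newly became leader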

          private def makeLeaders(controllerId: Int,
                                  epoch: Int,
                                  partitionState: Map[Partition, PartitionState],
                                  correlationId: Int,
                                  responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
            // Pre-fill the result that becomeLeaderOrFollower returns
            for (partition <- partitionState.keys)
              responseMap.put(partition.topicPartition, Errors.NONE.code)

            val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

            try {
              // First stop fetchers for all the partitions
              replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
              // Update the partition information to be the leader
              // Collect the partitions that newly became leader
              partitionState.foreach{ case (partition, partitionStateInfo) =>
                if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
                  partitionsToMakeLeaders += partition
                else
                  stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
                    "controller %d epoch %d for partition %s since it is already the leader for the partition.")
                    .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
              }
            } catch {
              case e: Throwable =>
                partitionState.keys.foreach { partition =>
                  val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
                    " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
                  stateChangeLogger.error(errorMsg, e)
                }
                // Re-throw the exception for it to be caught in KafkaApis
                throw e
            }

            partitionsToMakeLeaders
          }

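          partition.makeLeader(controllerId, partitionStateInfo, correlationId) updates the partition's metadata and its HW. It calls maybeIncrementLeaderHW, which tries to advance the HW: if another replica is not too far behind the leader and is ahead of the previous HW, the HW is held back to that replica's position so that the replica has a chance to join the ISR.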

          def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
            val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
              val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
              // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
              // to maintain the decision maker controller's epoch in the zookeeper path
              controllerEpoch = partitionStateInfo.controllerEpoch
              // add replicas that are new
              // Build the new ISR
              allReplicas.foreach(replica => getOrCreateReplica(replica))
              val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
              // remove assigned replicas that have been removed by the controller
              // (that is, replicas no longer in the assigned replica list)
              (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
              inSyncReplicas = newInSyncReplicas
              leaderEpoch = partitionStateInfo.leaderEpoch
              zkVersion = partitionStateInfo.zkVersion
              // Is this broker becoming leader of the partition for the first time (it was not the leader before)?
              val isNewLeader =
                if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
                  false
                } else {
                  leaderReplicaIdOpt = Some(localBrokerId)
                  true
                }
              val leaderReplica = getReplica().get
              val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
              val curTimeMs = time.milliseconds
              // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
              // Initialize follower state on the new leader
              (assignedReplicas - leaderReplica).foreach { replica =>
                val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
                replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
              }
              // we may need to increment high watermark since ISR could be down to 1
              if (isNewLeader) {
                // construct the high watermark metadata for the new leader replica
                leaderReplica.convertHWToLocalOffsetMetadata()
                // reset log end offset for remote replicas
                assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
              }
              // Try to advance the HW. If another replica is not far behind the leader and is ahead of the previous HW,
              // HW growth is held back so that the replica has a chance to join the ISR
              (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
            }
            // some delayed operations may be unblocked after HW changed
            // Once the HW has moved, some pending requests can be completed
            if (leaderHWIncremented)
              tryCompleteDelayedRequests()
            isNewLeader
          }
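The HW rule that maybeIncrementLeaderHW applies is not listed in the article, so the following is only a sketch under stated assumptions: the candidate HW is taken to be the smallest log end offset among replicas that are either in the ISR or were caught up within a lag threshold, and the HW never moves backwards. ReplicaView, nextHighWatermark and maxLagMs are hypothetical names introduced for this illustration, not Kafka's own.

```scala
// Hypothetical, self-contained model of advancing the leader's high watermark.
// This is an assumed simplification, not the code of maybeIncrementLeaderHW.
object HighWatermarkSketch {
  // Stand-in for what the leader knows about a replica; not Kafka's Replica class.
  final case class ReplicaView(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // Candidate HW = smallest log end offset among replicas that are in the ISR
  // or were caught up within maxLagMs. The result never drops below currentHw.
  def nextHighWatermark(currentHw: Long,
                        replicas: Seq[ReplicaView],
                        isr: Set[Int],
                        nowMs: Long,
                        maxLagMs: Long): Long = {
    val eligible = replicas.filter { r =>
      isr.contains(r.brokerId) || nowMs - r.lastCaughtUpTimeMs <= maxLagMs
    }
    if (eligible.isEmpty) currentHw
    else math.max(currentHw, eligible.map(_.logEndOffset).min)
  }
}
```

Under these assumptions, a replica outside the ISR that is still catching up holds the HW at its own log end offset, which is the "held back" behaviour described in the text; once it falls too far behind, it no longer constrains the HW.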

          makeFollowers

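          Handles partitions for which this broker newly becomes follower:

          1. Remove these partitions from the set of leader partitions

          2. Mark them as follower so that producer requests are no longer accepted

          3. Remove their fetcher threads

          4. Truncate the local logs of these partitions to the HW

          5. Try to complete pending produce and fetch requests

          6. If the broker is not shutting down, start fetching data from the new leader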

          private def makeFollowers(controllerId: Int,
                                    epoch: Int,
                                    partitionState: Map[Partition, PartitionState],
                                    correlationId: Int,
                                    responseMap: mutable.Map[TopicPartition, Short],
                                    metadataCache: MetadataCache) : Set[Partition] = {
            partitionState.keys.foreach { partition =>
              stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                "starting the become-follower transition for partition %s")
                .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
            }

            // Pre-fill the result that becomeLeaderOrFollower returns
            for (partition <- partitionState.keys)
              responseMap.put(partition.topicPartition, Errors.NONE.code)

            val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

            try {

              // TODO: Delete leaders from LeaderAndIsrRequest
              partitionState.foreach{ case (partition, partitionStateInfo) =>
                val newLeaderBrokerId = partitionStateInfo.leader
                metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
                  // Only change partition state when the leader is available
                  case Some(_) =>
                    // Collect the partitions that actually changed to follower
                    if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
                      partitionsToMakeFollower += partition
                    else
                      stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
                        "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
                        .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                          partition.topicPartition, newLeaderBrokerId))
                  case None =>
                    // The leader broker should always be present in the metadata cache.
                    // If not, we should record the error message and abort the transition process for this partition
                    stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
                      " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
                      .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                        partition.topicPartition, newLeaderBrokerId))
                    // Create the local replica even if the leader is unavailable. This is required to ensure that we include
                    // the partition's high watermark in the checkpoint file (see KAFKA-1647)
                    partition.getOrCreateReplica()
                }
              }
              // Remove the fetcher threads
              replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
              // Truncate the local logs to the HW
              logManager.truncateTo(partitionsToMakeFollower.map { partition =>
                (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
              }.toMap)
              // After truncation, try to complete delayed produce and fetch requests
              partitionsToMakeFollower.foreach { partition =>
                val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
                tryCompleteDelayedProduce(topicPartitionOperationKey)
                tryCompleteDelayedFetch(topicPartitionOperationKey)
              }

              if (isShuttingDown.get()) {
                partitionsToMakeFollower.foreach { partition =>
                  stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
                    "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
                    controllerId, epoch, partition.topicPartition))
                }
              } else {
                // we do not need to check if the leader exists again since this has been done at the beginning of this process
                // Reset the fetch position and add the partitions to the fetcher
                val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
                  partition.topicPartition -> BrokerAndInitialOffset(
                    metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
                    partition.getReplica().get.logEndOffset.messageOffset)).toMap
                replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
              }
            } catch {
              case e: Throwable =>
                val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
                  "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
                stateChangeLogger.error(errorMsg, e)
                // Re-throw the exception for it to be caught in KafkaApis
                throw e
            }

            partitionsToMakeFollower
          }
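The truncate-then-fetch part of makeFollowers can be shown with a toy log. This is a sketch under assumptions: LocalLog, truncateToHighWatermark and becomeFollower are hypothetical names, and a log is modelled as nothing more than a vector of offsets. It illustrates why the new follower starts fetching at its HW: after entries at or above the HW are dropped, the log end offset equals the HW, and that log end offset is the initial fetch offset handed to the fetcher.

```scala
// Hypothetical, self-contained model of the follower transition's log handling.
// None of these types come from Kafka.
object FollowerTransitionSketch {
  // A log reduced to the offsets it holds, plus its high watermark.
  final case class LocalLog(offsets: Vector[Long], highWatermark: Long) {
    // Offset that the next appended entry would receive.
    def logEndOffset: Long = if (offsets.isEmpty) 0L else offsets.last + 1
  }

  // Keep only entries below the high watermark; entries at or above it were
  // never confirmed as committed and may differ from the new leader's log.
  def truncateToHighWatermark(log: LocalLog): LocalLog =
    log.copy(offsets = log.offsets.filter(_ < log.highWatermark))

  // Returns the truncated log and, unless the broker is shutting down,
  // the offset from which to start fetching from the new leader.
  def becomeFollower(log: LocalLog, shuttingDown: Boolean): (LocalLog, Option[Long]) = {
    val truncated = truncateToHighWatermark(log)
    val fetchFrom = if (shuttingDown) None else Some(truncated.logEndOffset)
    (truncated, fetchFrom)
  }
}
```

For a log holding offsets 0 through 9 with a HW of 7, becomeFollower keeps offsets 0 through 6 and yields 7 as the fetch offset; when the broker is shutting down, truncation still happens but no fetch offset is produced.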


          Original article: https://fxbing.github.io/2021/06/05/kafka源碼學(xué)習(xí):KafkaApis-LEADER-AND-ISR/
          The source code in this article is based on Kafka 0.10.2.
