<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka 事務(wù)的實(shí)現(xiàn)原理

          共 23812字,需瀏覽 48分鐘

           ·

          2021-06-27 14:22


          -     前言     - 


          Kafka 事務(wù)在流處理中應(yīng)用很廣泛,比如原子性的讀取消息,立即處理和發(fā)送,如果中途出現(xiàn)錯(cuò)誤,支持回滾操作。這篇文章來(lái)講講事務(wù)是如何實(shí)現(xiàn)的,首先來(lái)看看事務(wù)流程圖。

          -     事務(wù)流程     - 


          Kafka的整個(gè)事務(wù)處理流程如下圖:


          上圖中的 Transaction Coordinator 運(yùn)行在 Kafka 服務(wù)端,下面簡(jiǎn)稱 TC 服務(wù)。
          __transaction_state 是 TC 服務(wù)持久化事務(wù)信息的 topic 名稱,下面簡(jiǎn)稱事務(wù) topic。

          Producer 向 TC 服務(wù)發(fā)送的 commit 消息,下面簡(jiǎn)稱事務(wù)提交消息。

          TC 服務(wù)向分區(qū)發(fā)送的消息,下面簡(jiǎn)稱事務(wù)結(jié)果消息。

          -     尋找 TC 服務(wù)地址     - 


          Producer 會(huì)首先從 Kafka 集群中選擇任意一臺(tái)機(jī)器,然后向其發(fā)送請(qǐng)求,獲取 TC 服務(wù)的地址。Kafka 有個(gè)特殊的事務(wù) topic,名稱為_(kāi)_transaction_state ,負(fù)責(zé)持久化事務(wù)消息。這個(gè) topic 有多個(gè)分區(qū),默認(rèn)有50個(gè),每個(gè)分區(qū)負(fù)責(zé)一部分事務(wù)。事務(wù)劃分是根據(jù) transaction id, 計(jì)算出該事務(wù)屬于哪個(gè)分區(qū)。這個(gè)分區(qū)的 leader 所在的機(jī)器,負(fù)責(zé)這個(gè)事務(wù)的TC 服務(wù)地址。

          -     事務(wù)初始化     - 


          Producer 在使用事務(wù)功能,必須先自定義一個(gè)唯一的 transaction id。有了 transaction id,即使客戶端掛掉了,它重啟后也能繼續(xù)處理未完成的事務(wù)。

          Kafka 實(shí)現(xiàn)事務(wù)需要依靠?jī)绲刃裕鴥绲刃孕枰付?producer id 。所以Producer在啟動(dòng)事務(wù)之前,需要向 TC 服務(wù)申請(qǐng) producer id。TC 服務(wù)在分配 producer id 后,會(huì)將它持久化到事務(wù) topic。

          -     發(fā)送消息     - 


          Producer 在接收到 producer id 后,就可以正常的發(fā)送消息了。不過(guò)發(fā)送消息之前,需要先將這些消息的分區(qū)地址,上傳到 TC 服務(wù)。TC 服務(wù)會(huì)將這些分區(qū)地址持久化到事務(wù) topic。然后 Producer 才會(huì)真正的發(fā)送消息,這些消息與普通消息不同,它們會(huì)有一個(gè)字段,表示自身是事務(wù)消息。

          這里需要注意下一種特殊的請(qǐng)求,提交消費(fèi)位置請(qǐng)求,用于原子性的從某個(gè) topic 讀取消息,并且發(fā)送消息到另外一個(gè) topic。我們知道一般是消費(fèi)者使用消費(fèi)組訂閱 topic,才會(huì)發(fā)送提交消費(fèi)位置的請(qǐng)求,而這里是由 Producer 發(fā)送的。

          Producer 首先會(huì)發(fā)送一條請(qǐng)求,里面會(huì)包含這個(gè)消費(fèi)組對(duì)應(yīng)的分區(qū)(每個(gè)消費(fèi)組的消費(fèi)位置都保存在 __consumer_offset topic 的一個(gè)分區(qū)里),TC 服務(wù)會(huì)將分區(qū)持久化之后,發(fā)送響應(yīng)。Producer 收到響應(yīng)后,就會(huì)直接發(fā)送消費(fèi)位置請(qǐng)求給 GroupCoordinator。

          -     發(fā)送提交請(qǐng)求     - 


          Producer 發(fā)送完消息后,如果認(rèn)為該事務(wù)可以提交了,就會(huì)發(fā)送提交請(qǐng)求到 TC 服務(wù)。Producer 的工作至此就完成了,接下來(lái)它只需要等待響應(yīng)。這里需要強(qiáng)調(diào)下,Producer 會(huì)在發(fā)送事務(wù)提交請(qǐng)求之前,會(huì)等待之前所有的請(qǐng)求都已經(jīng)發(fā)送并且響應(yīng)成功。

          提交請(qǐng)求持久化

          TC 服務(wù)收到事務(wù)提交請(qǐng)求后,會(huì)先將提交信息先持久化到事務(wù) topic 。持久化成功后,服務(wù)端就立即發(fā)送成功響應(yīng)給 Producer。然后找到該事務(wù)涉及到的所有分區(qū),為每 個(gè)分區(qū)生成提交請(qǐng)求,存到隊(duì)列里等待發(fā)送。

          讀者可能有所疑問(wèn),在一般的二階段提交中,協(xié)調(diào)者需要收到所有參與者的響應(yīng)后,才能判斷此事務(wù)是否成功,最后才將結(jié)果返回給客戶。那如果 TC 服務(wù)在發(fā)送響應(yīng)給 Producer 后,還沒(méi)來(lái)及向分區(qū)發(fā)送請(qǐng)求就掛掉了,那么 Kafka 是如何保證事務(wù)完成。因?yàn)槊看问聞?wù)的信息都會(huì)持久化,所以 TC 服務(wù)掛掉重新啟動(dòng)后,會(huì)先從 事務(wù) topic 加載事務(wù)信息,如果發(fā)現(xiàn)只有事務(wù)提交信息,卻沒(méi)有后來(lái)的事務(wù)完成信息,說(shuō)明存在事務(wù)結(jié)果信息沒(méi)有提交到分區(qū)。

          發(fā)送事務(wù)結(jié)果信息給分區(qū)

          后臺(tái)線程會(huì)不停的從隊(duì)列里,拉取請(qǐng)求并且發(fā)送到分區(qū)。當(dāng)一個(gè)分區(qū)收到事務(wù)結(jié)果消息后,會(huì)將結(jié)果保存到分區(qū)里,并且返回成功響應(yīng)到 TC服務(wù)。當(dāng) TC 服務(wù)收到所有分區(qū)的成功響應(yīng)后,會(huì)持久化一條事務(wù)完成的消息到事務(wù) topic。至此,一個(gè)完整的事務(wù)流程就完成了。

          -     客戶端原理     - 


          使用示例:


          下面代碼實(shí)現(xiàn),消費(fèi)者讀取消息,并且發(fā)送到多個(gè)分區(qū)的事務(wù):
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          // 創(chuàng)建 Producer 實(shí)例,并且指定 transaction id
          KafkaProducer producer = createKafkaProducer(
          “bootstrap.servers”, “l(fā)ocalhost:9092”,
          “transactional.id”, “my-transactional-id”);

          // 初始化事務(wù),這里會(huì)向 TC 服務(wù)申請(qǐng) producer id
          producer.initTransactions();

          // 創(chuàng)建 Consumer 實(shí)例,并且訂閱 topic
          KafkaConsumer consumer = createKafkaConsumer(
          “bootstrap.servers”, “l(fā)ocalhost:9092”,
          “group.id”, “my-group-id”,
          "isolation.level", "read_committed");
          consumer.subscribe(singleton(“inputTopic”));

          while (true) {
          ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
          // 開(kāi)始新的事務(wù)
          producer.beginTransaction();
          for (ConsumerRecord record : records) {
          // 發(fā)送消息到分區(qū)
          producer.send(producerRecord(“outputTopic_1”, record));
          producer.send(producerRecord(“outputTopic_2”, record));
          }
          // 提交 offset
          producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
          // 提交事務(wù)
          producer.commitTransaction();
          }

          -     運(yùn)行原理     - 


          上面的例子使用了 Producer的接口實(shí)現(xiàn)了事務(wù),但負(fù)責(zé)與 TC 服務(wù)通信的是 TransactionManager 類。

          TransactionManager 類會(huì)發(fā)送申請(qǐng)分配 producer id 請(qǐng)求,上傳消息分區(qū)請(qǐng)求和事務(wù)提交請(qǐng)求,在完成每一步請(qǐng)求,TransactionManager 都會(huì)更新自身的狀態(tài)。


          狀態(tài)
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          private enum State {
          UNINITIALIZED,
          INITIALIZING,
          READY,
          IN_TRANSACTION,
          COMMITTING_TRANSACTION,
          ABORTING_TRANSACTION,
          ABORTABLE_ERROR,
          FATAL_ERROR;
          }



          這里還有兩個(gè)狀態(tài)沒(méi)有列出來(lái) ABORTABLE_ERROR或FATAL_ERROR,這是當(dāng)請(qǐng)求出錯(cuò)后,狀態(tài)就會(huì)變?yōu)樗鼈儭?/span>

          -     服務(wù)端原理     - 


          TC 服務(wù)會(huì)為每個(gè) transaction id 都維護(hù)了元數(shù)據(jù),元數(shù)據(jù)的字段如下:
          1
          2
          3
          4
          5
          6
          7
          8
          9
          class TransactionMetadata(
          val transactionalId: String, // 事務(wù) id
          var producerId: Long, // pruducer id
          var producerEpoch: Short, // producer epoch
          var txnTimeoutMs: Int, // 事務(wù)超時(shí)時(shí)間
          var state: TransactionState, // 事務(wù)當(dāng)前狀態(tài)
          val topicPartitions: mutable.Set[TopicPartition], // 該事務(wù)涉及到的分區(qū)列表
          @volatile var txnStartTimestamp: Long = -1, // 事務(wù)開(kāi)始的時(shí)間
          @volatile var txnLastUpdateTimestamp: Long) // 事務(wù)的更新時(shí)間


          對(duì)于服務(wù)端,每個(gè)事務(wù)也有對(duì)應(yīng)的狀態(tài)


          當(dāng) TC 服務(wù)接收到了來(lái)自客戶端的分區(qū)上傳請(qǐng)求,此時(shí)它才會(huì)認(rèn)為此次事務(wù)開(kāi)始了,然后它會(huì)更新分區(qū)列表,更新此次的事務(wù)開(kāi)始時(shí)間為當(dāng)前時(shí)間,并且會(huì)將更新后的元數(shù)據(jù),持久化到事務(wù) topic。最后將自身狀態(tài)改為 Ongoing。

          當(dāng)TC 服務(wù)收到事務(wù)提交請(qǐng)求或者事務(wù)回滾請(qǐng)求,更新元數(shù)據(jù),持久化到事務(wù) topic,然后自身狀態(tài)改為CompleteCommit 或CompleteAbort 。然后向涉及到該事務(wù)的分區(qū)發(fā)送事務(wù)結(jié)果消息,等待所有的分區(qū)都成功返回響應(yīng)后,就會(huì)持久化一條事務(wù)成功的消息到消息 topic。

          -     高可用分析     - 


          TC 服務(wù)

          通過(guò)上述對(duì) Kafka 事務(wù)的簡(jiǎn)述,可以看到 TC 服務(wù)起著很重要的作用。事實(shí)上 Kafka 集群中運(yùn)行著多個(gè) TC 服務(wù),每個(gè)TC 服務(wù)負(fù)責(zé)事務(wù) topic 的一個(gè)分區(qū)讀寫,也就是這個(gè)分區(qū)的 leader。Producer 根據(jù) transaction id 的哈希值,來(lái)決定該事務(wù)屬于事務(wù) topic 的哪個(gè)分區(qū),最后找到這個(gè)分區(qū)的 leader 位置。

          既然 TC 服務(wù)負(fù)責(zé)事務(wù) topic 的一個(gè)分區(qū) leader,我們知道當(dāng)一個(gè)分區(qū)的 leader掛掉之后,Kafka 會(huì)保證這個(gè)的分區(qū)的 follower 會(huì)轉(zhuǎn)換為 leader 角色,會(huì)繼續(xù)對(duì)外提供服務(wù)。這么 TC 服務(wù)的高可用就達(dá)到了。

          消息持久化

          TC 服務(wù)為了支持重啟后,也能恢復(fù)到之前的狀態(tài),所以它將每次重要的消息都會(huì)持久化起來(lái),并且保存到事務(wù) topic 的時(shí)候,指定 leader 分區(qū)和 follower 分區(qū)必須都存儲(chǔ)成功。這樣每次 TC 服務(wù)啟動(dòng)的時(shí)候,都會(huì)從事務(wù) topic 讀取之前的狀態(tài),加載到緩存里。

          比如當(dāng)TC 服務(wù)在響應(yīng)客戶端的事務(wù)提交請(qǐng)求后,還沒(méi)來(lái)得及向各分區(qū)發(fā)送事務(wù)結(jié)果請(qǐng)求,就已經(jīng)掛掉了。之后 TC 服務(wù)重啟,會(huì)去事務(wù) topic 加載數(shù)據(jù),它發(fā)現(xiàn)事務(wù)的最后狀態(tài)為 PrepareCommit,并且事務(wù)數(shù)據(jù)還包括了分區(qū)列表,這樣 TC 服務(wù)會(huì)繼續(xù)未完成的事務(wù),會(huì)向列表中的各個(gè)分區(qū)發(fā)送事務(wù)結(jié)果請(qǐng)求。

          超時(shí)處理

          如果 Producer 發(fā)起了一個(gè)事務(wù),但是由于網(wǎng)絡(luò)問(wèn)題,TC 服務(wù)遲遲沒(méi)有接下來(lái)的請(qǐng)求,那么該事務(wù)就會(huì)被認(rèn)為超時(shí)。TC 服務(wù)會(huì)有個(gè)線程,會(huì)定期檢查處理 Ongoing 狀態(tài)的事務(wù),如果該事務(wù)的開(kāi)始時(shí)間和當(dāng)前時(shí)間的差,超過(guò)了指定的超時(shí)時(shí)間(在發(fā)送申請(qǐng)producer id請(qǐng)求時(shí)可以指定),那么 TC 服務(wù)就會(huì)回滾該事務(wù),更新和持久化事務(wù)的狀態(tài),并且發(fā)送事務(wù)回滾結(jié)果給分區(qū)。

          源碼分析

          如果對(duì)源碼還有興趣的讀者,可以繼續(xù)閱讀這部分。這里會(huì)大概的講解下代碼結(jié)構(gòu),讀者如果想進(jìn)一步的理解,可以參看源碼。整個(gè)事務(wù)的源碼分為兩部分,客戶端和服務(wù)端。

          -     客戶端     - 


          事務(wù)的客戶端,只能是 Producer。下面首先介紹下 Producer 與事務(wù)相關(guān)的接口。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          public interface Producer<K, V> extends Closeable {
          // 初始化事務(wù),包括申請(qǐng) producer id
          void initTransactions();
          // 開(kāi)始事務(wù),這里會(huì)更改事務(wù)的本地狀態(tài)
          void beginTransaction() throws ProducerFencedException;
          // 提交消費(fèi)位置, offsets表示每個(gè)分區(qū)的消費(fèi)位置, consumerGroupId表示消費(fèi)組的名稱
          void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
          String consumerGroupId) throws ProducerFencedException;
          // 發(fā)送事務(wù)提交請(qǐng)求
          void commitTransaction() throws ProducerFencedException;
          // 發(fā)送事務(wù)回滾請(qǐng)求
          void abortTransaction() throws ProducerFencedException;
          }


          KafkaProducer 類實(shí)現(xiàn)了 Producer 接口,比較簡(jiǎn)單,只是調(diào)用了 TransactionCoordinator 類的方法。客戶端事務(wù)處理的核心代碼,都是在 TransactionCoordinator 類里。


          TransactionCoordinator 發(fā)送的請(qǐng)求類,都有一個(gè)對(duì)應(yīng)的類來(lái)處理響應(yīng)。這些處理類都是繼承 TxnRequestHandler 類,它封裝了共同的錯(cuò)誤處理,比如連接斷開(kāi),api 版本不兼容等。子類需要實(shí)現(xiàn) handleResponse 方法,負(fù)責(zé)處理具體的響應(yīng)內(nèi)容。

          initializeTransactions 方法負(fù)責(zé)事務(wù)初始化,它會(huì)發(fā)送 InitProducerIdRequest 請(qǐng)求。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          public synchronized TransactionalRequestResult initializeTransactions() {
          // 檢查transaction id是否已經(jīng)設(shè)置
          ensureTransactional();
          // 更改自身狀態(tài)為INITIALIZING
          transitionTo(State.INITIALIZING);
          // 將producer id和epoch都設(shè)為空
          setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
          // nextSequence在消息發(fā)送中會(huì)用到,因?yàn)榘l(fā)送事務(wù)消息要求冪等性,而發(fā)送冪等性的消息是需要設(shè)置sequence的
          this.nextSequence.clear();
          // 構(gòu)建申請(qǐng)produce id請(qǐng)求
          InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
          // InitProducerIdHandler 負(fù)責(zé)處理響應(yīng)
          InitProducerIdHandler handler = new InitProducerIdHandler(builder);
          // 將消息保存到隊(duì)列中,等待Sender線程(Producer會(huì)有個(gè)后臺(tái)線程發(fā)送消息)發(fā)送
          enqueueRequest(handler);
          // 返回異步結(jié)果
          return handler.result;
          }


          InitProducerIdHandler 類的定義如下:
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          private class InitProducerIdHandler extends TxnRequestHandler {

          @Override
          public void handleResponse(AbstractResponse response) {
          InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
          // 檢查錯(cuò)誤
          Errors error = initProducerIdResponse.error();
          if (error == Errors.NONE) {
          ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
          // 保存結(jié)果 producer id和epoch
          setProducerIdAndEpoch(producerIdAndEpoch);
          // 更改狀態(tài)為READY
          transitionTo(State.READY);
          lastError = null;
          // 通知異步結(jié)果已完成
          result.done();
          } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
          // 如果TC服務(wù)沒(méi)有找到或者剛好掛掉,那么生成FindCoordinatorRequest請(qǐng)求,等待發(fā)送
          lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
          // 并且將自身請(qǐng)求也放入隊(duì)列,等待發(fā)送
          reenqueue();
          } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
          // 如果TC服務(wù)正在啟動(dòng)中,那么加入隊(duì)列,等待發(fā)送
          reenqueue();
          } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
          // 如果發(fā)生權(quán)限問(wèn)題,那么認(rèn)為進(jìn)入錯(cuò)誤狀態(tài)
          fatalError(error.exception());
          } else {
          fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
          }
          }
          }


          beginTransaction 方法負(fù)責(zé)開(kāi)始新事務(wù),它只是更改自身狀態(tài)為 IN_TRANSACTION,并不會(huì)發(fā)送任何請(qǐng)求。
          1
          2
          3
          4
          5
          6
          7
          8
          public synchronized void beginTransaction() {
          // 檢查 transaction id
          ensureTransactional();
          // 檢查之前響應(yīng)是否出錯(cuò)
          maybeFailWithError();
          // 更改狀態(tài)
          transitionTo(State.IN_TRANSACTION);
          }


          我們知道Producer發(fā)送消息,都是先將消息發(fā)送到緩存隊(duì)列里,最后是由Sender線程發(fā)送出去 。Producer 如果開(kāi)啟了事務(wù), 它在發(fā)送消息到緩存之前,會(huì)將消息所在的分區(qū)保存在 TransactionCoordinator 里。然后Sender線程在發(fā)送消息之前,會(huì)去從 TransactionCoordinator 檢查是否需要上次分區(qū)到 TC 服務(wù),如果有就先上次分區(qū),隨后才發(fā)送消息。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          public class KafkaProducer<K, V> implements Producer<K, V> {
          private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
          .......
          int partition = partition(record, serializedKey, serializedValue, cluster);
          tp = new TopicPartition(record.topic(), partition);
          ......
          if (transactionManager != null && transactionManager.isTransactional())
          // 如果開(kāi)啟了事務(wù),那么就先將分區(qū)保存在 transactionManager
          transactionManager.maybeAddPartitionToTransaction(tp);
          ......
          }
          }


          TransactionManager 提供了 maybeAddPartitionToTransaction 方法添加分區(qū)。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          public class TransactionManager {
          // 新增的上傳分區(qū)集合
          private final Set<TopicPartition> newPartitionsInTransaction;
          // 本次事務(wù)已經(jīng)上傳的分區(qū)集合
          private final Set<TopicPartition> partitionsInTransaction;
          // 本次事務(wù)涉及到的所有分區(qū)集合
          private final Set<TopicPartition> pendingPartitionsInTransaction;

          public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
          // 檢查事務(wù)狀態(tài)必須為IN_TRANSACTION
          failIfNotReadyForSend();
          // 如果已經(jīng)上傳過(guò)這個(gè)分區(qū),或者正在上傳這個(gè)分區(qū),那么直接返回
          if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
          return;

          log.debug("Begin adding new partition {} to transaction", topicPartition);
          // 添加到需要上次的集合
          newPartitionsInTransaction.add(topicPartition);
          }

          // 檢查是否這個(gè)分區(qū)已經(jīng)上傳過(guò)了
          synchronized boolean isPartitionAdded(TopicPartition partition) {
          return partitionsInTransaction.contains(partition);
          }

          // 檢查是否這個(gè)分區(qū)正在上傳中
          synchronized boolean isPartitionPendingAdd(TopicPartition partition) {
          return newPartitionsInTransaction.contains(partition) || pendingPartitionsInTransaction.contains(partition);
          }
          }

          TransactionManager 的 addPartitionsToTransactionHandler 方法,會(huì)生成分區(qū)上傳請(qǐng)求,然后由Sender發(fā)送。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          public class TransactionManager {

          private synchronized TxnRequestHandler addPartitionsToTransactionHandler() {
          // 將新增的分區(qū),添加到 pendingPartitionsInTransaction 集合
          pendingPartitionsInTransaction.addAll(newPartitionsInTransaction);
          // 清空新增的分區(qū)集合
          newPartitionsInTransaction.clear();
          // 構(gòu)建 AddPartitionsToTxnRequest 請(qǐng)求
          AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
          producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsInTransaction));
          return new AddPartitionsToTxnHandler(builder);
          }
          }


          AddPartitionsToTxnHandler 負(fù)責(zé)處理響應(yīng)。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          private class AddPartitionsToTxnHandler extends TxnRequestHandler {

          @Override
          public void handleResponse(AbstractResponse response) {
          AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
          Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
          boolean hasPartitionErrors = false;
          Set<String> unauthorizedTopics = new HashSet<>();
          retryBackoffMs = TransactionManager.this.retryBackoffMs;

          for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) {
          // 檢查每個(gè)分區(qū)的響應(yīng)錯(cuò)誤
          .....
          }

          Set<TopicPartition> partitions = errors.keySet();
          // 因?yàn)檫@些分區(qū)已經(jīng)有響應(yīng)了,即使錯(cuò)誤也需要從集合中刪除
          pendingPartitionsInTransaction.removeAll(partitions);

          if (!unauthorizedTopics.isEmpty()) {
          abortableError(new TopicAuthorizationException(unauthorizedTopics));
          } else if (hasPartitionErrors) {
          abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
          } else {
          log.debug("Successfully added partitions {} to transaction", partitions);
          // 將這些成功響應(yīng)的分區(qū),添加到 partitionsInTransaction集合
          partitionsInTransaction.addAll(partitions);
          transactionStarted = true;
          // 通知結(jié)果成功
          result.done();
          }
          }
          }


          sendOffsetsToTransaction 方法負(fù)責(zé)發(fā)送消費(fèi)位置提交請(qǐng)求。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          public class TransactionManager {
          public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
          // 保證transaction id
          ensureTransactional();
          // 檢查之前的響應(yīng)錯(cuò)誤
          maybeFailWithError();
          // 只有 IN_TRANSACTION 狀態(tài)才可以發(fā)送這種類型請(qǐng)求
          if (currentState != State.IN_TRANSACTION)
          throw new KafkaException("...");
          // 構(gòu)建請(qǐng)求
          AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
          // 構(gòu)建處理器
          AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
          // 添加到隊(duì)列
          enqueueRequest(handler);
          return handler.result;
          }
          }


          AddOffsetsToTxnHandler 類負(fù)責(zé)處理響應(yīng),它的處理邏輯很簡(jiǎn)單,它收到響應(yīng)后,會(huì)發(fā)送 TxnOffsetCommitRequest 請(qǐng)求給 TC 服務(wù)。


          最后還剩下事務(wù)提交或回滾請(qǐng)求,還沒(méi)講述。Producer 在調(diào)用 commitTransaction 或 abortTransaction 方法,本質(zhì)都是調(diào)用了 TransactionManager 的 beginCompletingTransaction 方法發(fā)送請(qǐng)求。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          public synchronized TransactionalRequestResult beginCommit() {
          ensureTransactional();
          maybeFailWithError();
          // 更改狀態(tài)為 COMMITTING_TRANSACTION
          transitionTo(State.COMMITTING_TRANSACTION);
          // 調(diào)用 beginCompletingTransaction 方法發(fā)送請(qǐng)求
          return beginCompletingTransaction(TransactionResult.COMMIT);
          }

          public synchronized TransactionalRequestResult beginAbort() {
          ensureTransactional();
          // 更改狀態(tài)為 ABORTABLE_ERROR
          if (currentState != State.ABORTABLE_ERROR)
          maybeFailWithError();
          transitionTo(State.ABORTING_TRANSACTION);

          // 清空分區(qū)集合
          newPartitionsInTransaction.clear();
          // 發(fā)送請(qǐng)求
          return beginCompletingTransaction(TransactionResult.ABORT);
          }

          private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
          // 如果還有分區(qū)沒(méi)有上傳,那么生成請(qǐng)求放進(jìn)隊(duì)列
          if (!newPartitionsInTransaction.isEmpty())
          enqueueRezquest(addPartitionsToTransactionHandler());
          // 構(gòu)建請(qǐng)求
          EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, transactionResult);
          // 構(gòu)建處理器
          EndTxnHandler handler = new EndTxnHandler(builder);
          enqueueRequest(handler);
          return handler.result;
          }


          EndTxnHandler 負(fù)責(zé)處理事務(wù)提交或回滾響應(yīng),EndTxnHandler的處理邏輯比較簡(jiǎn)單,它只是調(diào)用了 completeTransaction 方法。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10

          private synchronized void completeTransaction() {
          // 更改狀態(tài)為READY
          transitionTo(State.READY);
          lastError = null;
          transactionStarted = false;
          // 清空分區(qū)集合
          newPartitionsInTransaction.clear();
          pendingPartitionsInTransaction.clear();
          partitionsInTransaction.clear();
          }


          -     服務(wù)端     - 


          服務(wù)端的結(jié)構(gòu)會(huì)相對(duì)復(fù)雜一些,這里盡量簡(jiǎn)單的講講大概邏輯。首先介紹下 TransactionStateManager 類,它負(fù)責(zé)管理事務(wù)的元數(shù)據(jù),它也提供持久化事務(wù)的元數(shù)據(jù),和從事務(wù) topic 加載數(shù)據(jù)的功能。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          class TransactionStateManager(...) {
          // key值為 partition id,value為 TxnMetadataCacheEntry對(duì)象
          private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
          }

          // metadataPerTransactionalId 參數(shù)是Pool類型,可以看成是Map
          // key值為transaction id, value為元數(shù)據(jù)
          private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
          metadataPerTransactionalId: Pool[String, TransactionMetadata])


          TransactionStateManager 提供了 appendTransactionToLog 方法用于持久化。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          class TransactionStateManager {

          def appendTransactionToLog(transactionalId: String,
          coordinatorEpoch: Int,
          newMetadata: TxnTransitMetadata, // 新的元數(shù)據(jù)
          responseCallback: Errors => Unit, // 回調(diào)函數(shù)
          retryOnError: Errors => Boolean = _ => false): Unit = {
          // 生成record的key
          val keyBytes = TransactionLog.keyToBytes(transactionalId)
          // 生成record的calue
          val valueBytes = TransactionLog.valueToBytes(newMetadata)
          // 生成record
          val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes))
          val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
          val recordsPerPartition = Map(topicPartition -> records)

          // 當(dāng)持久化完成后,會(huì)調(diào)用這個(gè)函數(shù),更新transactionMetadataCache集合的元數(shù)據(jù)
          def updateCacheCallback(responseStatus: collection.Map[TopicPartition, PartitionResponse]): Unit = {
          // 檢查持久化是否出錯(cuò)
          .....
          // 更改元數(shù)據(jù)
          metadata.completeTransitionTo(newMetadata)
          // 執(zhí)行回調(diào)函數(shù)
          responseCallback(responseError)
          }

          ......

          // 持久化recod到topic里
          replicaManager.appendRecords(
          newMetadata.txnTimeoutMs.toLong,
          TransactionLog.EnforcedRequiredAcks,
          internalTopicsAllowed = true,
          isFromClient = false,
          recordsPerPartition,
          updateCacheCallback, // 持久化完成后,會(huì)調(diào)用這個(gè)函數(shù)
          delayedProduceLock = Some(stateLock.readLock))
          }


          TransactionStateManager 提供了 loadTransactionsForTxnTopicPartition 方法用于從消息 topic 恢復(fù)數(shù)據(jù),這里不再詳細(xì)介紹。

          接下來(lái)來(lái)講講 TransactionCoordinator 類,它負(fù)責(zé)處理重要的事務(wù)請(qǐng)求。


          handleInitProducerId 方法會(huì)返回 producer id,如果這個(gè)事務(wù)的 transaction id 第一次請(qǐng)求,那么會(huì)為它分配新的 producer id 。如果之前請(qǐng)求過(guò),就會(huì)返回之前分配的 producer id。

          handleAddPartitionsToTransaction 方法會(huì)將上傳的分區(qū)列表,添加到元數(shù)據(jù)并且持久化。

          handleEndTransaction 方法會(huì)稍微復(fù)雜一些,因?yàn)樗枰獙⑦@個(gè)消息轉(zhuǎn)發(fā)給各個(gè)分區(qū)。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          59
          60
          def handleEndTransaction(transactionalId: String,
          producerId: Long,
          producerEpoch: Short,
          txnMarkerResult: TransactionResult,
          responseCallback: EndTxnCallback): Unit = {
          // 獲取元數(shù)據(jù),更改狀態(tài)
          val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
          case None =>
          // 如果元數(shù)據(jù)不存在,說(shuō)明存在問(wèn)題
          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
          case Some(epochAndTxnMetadata) =>
          val txnMetadata = epochAndTxnMetadata.transactionMetadata
          val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
          ......
          txnMetadata.state match {
          // 必須是Ongoing狀態(tài),如果是別的狀態(tài),就會(huì)報(bào)錯(cuò)
          case Ongoing =>
          // 根據(jù)發(fā)送的消息類型,查看是事務(wù)提交還是回滾,來(lái)決定接下來(lái)的狀態(tài)
          val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
          PrepareCommit
          else
          PrepareAbort
          // 調(diào)用 prepareAbortOrCommit 來(lái)更新?tīng)顟B(tài)
          Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
          }
          }

          // 檢查上一步的結(jié)果
          preAppendResult match {
          case Left(err) =>
          responseCallback(err)
          case Right((coordinatorEpoch, newMetadata)) =>

          // 定義回調(diào)函數(shù),用于發(fā)送請(qǐng)求到分區(qū)
          def sendTxnMarkersCallback(error: Errors): Unit = {
          if (error == Errors.NONE) {
          ...... // 檢查狀態(tài),只有是PrepareCommit或PrepareAbort,才能繼續(xù)執(zhí)行
          txnMetadata.state match {
          case PrepareCommit =>
          // 更改狀態(tài)為 CompleteCommit
          txnMetadata.prepareComplete(time.milliseconds())
          case PrepareAbort =>
          // 更改狀態(tài)為 CompleteAbort
          txnMetadata.prepareComplete(time.milliseconds())
          }
          }
          // 檢查更改狀態(tài)的結(jié)果
          preSendResult match {
          case Left(err) =>
          responseCallback(err)
          case Right((txnMetadata, newPreSendMetadata)) =>
          // 向客戶端發(fā)送成功響應(yīng)
          responseCallback(Errors.NONE)
          // 通過(guò)txnMarkerChannelManager發(fā)送請(qǐng)求到分區(qū)
          txnMarkerChannelManager.addTxnMarkersToSend(transactionalId, coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
          }
          }

          // 持久化元數(shù)據(jù),然后調(diào)用sendTxnMarkersCallback函數(shù),發(fā)送客戶端的響應(yīng)和發(fā)送請(qǐng)求到分區(qū)
          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)


          上面發(fā)送請(qǐng)求到分區(qū),調(diào)用了 TransactionMarkerChannelManager 的方法。它會(huì)生成每個(gè)分區(qū)的請(qǐng)求,然后放到一個(gè)隊(duì)列里,后臺(tái)線程會(huì)負(fù)責(zé)將這些請(qǐng)求發(fā)送出去。當(dāng)收到所有分區(qū)的響應(yīng)后,它還負(fù)責(zé)更改事務(wù)的狀態(tài),并且負(fù)責(zé)持久化一條事務(wù)成功的消息。

          這里需要提下延遲任務(wù) DelayedTxnMarker,它負(fù)責(zé)檢查是否收到所有分區(qū)的響應(yīng)。它設(shè)置的延遲時(shí)間達(dá)到365天,所以可以認(rèn)為次任務(wù)幾乎不會(huì)過(guò)期。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
          completionCallback: Errors => Unit,
          lock: Lock)
          extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365), Some(lock)) {

          override def tryComplete(): Boolean = {
          txnMetadata.inLock {
          // 當(dāng)每收到一個(gè)分區(qū)的響應(yīng)后,就會(huì)從元數(shù)據(jù)中刪除掉。
          // 直到分區(qū)列表為空,就說(shuō)明所有分區(qū)都已經(jīng)成功響應(yīng)
          if (txnMetadata.topicPartitions.isEmpty)
          forceComplete()
          else false
          }
          }

          override def onExpiration(): Unit = {
          // this should never happen
          throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
          }

          // TODO: if we will always return NONE upon completion, we can remove the error code in the param
          override def onComplete(): Unit = completionCallback(Errors.NONE)
          }


          DelayedTxnMarker 是在 TransactionMarkerChannelManager 的 addTxnMarkersToSend 方法中實(shí)例化的,它的 completionCallback 參數(shù),就是定義在 addTxnMarkersToSend 方法里面。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          def addTxnMarkersToSend(transactionalId: String,
          coordinatorEpoch: Int,
          txnResult: TransactionResult,
          txnMetadata: TransactionMetadata,
          newMetadata: TxnTransitMetadata): Unit = {
          // 定義延遲任務(wù)的回調(diào)函數(shù)
          def appendToLogCallback(error: Errors): Unit = {
          // 檢查錯(cuò)誤
          error match {
          case Errors.NONE =>
          // 檢查狀態(tài)
          txnStateManager.getTransactionState(transactionalId) match {
          case Right(Some(epochAndMetadata)) =>
          if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
          // 持久化事務(wù)成功消息TxnLogAppend,寫入到事務(wù) topic
          tryAppendToLog(TxnLogAppend(transactionalId, coordinatorEpoch, txnMetadata, newMetadata))
          }
          }
          }
          }

          // 實(shí)例化延遲任務(wù)
          val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback, txnStateManager.stateReadLock)
          // 等待執(zhí)行
          txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId))
          // 將請(qǐng)求放進(jìn)隊(duì)列里
          addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
          }


          :zhmin

          來(lái)源:

          zhmin.github.io/2019/05/20/kafka-transaction/

          瀏覽 67
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天日天天操天天干天天 | 操逼免费小视频 | 99热这里有精品 | 免费操逼网址 | 精品偷拍 |