<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka 事務(wù)的實(shí)現(xiàn)原理

          共 23812字,需瀏覽 48分鐘

           ·

          2021-07-12 18:04

          -     前言     - 


          Kafka 事務(wù)在流處理中應(yīng)用很廣泛,比如原子性的讀取消息,立即處理和發(fā)送,如果中途出現(xiàn)錯誤,支持回滾操作。這篇文章來講講事務(wù)是如何實(shí)現(xiàn)的,首先來看看事務(wù)流程圖。

          -     事務(wù)流程     - 


          Kafka的整個事務(wù)處理流程如下圖:


          上圖中的 Transaction Coordinator 運(yùn)行在 Kafka 服務(wù)端,下面簡稱 TC 服務(wù)。
          __transaction_state 是 TC 服務(wù)持久化事務(wù)信息的 topic 名稱,下面簡稱事務(wù) topic。

          Producer 向 TC 服務(wù)發(fā)送的 commit 消息,下面簡稱事務(wù)提交消息。

          TC 服務(wù)向分區(qū)發(fā)送的消息,下面簡稱事務(wù)結(jié)果消息。

          -     尋找 TC 服務(wù)地址     - 


          Producer 會首先從 Kafka 集群中選擇任意一臺機(jī)器,然后向其發(fā)送請求,獲取 TC 服務(wù)的地址。Kafka 有個特殊的事務(wù) topic,名稱為__transaction_state ,負(fù)責(zé)持久化事務(wù)消息。這個 topic 有多個分區(qū),默認(rèn)有50個,每個分區(qū)負(fù)責(zé)一部分事務(wù)。事務(wù)劃分是根據(jù) transaction id, 計算出該事務(wù)屬于哪個分區(qū)。這個分區(qū)的 leader 所在的機(jī)器,負(fù)責(zé)這個事務(wù)的TC 服務(wù)地址。

          -     事務(wù)初始化     - 


          Producer 在使用事務(wù)功能,必須先自定義一個唯一的 transaction id。有了 transaction id,即使客戶端掛掉了,它重啟后也能繼續(xù)處理未完成的事務(wù)。

          Kafka 實(shí)現(xiàn)事務(wù)需要依靠冪等性,而冪等性需要指定 producer id 。所以Producer在啟動事務(wù)之前,需要向 TC 服務(wù)申請 producer id。TC 服務(wù)在分配 producer id 后,會將它持久化到事務(wù) topic。

          -     發(fā)送消息     - 


          Producer 在接收到 producer id 后,就可以正常的發(fā)送消息了。不過發(fā)送消息之前,需要先將這些消息的分區(qū)地址,上傳到 TC 服務(wù)。TC 服務(wù)會將這些分區(qū)地址持久化到事務(wù) topic。然后 Producer 才會真正的發(fā)送消息,這些消息與普通消息不同,它們會有一個字段,表示自身是事務(wù)消息。

          這里需要注意下一種特殊的請求,提交消費(fèi)位置請求,用于原子性的從某個 topic 讀取消息,并且發(fā)送消息到另外一個 topic。我們知道一般是消費(fèi)者使用消費(fèi)組訂閱 topic,才會發(fā)送提交消費(fèi)位置的請求,而這里是由 Producer 發(fā)送的。

          Producer 首先會發(fā)送一條請求,里面會包含這個消費(fèi)組對應(yīng)的分區(qū)(每個消費(fèi)組的消費(fèi)位置都保存在 __consumer_offset topic 的一個分區(qū)里),TC 服務(wù)會將分區(qū)持久化之后,發(fā)送響應(yīng)。Producer 收到響應(yīng)后,就會直接發(fā)送消費(fèi)位置請求給 GroupCoordinator。

          -     發(fā)送提交請求     - 


          Producer 發(fā)送完消息后,如果認(rèn)為該事務(wù)可以提交了,就會發(fā)送提交請求到 TC 服務(wù)。Producer 的工作至此就完成了,接下來它只需要等待響應(yīng)。這里需要強(qiáng)調(diào)下,Producer 會在發(fā)送事務(wù)提交請求之前,會等待之前所有的請求都已經(jīng)發(fā)送并且響應(yīng)成功。

          提交請求持久化

          TC 服務(wù)收到事務(wù)提交請求后,會先將提交信息先持久化到事務(wù) topic 。持久化成功后,服務(wù)端就立即發(fā)送成功響應(yīng)給 Producer。然后找到該事務(wù)涉及到的所有分區(qū),為每 個分區(qū)生成提交請求,存到隊列里等待發(fā)送。

          讀者可能有所疑問,在一般的二階段提交中,協(xié)調(diào)者需要收到所有參與者的響應(yīng)后,才能判斷此事務(wù)是否成功,最后才將結(jié)果返回給客戶。那如果 TC 服務(wù)在發(fā)送響應(yīng)給 Producer 后,還沒來及向分區(qū)發(fā)送請求就掛掉了,那么 Kafka 是如何保證事務(wù)完成。因?yàn)槊看问聞?wù)的信息都會持久化,所以 TC 服務(wù)掛掉重新啟動后,會先從 事務(wù) topic 加載事務(wù)信息,如果發(fā)現(xiàn)只有事務(wù)提交信息,卻沒有后來的事務(wù)完成信息,說明存在事務(wù)結(jié)果信息沒有提交到分區(qū)。

          發(fā)送事務(wù)結(jié)果信息給分區(qū)

          后臺線程會不停的從隊列里,拉取請求并且發(fā)送到分區(qū)。當(dāng)一個分區(qū)收到事務(wù)結(jié)果消息后,會將結(jié)果保存到分區(qū)里,并且返回成功響應(yīng)到 TC服務(wù)。當(dāng) TC 服務(wù)收到所有分區(qū)的成功響應(yīng)后,會持久化一條事務(wù)完成的消息到事務(wù) topic。至此,一個完整的事務(wù)流程就完成了。

          -     客戶端原理     - 


          使用示例:


          下面代碼實(shí)現(xiàn),消費(fèi)者讀取消息,并且發(fā)送到多個分區(qū)的事務(wù):
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          // 創(chuàng)建 Producer 實(shí)例,并且指定 transaction id
          KafkaProducer producer = createKafkaProducer(
          “bootstrap.servers”, “l(fā)ocalhost:9092”,
          “transactional.id”, “my-transactional-id”);

          // 初始化事務(wù),這里會向 TC 服務(wù)申請 producer id
          producer.initTransactions();

          // 創(chuàng)建 Consumer 實(shí)例,并且訂閱 topic
          KafkaConsumer consumer = createKafkaConsumer(
          “bootstrap.servers”, “l(fā)ocalhost:9092”,
          “group.id”, “my-group-id”,
          "isolation.level", "read_committed");
          consumer.subscribe(singleton(“inputTopic”));

          while (true) {
          ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
          // 開始新的事務(wù)
          producer.beginTransaction();
          for (ConsumerRecord record : records) {
          // 發(fā)送消息到分區(qū)
          producer.send(producerRecord(“outputTopic_1”, record));
          producer.send(producerRecord(“outputTopic_2”, record));
          }
          // 提交 offset
          producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
          // 提交事務(wù)
          producer.commitTransaction();
          }

          -     運(yùn)行原理     - 


          上面的例子使用了 Producer的接口實(shí)現(xiàn)了事務(wù),但負(fù)責(zé)與 TC 服務(wù)通信的是 TransactionManager 類。

          TransactionManager 類會發(fā)送申請分配 producer id 請求,上傳消息分區(qū)請求和事務(wù)提交請求,在完成每一步請求,TransactionManager 都會更新自身的狀態(tài)。


          狀態(tài)
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          private enum State {
          UNINITIALIZED,
          INITIALIZING,
          READY,
          IN_TRANSACTION,
          COMMITTING_TRANSACTION,
          ABORTING_TRANSACTION,
          ABORTABLE_ERROR,
          FATAL_ERROR;
          }



          這里還有兩個狀態(tài)沒有列出來 ABORTABLE_ERROR或FATAL_ERROR,這是當(dāng)請求出錯后,狀態(tài)就會變?yōu)樗鼈儭?/span>

          -     服務(wù)端原理     - 


          TC 服務(wù)會為每個 transaction id 都維護(hù)了元數(shù)據(jù),元數(shù)據(jù)的字段如下:
          1
          2
          3
          4
          5
          6
          7
          8
          9
          class TransactionMetadata(
          val transactionalId: String, // 事務(wù) id
          var producerId: Long, // pruducer id
          var producerEpoch: Short, // producer epoch
          var txnTimeoutMs: Int, // 事務(wù)超時時間
          var state: TransactionState, // 事務(wù)當(dāng)前狀態(tài)
          val topicPartitions: mutable.Set[TopicPartition], // 該事務(wù)涉及到的分區(qū)列表
          @volatile var txnStartTimestamp: Long = -1, // 事務(wù)開始的時間
          @volatile var txnLastUpdateTimestamp: Long) // 事務(wù)的更新時間


          對于服務(wù)端,每個事務(wù)也有對應(yīng)的狀態(tài)


          當(dāng) TC 服務(wù)接收到了來自客戶端的分區(qū)上傳請求,此時它才會認(rèn)為此次事務(wù)開始了,然后它會更新分區(qū)列表,更新此次的事務(wù)開始時間為當(dāng)前時間,并且會將更新后的元數(shù)據(jù),持久化到事務(wù) topic。最后將自身狀態(tài)改為 Ongoing。

          當(dāng)TC 服務(wù)收到事務(wù)提交請求或者事務(wù)回滾請求,更新元數(shù)據(jù),持久化到事務(wù) topic,然后自身狀態(tài)改為CompleteCommit 或CompleteAbort 。然后向涉及到該事務(wù)的分區(qū)發(fā)送事務(wù)結(jié)果消息,等待所有的分區(qū)都成功返回響應(yīng)后,就會持久化一條事務(wù)成功的消息到消息 topic。

          -     高可用分析     - 


          TC 服務(wù)

          通過上述對 Kafka 事務(wù)的簡述,可以看到 TC 服務(wù)起著很重要的作用。事實(shí)上 Kafka 集群中運(yùn)行著多個 TC 服務(wù),每個TC 服務(wù)負(fù)責(zé)事務(wù) topic 的一個分區(qū)讀寫,也就是這個分區(qū)的 leader。Producer 根據(jù) transaction id 的哈希值,來決定該事務(wù)屬于事務(wù) topic 的哪個分區(qū),最后找到這個分區(qū)的 leader 位置。

          既然 TC 服務(wù)負(fù)責(zé)事務(wù) topic 的一個分區(qū) leader,我們知道當(dāng)一個分區(qū)的 leader掛掉之后,Kafka 會保證這個的分區(qū)的 follower 會轉(zhuǎn)換為 leader 角色,會繼續(xù)對外提供服務(wù)。這么 TC 服務(wù)的高可用就達(dá)到了。

          消息持久化

          TC 服務(wù)為了支持重啟后,也能恢復(fù)到之前的狀態(tài),所以它將每次重要的消息都會持久化起來,并且保存到事務(wù) topic 的時候,指定 leader 分區(qū)和 follower 分區(qū)必須都存儲成功。這樣每次 TC 服務(wù)啟動的時候,都會從事務(wù) topic 讀取之前的狀態(tài),加載到緩存里。

          比如當(dāng)TC 服務(wù)在響應(yīng)客戶端的事務(wù)提交請求后,還沒來得及向各分區(qū)發(fā)送事務(wù)結(jié)果請求,就已經(jīng)掛掉了。之后 TC 服務(wù)重啟,會去事務(wù) topic 加載數(shù)據(jù),它發(fā)現(xiàn)事務(wù)的最后狀態(tài)為 PrepareCommit,并且事務(wù)數(shù)據(jù)還包括了分區(qū)列表,這樣 TC 服務(wù)會繼續(xù)未完成的事務(wù),會向列表中的各個分區(qū)發(fā)送事務(wù)結(jié)果請求。

          超時處理

          如果 Producer 發(fā)起了一個事務(wù),但是由于網(wǎng)絡(luò)問題,TC 服務(wù)遲遲沒有接下來的請求,那么該事務(wù)就會被認(rèn)為超時。TC 服務(wù)會有個線程,會定期檢查處理 Ongoing 狀態(tài)的事務(wù),如果該事務(wù)的開始時間和當(dāng)前時間的差,超過了指定的超時時間(在發(fā)送申請producer id請求時可以指定),那么 TC 服務(wù)就會回滾該事務(wù),更新和持久化事務(wù)的狀態(tài),并且發(fā)送事務(wù)回滾結(jié)果給分區(qū)。

          源碼分析

          如果對源碼還有興趣的讀者,可以繼續(xù)閱讀這部分。這里會大概的講解下代碼結(jié)構(gòu),讀者如果想進(jìn)一步的理解,可以參看源碼。整個事務(wù)的源碼分為兩部分,客戶端和服務(wù)端。

          -     客戶端     - 


          事務(wù)的客戶端,只能是 Producer。下面首先介紹下 Producer 與事務(wù)相關(guān)的接口。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          public interface Producer<K, V> extends Closeable {
          // 初始化事務(wù),包括申請 producer id
          void initTransactions();
          // 開始事務(wù),這里會更改事務(wù)的本地狀態(tài)
          void beginTransaction() throws ProducerFencedException;
          // 提交消費(fèi)位置, offsets表示每個分區(qū)的消費(fèi)位置, consumerGroupId表示消費(fèi)組的名稱
          void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
          String consumerGroupId) throws ProducerFencedException;
          // 發(fā)送事務(wù)提交請求
          void commitTransaction() throws ProducerFencedException;
          // 發(fā)送事務(wù)回滾請求
          void abortTransaction() throws ProducerFencedException;
          }


          KafkaProducer 類實(shí)現(xiàn)了 Producer 接口,比較簡單,只是調(diào)用了 TransactionCoordinator 類的方法。客戶端事務(wù)處理的核心代碼,都是在 TransactionCoordinator 類里。


          TransactionCoordinator 發(fā)送的請求類,都有一個對應(yīng)的類來處理響應(yīng)。這些處理類都是繼承 TxnRequestHandler 類,它封裝了共同的錯誤處理,比如連接斷開,api 版本不兼容等。子類需要實(shí)現(xiàn) handleResponse 方法,負(fù)責(zé)處理具體的響應(yīng)內(nèi)容。

          initializeTransactions 方法負(fù)責(zé)事務(wù)初始化,它會發(fā)送 InitProducerIdRequest 請求。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          public synchronized TransactionalRequestResult initializeTransactions() {
          // 檢查transaction id是否已經(jīng)設(shè)置
          ensureTransactional();
          // 更改自身狀態(tài)為INITIALIZING
          transitionTo(State.INITIALIZING);
          // 將producer id和epoch都設(shè)為空
          setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
          // nextSequence在消息發(fā)送中會用到,因?yàn)榘l(fā)送事務(wù)消息要求冪等性,而發(fā)送冪等性的消息是需要設(shè)置sequence的
          this.nextSequence.clear();
          // 構(gòu)建申請produce id請求
          InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
          // InitProducerIdHandler 負(fù)責(zé)處理響應(yīng)
          InitProducerIdHandler handler = new InitProducerIdHandler(builder);
          // 將消息保存到隊列中,等待Sender線程(Producer會有個后臺線程發(fā)送消息)發(fā)送
          enqueueRequest(handler);
          // 返回異步結(jié)果
          return handler.result;
          }


          InitProducerIdHandler 類的定義如下:
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          private class InitProducerIdHandler extends TxnRequestHandler {

          @Override
          public void handleResponse(AbstractResponse response) {
          InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
          // 檢查錯誤
          Errors error = initProducerIdResponse.error();
          if (error == Errors.NONE) {
          ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
          // 保存結(jié)果 producer id和epoch
          setProducerIdAndEpoch(producerIdAndEpoch);
          // 更改狀態(tài)為READY
          transitionTo(State.READY);
          lastError = null;
          // 通知異步結(jié)果已完成
          result.done();
          } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
          // 如果TC服務(wù)沒有找到或者剛好掛掉,那么生成FindCoordinatorRequest請求,等待發(fā)送
          lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
          // 并且將自身請求也放入隊列,等待發(fā)送
          reenqueue();
          } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
          // 如果TC服務(wù)正在啟動中,那么加入隊列,等待發(fā)送
          reenqueue();
          } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
          // 如果發(fā)生權(quán)限問題,那么認(rèn)為進(jìn)入錯誤狀態(tài)
          fatalError(error.exception());
          } else {
          fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
          }
          }
          }


          beginTransaction 方法負(fù)責(zé)開始新事務(wù),它只是更改自身狀態(tài)為 IN_TRANSACTION,并不會發(fā)送任何請求。
          1
          2
          3
          4
          5
          6
          7
          8
          public synchronized void beginTransaction() {
          // 檢查 transaction id
          ensureTransactional();
          // 檢查之前響應(yīng)是否出錯
          maybeFailWithError();
          // 更改狀態(tài)
          transitionTo(State.IN_TRANSACTION);
          }


          我們知道Producer發(fā)送消息,都是先將消息發(fā)送到緩存隊列里,最后是由Sender線程發(fā)送出去 。Producer 如果開啟了事務(wù), 它在發(fā)送消息到緩存之前,會將消息所在的分區(qū)保存在 TransactionCoordinator 里。然后Sender線程在發(fā)送消息之前,會去從 TransactionCoordinator 檢查是否需要上次分區(qū)到 TC 服務(wù),如果有就先上次分區(qū),隨后才發(fā)送消息。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          public class KafkaProducer<K, V> implements Producer<K, V> {
          private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
          .......
          int partition = partition(record, serializedKey, serializedValue, cluster);
          tp = new TopicPartition(record.topic(), partition);
          ......
          if (transactionManager != null && transactionManager.isTransactional())
          // 如果開啟了事務(wù),那么就先將分區(qū)保存在 transactionManager
          transactionManager.maybeAddPartitionToTransaction(tp);
          ......
          }
          }


          TransactionManager 提供了 maybeAddPartitionToTransaction 方法添加分區(qū)。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          public class TransactionManager {
          // 新增的上傳分區(qū)集合
          private final Set<TopicPartition> newPartitionsInTransaction;
          // 本次事務(wù)已經(jīng)上傳的分區(qū)集合
          private final Set<TopicPartition> partitionsInTransaction;
          // 本次事務(wù)涉及到的所有分區(qū)集合
          private final Set<TopicPartition> pendingPartitionsInTransaction;

          public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
          // 檢查事務(wù)狀態(tài)必須為IN_TRANSACTION
          failIfNotReadyForSend();
          // 如果已經(jīng)上傳過這個分區(qū),或者正在上傳這個分區(qū),那么直接返回
          if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
          return;

          log.debug("Begin adding new partition {} to transaction", topicPartition);
          // 添加到需要上次的集合
          newPartitionsInTransaction.add(topicPartition);
          }

          // 檢查是否這個分區(qū)已經(jīng)上傳過了
          synchronized boolean isPartitionAdded(TopicPartition partition) {
          return partitionsInTransaction.contains(partition);
          }

          // 檢查是否這個分區(qū)正在上傳中
          synchronized boolean isPartitionPendingAdd(TopicPartition partition) {
          return newPartitionsInTransaction.contains(partition) || pendingPartitionsInTransaction.contains(partition);
          }
          }

          TransactionManager 的 addPartitionsToTransactionHandler 方法,會生成分區(qū)上傳請求,然后由Sender發(fā)送。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          public class TransactionManager {

          private synchronized TxnRequestHandler addPartitionsToTransactionHandler() {
          // 將新增的分區(qū),添加到 pendingPartitionsInTransaction 集合
          pendingPartitionsInTransaction.addAll(newPartitionsInTransaction);
          // 清空新增的分區(qū)集合
          newPartitionsInTransaction.clear();
          // 構(gòu)建 AddPartitionsToTxnRequest 請求
          AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
          producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsInTransaction));
          return new AddPartitionsToTxnHandler(builder);
          }
          }


          AddPartitionsToTxnHandler 負(fù)責(zé)處理響應(yīng)。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          private class AddPartitionsToTxnHandler extends TxnRequestHandler {

          @Override
          public void handleResponse(AbstractResponse response) {
          AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
          Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
          boolean hasPartitionErrors = false;
          Set<String> unauthorizedTopics = new HashSet<>();
          retryBackoffMs = TransactionManager.this.retryBackoffMs;

          for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) {
          // 檢查每個分區(qū)的響應(yīng)錯誤
          .....
          }

          Set<TopicPartition> partitions = errors.keySet();
          // 因?yàn)檫@些分區(qū)已經(jīng)有響應(yīng)了,即使錯誤也需要從集合中刪除
          pendingPartitionsInTransaction.removeAll(partitions);

          if (!unauthorizedTopics.isEmpty()) {
          abortableError(new TopicAuthorizationException(unauthorizedTopics));
          } else if (hasPartitionErrors) {
          abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
          } else {
          log.debug("Successfully added partitions {} to transaction", partitions);
          // 將這些成功響應(yīng)的分區(qū),添加到 partitionsInTransaction集合
          partitionsInTransaction.addAll(partitions);
          transactionStarted = true;
          // 通知結(jié)果成功
          result.done();
          }
          }
          }


          sendOffsetsToTransaction 方法負(fù)責(zé)發(fā)送消費(fèi)位置提交請求。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          public class TransactionManager {
          public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
          // 保證transaction id
          ensureTransactional();
          // 檢查之前的響應(yīng)錯誤
          maybeFailWithError();
          // 只有 IN_TRANSACTION 狀態(tài)才可以發(fā)送這種類型請求
          if (currentState != State.IN_TRANSACTION)
          throw new KafkaException("...");
          // 構(gòu)建請求
          AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
          // 構(gòu)建處理器
          AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
          // 添加到隊列
          enqueueRequest(handler);
          return handler.result;
          }
          }


          AddOffsetsToTxnHandler 類負(fù)責(zé)處理響應(yīng),它的處理邏輯很簡單,它收到響應(yīng)后,會發(fā)送 TxnOffsetCommitRequest 請求給 TC 服務(wù)。


          最后還剩下事務(wù)提交或回滾請求,還沒講述。Producer 在調(diào)用 commitTransaction 或 abortTransaction 方法,本質(zhì)都是調(diào)用了 TransactionManager 的 beginCompletingTransaction 方法發(fā)送請求。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          public synchronized TransactionalRequestResult beginCommit() {
          ensureTransactional();
          maybeFailWithError();
          // 更改狀態(tài)為 COMMITTING_TRANSACTION
          transitionTo(State.COMMITTING_TRANSACTION);
          // 調(diào)用 beginCompletingTransaction 方法發(fā)送請求
          return beginCompletingTransaction(TransactionResult.COMMIT);
          }

          public synchronized TransactionalRequestResult beginAbort() {
          ensureTransactional();
          // 更改狀態(tài)為 ABORTABLE_ERROR
          if (currentState != State.ABORTABLE_ERROR)
          maybeFailWithError();
          transitionTo(State.ABORTING_TRANSACTION);

          // 清空分區(qū)集合
          newPartitionsInTransaction.clear();
          // 發(fā)送請求
          return beginCompletingTransaction(TransactionResult.ABORT);
          }

          private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
          // 如果還有分區(qū)沒有上傳,那么生成請求放進(jìn)隊列
          if (!newPartitionsInTransaction.isEmpty())
          enqueueRezquest(addPartitionsToTransactionHandler());
          // 構(gòu)建請求
          EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, transactionResult);
          // 構(gòu)建處理器
          EndTxnHandler handler = new EndTxnHandler(builder);
          enqueueRequest(handler);
          return handler.result;
          }


          EndTxnHandler 負(fù)責(zé)處理事務(wù)提交或回滾響應(yīng),EndTxnHandler的處理邏輯比較簡單,它只是調(diào)用了 completeTransaction 方法。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10

          private synchronized void completeTransaction() {
          // 更改狀態(tài)為READY
          transitionTo(State.READY);
          lastError = null;
          transactionStarted = false;
          // 清空分區(qū)集合
          newPartitionsInTransaction.clear();
          pendingPartitionsInTransaction.clear();
          partitionsInTransaction.clear();
          }


          -     服務(wù)端     - 


          服務(wù)端的結(jié)構(gòu)會相對復(fù)雜一些,這里盡量簡單的講講大概邏輯。首先介紹下 TransactionStateManager 類,它負(fù)責(zé)管理事務(wù)的元數(shù)據(jù),它也提供持久化事務(wù)的元數(shù)據(jù),和從事務(wù) topic 加載數(shù)據(jù)的功能。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          class TransactionStateManager(...) {
          // key值為 partition id,value為 TxnMetadataCacheEntry對象
          private val transactionMetadataCache: mutable.Map[Int, TxnMetadataCacheEntry] = mutable.Map()
          }

          // metadataPerTransactionalId 參數(shù)是Pool類型,可以看成是Map
          // key值為transaction id, value為元數(shù)據(jù)
          private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
          metadataPerTransactionalId: Pool[String, TransactionMetadata])


          TransactionStateManager 提供了 appendTransactionToLog 方法用于持久化。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          class TransactionStateManager {

          def appendTransactionToLog(transactionalId: String,
          coordinatorEpoch: Int,
          newMetadata: TxnTransitMetadata, // 新的元數(shù)據(jù)
          responseCallback: Errors => Unit, // 回調(diào)函數(shù)
          retryOnError: Errors => Boolean = _ => false): Unit = {
          // 生成record的key
          val keyBytes = TransactionLog.keyToBytes(transactionalId)
          // 生成record的calue
          val valueBytes = TransactionLog.valueToBytes(newMetadata)
          // 生成record
          val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes))
          val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
          val recordsPerPartition = Map(topicPartition -> records)

          // 當(dāng)持久化完成后,會調(diào)用這個函數(shù),更新transactionMetadataCache集合的元數(shù)據(jù)
          def updateCacheCallback(responseStatus: collection.Map[TopicPartition, PartitionResponse]): Unit = {
          // 檢查持久化是否出錯
          .....
          // 更改元數(shù)據(jù)
          metadata.completeTransitionTo(newMetadata)
          // 執(zhí)行回調(diào)函數(shù)
          responseCallback(responseError)
          }

          ......

          // 持久化recod到topic里
          replicaManager.appendRecords(
          newMetadata.txnTimeoutMs.toLong,
          TransactionLog.EnforcedRequiredAcks,
          internalTopicsAllowed = true,
          isFromClient = false,
          recordsPerPartition,
          updateCacheCallback, // 持久化完成后,會調(diào)用這個函數(shù)
          delayedProduceLock = Some(stateLock.readLock))
          }


          TransactionStateManager 提供了 loadTransactionsForTxnTopicPartition 方法用于從消息 topic 恢復(fù)數(shù)據(jù),這里不再詳細(xì)介紹。

          接下來來講講 TransactionCoordinator 類,它負(fù)責(zé)處理重要的事務(wù)請求。


          handleInitProducerId 方法會返回 producer id,如果這個事務(wù)的 transaction id 第一次請求,那么會為它分配新的 producer id 。如果之前請求過,就會返回之前分配的 producer id。

          handleAddPartitionsToTransaction 方法會將上傳的分區(qū)列表,添加到元數(shù)據(jù)并且持久化。

          handleEndTransaction 方法會稍微復(fù)雜一些,因?yàn)樗枰獙⑦@個消息轉(zhuǎn)發(fā)給各個分區(qū)。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          59
          60
          def handleEndTransaction(transactionalId: String,
          producerId: Long,
          producerEpoch: Short,
          txnMarkerResult: TransactionResult,
          responseCallback: EndTxnCallback): Unit = {
          // 獲取元數(shù)據(jù),更改狀態(tài)
          val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
          case None =>
          // 如果元數(shù)據(jù)不存在,說明存在問題
          Left(Errors.INVALID_PRODUCER_ID_MAPPING)
          case Some(epochAndTxnMetadata) =>
          val txnMetadata = epochAndTxnMetadata.transactionMetadata
          val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
          ......
          txnMetadata.state match {
          // 必須是Ongoing狀態(tài),如果是別的狀態(tài),就會報錯
          case Ongoing =>
          // 根據(jù)發(fā)送的消息類型,查看是事務(wù)提交還是回滾,來決定接下來的狀態(tài)
          val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
          PrepareCommit
          else
          PrepareAbort
          // 調(diào)用 prepareAbortOrCommit 來更新狀態(tài)
          Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
          }
          }

          // 檢查上一步的結(jié)果
          preAppendResult match {
          case Left(err) =>
          responseCallback(err)
          case Right((coordinatorEpoch, newMetadata)) =>

          // 定義回調(diào)函數(shù),用于發(fā)送請求到分區(qū)
          def sendTxnMarkersCallback(error: Errors): Unit = {
          if (error == Errors.NONE) {
          ...... // 檢查狀態(tài),只有是PrepareCommit或PrepareAbort,才能繼續(xù)執(zhí)行
          txnMetadata.state match {
          case PrepareCommit =>
          // 更改狀態(tài)為 CompleteCommit
          txnMetadata.prepareComplete(time.milliseconds())
          case PrepareAbort =>
          // 更改狀態(tài)為 CompleteAbort
          txnMetadata.prepareComplete(time.milliseconds())
          }
          }
          // 檢查更改狀態(tài)的結(jié)果
          preSendResult match {
          case Left(err) =>
          responseCallback(err)
          case Right((txnMetadata, newPreSendMetadata)) =>
          // 向客戶端發(fā)送成功響應(yīng)
          responseCallback(Errors.NONE)
          // 通過txnMarkerChannelManager發(fā)送請求到分區(qū)
          txnMarkerChannelManager.addTxnMarkersToSend(transactionalId, coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
          }
          }

          // 持久化元數(shù)據(jù),然后調(diào)用sendTxnMarkersCallback函數(shù),發(fā)送客戶端的響應(yīng)和發(fā)送請求到分區(qū)
          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)


          上面發(fā)送請求到分區(qū),調(diào)用了 TransactionMarkerChannelManager 的方法。它會生成每個分區(qū)的請求,然后放到一個隊列里,后臺線程會負(fù)責(zé)將這些請求發(fā)送出去。當(dāng)收到所有分區(qū)的響應(yīng)后,它還負(fù)責(zé)更改事務(wù)的狀態(tài),并且負(fù)責(zé)持久化一條事務(wù)成功的消息。

          這里需要提下延遲任務(wù) DelayedTxnMarker,它負(fù)責(zé)檢查是否收到所有分區(qū)的響應(yīng)。它設(shè)置的延遲時間達(dá)到365天,所以可以認(rèn)為次任務(wù)幾乎不會過期。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
          completionCallback: Errors => Unit,
          lock: Lock)
          extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365), Some(lock)) {

          override def tryComplete(): Boolean = {
          txnMetadata.inLock {
          // 當(dāng)每收到一個分區(qū)的響應(yīng)后,就會從元數(shù)據(jù)中刪除掉。
          // 直到分區(qū)列表為空,就說明所有分區(qū)都已經(jīng)成功響應(yīng)
          if (txnMetadata.topicPartitions.isEmpty)
          forceComplete()
          else false
          }
          }

          override def onExpiration(): Unit = {
          // this should never happen
          throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
          }

          // TODO: if we will always return NONE upon completion, we can remove the error code in the param
          override def onComplete(): Unit = completionCallback(Errors.NONE)
          }


          DelayedTxnMarker 是在 TransactionMarkerChannelManager 的 addTxnMarkersToSend 方法中實(shí)例化的,它的 completionCallback 參數(shù),就是定義在 addTxnMarkersToSend 方法里面。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          def addTxnMarkersToSend(transactionalId: String,
          coordinatorEpoch: Int,
          txnResult: TransactionResult,
          txnMetadata: TransactionMetadata,
          newMetadata: TxnTransitMetadata): Unit = {
          // 定義延遲任務(wù)的回調(diào)函數(shù)
          def appendToLogCallback(error: Errors): Unit = {
          // 檢查錯誤
          error match {
          case Errors.NONE =>
          // 檢查狀態(tài)
          txnStateManager.getTransactionState(transactionalId) match {
          case Right(Some(epochAndMetadata)) =>
          if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
          // 持久化事務(wù)成功消息TxnLogAppend,寫入到事務(wù) topic
          tryAppendToLog(TxnLogAppend(transactionalId, coordinatorEpoch, txnMetadata, newMetadata))
          }
          }
          }
          }

          // 實(shí)例化延遲任務(wù)
          val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback, txnStateManager.stateReadLock)
          // 等待執(zhí)行
          txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId))
          // 將請求放進(jìn)隊列里
          addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
          }


          :zhmin

          來源:

          zhmin.github.io/2019/05/20/kafka-transaction/

          瀏覽 48
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩综合在线 | 久久人妻网站 | AV大全免费看 | 国产亚洲视频在线 | 区三区日本在线观看视频 |