<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖解 Kafka 網(wǎng)絡(luò)層源碼實現(xiàn)機(jī)制之收發(fā)消息全過程

          共 24113字,需瀏覽 49分鐘

           ·

          2022-08-25 20:34

          閱讀本文大約需要 30 分鐘。

          點擊上方"華仔聊技術(shù)"右上角選擇設(shè)為星標(biāo)
          硬核技術(shù)文章不會錯過!

          大家好,我是 華仔, 又跟大家見面了。

          在上一篇中,主要帶大家深度剖析了「Kafka 對多路復(fù)用器 Selector」的封裝全過程,今天我們主要對 Kafka 網(wǎng)絡(luò)層收發(fā)流程進(jìn)行總結(jié)下,本系列總共分為3篇,這是下篇,主要剖析最后一個問題:

          1. 針對 Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來實現(xiàn)最基礎(chǔ)的網(wǎng)絡(luò)連接以及讀寫操作的?
          2. 剖析 KafkaChannel 是如何對傳輸層、讀寫 buffer 操作進(jìn)行封裝的?
          3. 剖析工業(yè)級 NIO 實戰(zhàn):如何基于位運算來控制事件的監(jiān)聽以及拆包、粘包是如何實現(xiàn)的?
          4. 剖析 Kafka 是如何封裝 Selector 多路復(fù)用器的?
          5. 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進(jìn)行連接以及網(wǎng)絡(luò)讀寫的?
          6. 剖析 Kafka 網(wǎng)絡(luò)發(fā)送消息和接收響應(yīng)的整個過程是怎樣的?

          認(rèn)真讀完這篇文章,我相信你會對 Kafka 網(wǎng)絡(luò)層源碼有更加深刻的理解。

          這篇文章干貨很多,希望你可以耐心讀完。

          01 總體概述

          通過場景驅(qū)動的方式,在網(wǎng)絡(luò)請求封裝和監(jiān)聽好后,我們來看看消息是如何進(jìn)行網(wǎng)絡(luò)收發(fā)的,都需要做哪些工作。

          1. 發(fā)送消息流程剖析
            • 消息預(yù)發(fā)送
            • 消息真正發(fā)送
          2. 接收響應(yīng)流程剖析
            • 讀取響應(yīng)結(jié)果
            • 解析響應(yīng)信息
            • 處理回調(diào)

          為了方便大家理解,所有的源碼只保留骨干。

          02 發(fā)送消息流程剖析

          02.1 消息預(yù)發(fā)送

          這部分涉及的東西比較多,此處就簡單的說明下,后續(xù)會有專門篇章進(jìn)行剖析。

          客戶端先準(zhǔn)備要發(fā)送的消息,流程如下:

          1. Sender 子線程會從 RecordAccumulator 緩沖區(qū)拉取要發(fā)送的消息集合,抽取到的數(shù)據(jù)會存放到下面幾個地方:
            • 發(fā)送時會放入 inFlightRequests 集合和 KafkaChannel 的 send 對象,其中 inFlightRequests 后續(xù)篇章再進(jìn)行剖析,這里簡單說明下,該集合用來存儲和操作待發(fā)送消息的緩存區(qū),當(dāng)請求準(zhǔn)備網(wǎng)絡(luò)發(fā)送時,會把請求從隊頭放入隊列;當(dāng)接收到響應(yīng)后,會把請求從隊尾刪除。
            • 待發(fā)送完成后會放入 completedRequests 集合
          2. 對已經(jīng)過期的數(shù)據(jù)進(jìn)行處理。
          3. 封裝客戶端請求 ClientRequest,把 ClientRequest 類對象發(fā)送給 NetworkClient,它主要有以下2個工作要做:
            • 根據(jù) ClientRequest 類對象構(gòu)造 InFlightRequest 類對象
            • 根據(jù) ClientRequest 類對象構(gòu)造 NetworkSend 類對象,并放入到 KafkaChannel 的緩存里。
          4. 此時消息預(yù)發(fā)送結(jié)束。

          接下來我們依次看下 Selector 和 KafkaChannel 類的具體源碼實現(xiàn)。

          02.1.1 請求數(shù)據(jù)暫存內(nèi)存中

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Selector.java

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java

          /**
           * 消息預(yù)發(fā)送
           */

          public void send(Send send) {
              // 1. 從服務(wù)端獲取 connectionId
              String connectionId = send.destination();
              // 2. 從數(shù)據(jù)包中獲取對應(yīng)連接
              KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
              // 3. 如果關(guān)閉連接集合中存在該連接
              if (closingChannels.containsKey(connectionId)) {
                  // 把 connectionId 放入 failedSends 集合里
                  this.failedSends.add(connectionId);
              } else {
                  try {
                      // 4. 暫存數(shù)據(jù)預(yù)發(fā)送,并沒有真正的發(fā)送,一次只能發(fā)送一個
                      channel.setSend(send);
                  } catch (Exception e) {
                      // 5. 更新 KafkaChannel 的狀態(tài)為發(fā)送失敗  
                      channel.state(ChannelState.FAILED_SEND);
                      // 6. 把 connectionId 放入 failedSends 集合里
                      this.failedSends.add(connectionId);
                      // 7. 關(guān)閉連接
                      close(channel, CloseMode.DISCARD_NO_NOTIFY);
                      ...
                  }
              }
          }

          從源碼中可以看到調(diào)用了 KafkaChannel 類的 setSend() 方法。

          public void setSend(Send send) {
            if (this.send != null)
                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
            // 設(shè)置要發(fā)送消息的字段
            this.send = send;
            // 調(diào)用傳輸層增加寫事件
            this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
          }

          // PlaintextTransportLayer 類方法
          @Override
          public void addInterestOps(int ops) {
              //通過 key.interestOps() | ops 來添加事件
              key.interestOps(key.interestOps() | ops);
          }

          該方法主要用來預(yù)發(fā)送,即在發(fā)送網(wǎng)絡(luò)請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中,然后調(diào)用傳輸層方法增加對這個 channel 上「OP_WRITE」事件的關(guān)注,同時還保留了「OP_READ」事件,此時該 Channel 是同時可以進(jìn)行讀寫的。當(dāng)真正執(zhí)行發(fā)送的時候,會先從 send 中讀取數(shù)據(jù)。

          02.2 消息真正發(fā)送


          Sender 子線程會調(diào)用 Selector 的 「poll」方法把請求真正發(fā)送出去。

          02.2.1 poll()

          @Override
          public void poll(long timeout) throws IOException {
              ...
              // 調(diào)用nioSelector.select線程阻塞等待I/O事件并設(shè)置阻塞時間,等待I/O事件就緒發(fā)生,然后返回已經(jīng)監(jiān)控到了多少準(zhǔn)備就緒的事件
              int numReadyKeys = select(timeout);
              // 監(jiān)聽到事件發(fā)生或立即連接集合不為空或存在緩存數(shù)據(jù)
              if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
                  // 在SSL連接才可能會存在緩存數(shù)據(jù)
                  if (dataInBuffers) {
                      // 處理事件
                      pollSelectionKeys(toPoll, false, endSelect);
                  }
                  // 處理監(jiān)聽到的準(zhǔn)備就緒事件
                  pollSelectionKeys(readyKeys, false, endSelect);
                  // 處理立即連接集合
                  pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
              } else {
                  ...
              }
              ...
          }

          該方法就干了一件事,即收集準(zhǔn)備就緒事件,并針對事件進(jìn)行網(wǎng)絡(luò)操作,通過上述簡化代碼可以看出是調(diào)用了 「pollSelectionKeys」 方法,真正讀寫操作在該方法中,我們來看看:

          02.2.2 pollSelectionKeys()

          void pollSelectionKeys(Set<SelectionKey> selectionKeys,boolean isImmediatelyConnected,long currentTimeNanos) {
              //1. 循環(huán)調(diào)用當(dāng)前監(jiān)聽到的事件(原順序或者洗牌后順序)
              for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
                  // 2. 之前創(chuàng)建連接,把kafkachanel注冊到key上,這里就是獲取對應(yīng)的 channel
                  KafkaChannel channel = channel(key);
                  ...
                  // 3. 獲取節(jié)點id
                  String nodeId = channel.id();
                  ...
                  try {
                      ...
                      // 4. 讀事件是否準(zhǔn)備就緒了
                      if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
                          // 嘗試處理讀事件
                          attemptRead(channel);
                  }
                  ...
                  try {
                     // 5. 嘗試處理寫事件
                     attemptWrite(key, channel, nowNanos);
                  } catch (Exception e) {
                      sendFailed = true;
                      throw e;
                  }
              } catch (Exception e) {
                  ...
              } finally {
                 ....
              }
            }
          }

          該方法主要用來處理監(jiān)聽到的事件,包括連接事件、讀寫事件、以及立即完成的連接的。接下來我們看看嘗試進(jìn)行網(wǎng)絡(luò)寫操作,如何才能進(jìn)行真正寫。

          02.2.3 attemptWrite()

          private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
              // 此處需要滿足4個條件才可以進(jìn)行寫操作      
              if (channel.hasSend()
                      && channel.ready()
                      && key.isWritable()
                      && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
                  // 進(jìn)行寫操作
                  write(channel);
              }
          }

          // channel 連接就緒
          public boolean ready() {
              return transportLayer.ready() && authenticator.complete();
          }

          // java nio SelectionKey
          public final boolean isWritable() {
              return (readyOps() & OP_WRITE) != 0;
          }

          該方法主要用來嘗試進(jìn)行網(wǎng)絡(luò)寫操作,方法很簡單,必須「同時滿足4個條件」:

          1. channel 還有數(shù)據(jù)可以發(fā)送」即數(shù)據(jù)還未發(fā)送完成。
          2. channel 連接就緒」。
          3. 寫事件是可寫狀態(tài)」只要寫緩沖區(qū)未寫滿會一直產(chǎn)生「OP_WRITE」 事件,如果不寫數(shù)據(jù)或者寫滿時則需要取消 「OP_WRITE」 事件,防止產(chǎn)生不必要的資源消耗。
          4. 客戶端驗證沒有開啟」。

          當(dāng)滿足以上4個條件后就可以進(jìn)行寫操作了,接下來我們看看寫操作的過程。

          02.2.4 write()

          // 執(zhí)行寫操作 
          void write(KafkaChannel channel) throws IOException {
              // 1.獲取 channel 對應(yīng)的節(jié)點id    
              String nodeId = channel.id();
              // 2. 將保存在 send 上的數(shù)據(jù)真正發(fā)送出去,但是一次不一定能發(fā)送完,會返回已經(jīng)發(fā)出的字節(jié)數(shù)
              long bytesSent = channel.write();
              // 3. 判斷是否發(fā)送完成,未完成返回null,等待下次poll繼續(xù)發(fā)送
              Send send = channel.maybeCompleteSend();
              // 4. 說明已經(jīng)發(fā)出或者發(fā)送完成
              if (bytesSent > 0 || send != null) {
                  long currentTimeMs = time.milliseconds();
                  if (bytesSent > 0)
                      // 記錄發(fā)送字節(jié) Metrics 信息
                      this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
                  // 發(fā)送完成
                  if (send != null) {
                      // 將 send 添加到 completedSends
                      this.completedSends.add(send);
                      //  記錄發(fā)送完成 Metrics 信息
                      this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
                  }
              }
          }

          該方法主要用來真正執(zhí)行網(wǎng)絡(luò)寫操作的,大家知道在網(wǎng)絡(luò)編程過程中,不一定一次性可以發(fā)送完成,此時就需要判斷是否發(fā)送完成,如果未完成返回null,「等待下次輪詢 poll() 會繼續(xù)發(fā)送,并繼續(xù)關(guān)注這個 channel 的寫事件」,如果發(fā)送完成,「則返回 send,并取消 Selector 在這個 socketchannel 上 OP_WRITE 事件的關(guān)注」。這里調(diào)用了 KafkaChannel 類的 write() 進(jìn)行寫操作發(fā)送,并調(diào)用 maybeCompleteSend() 判斷是否發(fā)送完成,我們先來看下 write() 寫操作:

          02.2.6 KafkaChannel.write()

          public long write() throws IOException {
              // 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了
              if (send == null)
                  return 0;

              midWrite = true;
              // 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去
              return send.writeTo(transportLayer);
          }

          該方法主要用來把保存在 send 上的數(shù)據(jù)真正發(fā)送出去,調(diào)用 ByteBufferSend.writeTo 把數(shù)據(jù)真正發(fā)送出去,我們來看看 wirteTo() 方法:

          @Override
          // 將字節(jié)流數(shù)據(jù)寫入到channel中
          public long writeTo(GatheringByteChannel channel) throws IOException {
              // 1.調(diào)用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù)
              long written = channel.write(buffers);
              if (written < 0)
                  throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
              // 2.計算還剩多少字節(jié)沒有寫入傳輸層
              remaining -= written;
              // 每次發(fā)送 都檢查是否
              pending = TransportLayers.hasPendingWrites(channel);
              return written;
          }

          該方法主要用來把 buffers 數(shù)組寫入到 SocketChannel 里,因為在網(wǎng)絡(luò)編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調(diào)用java nio 底層 channel.write(buffers) 方法會返回「已經(jīng)寫入成功多少字節(jié)」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫入多少字節(jié)了。

          當(dāng)調(diào)用 write() 以及一系列底層方法進(jìn)行寫操作后,會返回已經(jīng)發(fā)出的字節(jié)數(shù),如果這次沒有發(fā)送完畢則返回 null,「等待下次輪詢 poll 繼續(xù)發(fā)送網(wǎng)絡(luò)寫操作,并繼續(xù)關(guān)注這個 channel 的寫事件」,所以需要判斷下本次是否發(fā)送完畢了,我們來看看:

          02.2.7 maybeCompleteSend()

          // 可能完成發(fā)送
          public Send maybeCompleteSend() {
              // send 不為空且已經(jīng)發(fā)送完畢
              if (send != null && send.completed()) {
                  midWrite = false;
                  // 當(dāng)寫數(shù)據(jù)完畢后,取消傳輸層對 OP_WRITE 事件的監(jiān)聽,完成一次寫操作
                  transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
                  // 將 send 賦值給結(jié)果集 result
                  Send result = send;
                  // 此時讀完后將 send 清空,以便下次寫
                  send = null;
                  // 最后返回結(jié)果集 result,完成一次寫操作
                  return result;
              }
              return null;
          }

          // PlaintextTransportLayer 類方法
          @Override
          public void removeInterestOps(int ops) {
              // 通過 key.interestOps() & ~ops 來刪除事件
              key.interestOps(key.interestOps() & ~ops);
          }

          // ByteBufferSend
          @Override
          public boolean completed() {
              return remaining <= 0 && !pending;
          }

          該方法主要用來判斷是否寫數(shù)據(jù)完畢了,而判斷的寫數(shù)據(jù)完畢的條件是 buffer 中 remaining 沒有剩余且 pending 為 false。如果發(fā)送完成,把發(fā)送完成的請求添加到發(fā)送完成的集合 completedSends 里。

          待消息請求發(fā)送完成后,又做了哪些工作呢?這里涉及到 NetworkClient 類的相關(guān)知識,這里簡單說明下,后續(xù)再剖析:

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

          private void handleCompletedSends(List<ClientResponse> responses, long now) {
              // if no response is expected then when the send is completed, return it
              // 上面發(fā)送完成將 send 添加到 completedSends 集合,然后遍歷這個集合
              for (Send send : this.selector.completedSends())     {
                  // 獲取 inFlightRequests 集合發(fā)往對應(yīng) Broker 的最后一個請求元素
                  InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
                  // 判斷是否期望進(jìn)行響應(yīng)
                  if (!request.expectResponse) {
                      // 如果不期望進(jìn)行響應(yīng)就刪除inFlightRequests集合發(fā)往對應(yīng) Broker 請求隊列的第一個元素
                      this.inFlightRequests.completeLastSent(send.destination());
                      // 把請求添加到 responses 集合里
                      responses.add(request.completed(null, now));
                  }
              }
          }

          從源碼可以看出會對「completedSends」集合和「inFlightRequests」集合是一個「互相協(xié)作」的關(guān)系。

          其中「completedSends」集合是指發(fā)送完成但還沒有返回的請求集合,而「inFlightRequests」集合則是保存了已經(jīng)發(fā)送出去但還沒有收到響應(yīng)結(jié)果的 Request 集合。其中「completedSends」的元素對應(yīng)著「inFlightRequests」集合對應(yīng)隊列的最后一個元素。

          到此發(fā)送消息流程剖析完畢,至于發(fā)送完成后續(xù)工作,我們待講解 Sender 和 NetWorkClient 的時候再詳細(xì)進(jìn)行剖析,接下來我們來看看接收響應(yīng)流程。

          03 接收響應(yīng)流程剖析


          在上面剖析 Selector.pollSelectionKeys() 時候,當(dāng)網(wǎng)絡(luò)讀事件就緒后會調(diào)用 attemptRead() 進(jìn)行嘗試網(wǎng)絡(luò)讀操作,我們來看看:

          03.1 讀取響應(yīng)結(jié)果

          03.1.1 attemptRead()

          private void attemptRead(KafkaChannel channel) throws IOException {
              // 獲取 channel 對應(yīng)的節(jié)點 id
              String nodeId = channel.id();
              // 將從傳輸層中讀取數(shù)據(jù)到NetworkReceive對象中
              long bytesReceived = channel.read();
              if (bytesReceived != 0) {
                  ...
                  // 判斷 NetworkReceive 對象是否已經(jīng)讀完了
                  NetworkReceive receive = channel.maybeCompleteReceive();
                  // 當(dāng)讀完后把這個 NetworkReceive 對象添加到已經(jīng)接收完畢網(wǎng)絡(luò)請求集合里
                  if (receive != null) {
                      addToCompletedReceives(channel, receive, currentTimeMs);
                  }
              }
              ...
          }

          // KafkaChannel 方法
          public long read() throws IOException {
              if (receive == null) {
                  // 初始化 NetworkReceive 對象
                  receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
              }
              // 嘗試把 channel 的數(shù)據(jù)讀到 NetworkReceive 對象中
              long bytesReceived = receive(this.receive);
              ...
              return bytesReceived;
          }

          該方法主要用來嘗試讀取數(shù)據(jù)并添加已經(jīng)接收完畢的集合中。我們看到會先調(diào)用 KafkaChannel.read() 方法進(jìn)行讀取,然后判斷是否讀完了,如果沒有讀完,下次輪詢時候接著讀取,如果讀完了就假如到請求讀完的集合 completedReceives 中

          我們來看下是如何判斷 NetworkReceive 對象是否已經(jīng)讀完了的:

          03.1.2 maybeCompleteReceive()

          // 判斷 NetworkReceive 對象是否已經(jīng)讀完了
          // 如果此時并沒有讀完一個完整的NetworkReceive對象,則下次觸發(fā)讀事件會繼續(xù)填充整個NetworkReceive對象,
          // 如果讀完一個完整的NetworkReceive對象則將其置空,下次觸發(fā)讀事件時會創(chuàng)建一個全新的NetworkReceive對象。
          public NetworkReceive maybeCompleteReceive() {
            if (receive != null && receive.complete()) {
                receive.payload().rewind();
                NetworkReceive result = receive;
                receive = null;
                return result;
            }
            return null;
          }
          // NetworkReceive
          public boolean complete() {
              return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
          }

          該方法主要用來判斷數(shù)據(jù)已經(jīng)讀取完畢了,而判斷是否讀完的條件是 NetworkReceive 里的 buffer 是否用完,包括上面說過的表示響應(yīng)消息頭 size ByteBuffer 和響應(yīng)消息體本身的 buffer ByteBuffer,這兩個都讀完才算真正讀完了。

          如果此時并沒有讀完一個完整的 NetworkReceive 對象,則下次觸發(fā)讀事件會繼續(xù)填充整個 NetworkReceive 對象,如果此時讀完一個完整的NetworkReceive 對象則將其置空,下次觸發(fā)讀事件時會創(chuàng)建一個全新的NetworkReceive 對象。

          03.2 解析響應(yīng)消息

          等讀取完一個完整響應(yīng)消息后,接下來要做哪些工作呢?那就是要解析這個響應(yīng)消息,我們來看看是如何實現(xiàn)的:

          github 源碼地址如下:

          https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

          private void handleCompletedReceives(List<ClientResponse> responses, long now) {
            // 當(dāng)讀完后把這個 NetworkReceive 對象添加到已經(jīng)接收完畢網(wǎng)絡(luò)請求集合里,然后遍歷這個集合
            for (NetworkReceive receive : this.selector.completedReceives()) {
                // 獲取發(fā)送請求的node id 
                String source = receive.source();
                // 從 InFlightRequest 集合取出對應(yīng)的元素并刪除
                InFlightRequest req = inFlightRequests.completeNext(source);
                // 解析該響應(yīng)
                Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                    throttleTimeSensor, now);
                ....
                // 添加響應(yīng)到響應(yīng)結(jié)果集合中
                responses.add(req.completed(response, now));
            }
          }

          該方法主要用來循環(huán)遍歷 completedReceives 集合做一些響應(yīng)處理工作,在文章開始的時候就簡單說過,收到響應(yīng)后會將其從「inFlightRequests」中刪除掉,然后去解析這個響應(yīng):

          private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,Sensor throttleTimeSensor, long now) {
              // 獲取響應(yīng)頭
              ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer,requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
              // 獲取響應(yīng)體
              Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
              // 對比響應(yīng)頭 correlationId 和響應(yīng)體的 correlationId 是否一致,否則拋異常
              correlate(requestHeader, responseHeader);
              ...
              return responseBody;
          }

          該方法主要用來解析響應(yīng)的,并判斷響應(yīng)頭跟響應(yīng)體的 correlationId 值是否一致,否則拋異常。

          此時只對響應(yīng)做了解析但并沒有對響應(yīng)進(jìn)行處理,而響應(yīng)處理是通過調(diào)用回調(diào)方法進(jìn)行處理的,我們來看下。

          03.3 處理回調(diào)

          private void completeResponses(List<ClientResponse> responses) {
              // 遍歷響應(yīng)結(jié)果集合
              for (ClientResponse response : responses) {
                  try {
                      response.onComplete();
                  } catch (Exception e) {
                      log.error("Uncaught error in request completion:", e);
                  }
              }
          }

          //ClientResponse 類
          public void onComplete() {
              if (callback != null)
                  callback.onComplete(this);
          }

          到此接收響應(yīng)消息流程剖析完畢。

          04 總結(jié)

          這里,我們一起來總結(jié)一下這篇文章的重點。

          1、帶你先整體的梳理了 Kafka 網(wǎng)絡(luò)層收發(fā)流程,主要分為「發(fā)送消息流程」和「接收響應(yīng)流程」。

          2、又帶你分別剖析了發(fā)送消息流程和接收響應(yīng)流程的源碼實現(xiàn)細(xì)節(jié)。

          源碼詳細(xì)分析請看上兩篇:

          圖解 Kafka 網(wǎng)絡(luò)層實現(xiàn)機(jī)制之上篇

          圖解 Kafka 網(wǎng)絡(luò)層實現(xiàn)機(jī)制之 Selector 多路復(fù)用器

          下篇我們來深度剖析「Kafka 客戶端內(nèi)存緩沖池機(jī)制實現(xiàn)原理」,大家期待,我們下期見。

          如另外歡迎加入我的技術(shù)交流群,關(guān)注本公眾號并添加我個人微信,邀您進(jìn)群。

          如果我的文章對你有所幫助,還請幫忙點贊、在看、轉(zhuǎn)發(fā)一下,非常感謝!

          堅持總結(jié), 持續(xù)輸出高質(zhì)量文章  關(guān)注我: 華仔聊技術(shù)




          瀏覽 61
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产三级韩国三级日本三级99 | 97性爱视频 | 大香蕉在线视频11 | 国产乱伦精品视频 | 大鸡巴干的不要不要的视频 |