<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊 Kafka:Producer 源碼解析

          共 6171字,需瀏覽 13分鐘

           ·

          2021-09-10 19:59

          2143054fdf401d87df50a1ee66cd6a4d.webp

          點(diǎn)擊上方老周聊架構(gòu)關(guān)注我


          一、前言

          前面幾篇我們講了關(guān)于 Kafka 的基礎(chǔ)架構(gòu)以及搭建,從這篇開(kāi)始我們就來(lái)源碼分析一波。我們這用的 Kafka 版本是 2.7.0,其 Client 端是由 Java 實(shí)現(xiàn),Server 端是由 Scala 來(lái)實(shí)現(xiàn)的,在使用 Kafka 時(shí),Client 是用戶最先接觸到的部分,因此,我們從 Client 端開(kāi)始,會(huì)先從 Producer 端開(kāi)始,今天我們就來(lái)對(duì) Producer 源碼解析一番。

          二、Producer 使用

          首先我們先通過(guò)一段代碼來(lái)展示 KafkaProducer 的使用方法。在下面的示例中,我們使用 KafkaProducer 實(shí)現(xiàn)向 Kafka 發(fā)送消息的功能。在示例程序中,首先將 KafkaProduce 使用的配置寫入到 Properties 中,每項(xiàng)配置的具體含義在注釋中進(jìn)行解釋。之后以此 Properties 對(duì)象為參數(shù)構(gòu)造 KafkaProducer 對(duì)象,最后通過(guò) send 方法完成發(fā)送,代碼中包含同步發(fā)送、異步發(fā)送兩種情況。

          7bfabe4bcc34bf3e75ebf1d74fbf0e5d.webp
          從上面的代碼可以看出 Kafka 為用戶提供了非常簡(jiǎn)潔方便的 API,在使用時(shí),只需要如下兩步:
          • 初始化 KafkaProducer 實(shí)例

          • 調(diào)用 send 接口發(fā)送數(shù)據(jù)

          本文主要是圍繞著初始化 KafkaProducer 實(shí)例與如何實(shí)現(xiàn) send 接口發(fā)送數(shù)據(jù)而展開(kāi)的。

          三、KafkaProducer 實(shí)例化

          了解了 KafkaProducer 的基本使用,然后我們來(lái)深入了解下方法核心邏輯:

          public?KafkaProducer(Properties?properties)?{
          ????this(Utils.propsToMap(properties),?(Serializer)null,?(Serializer)null,?(ProducerMetadata)null,?(KafkaClient)null,?(ProducerInterceptors)null,?Time.SYSTEM);
          }
          c00fe3d2115d9cd3b22f407992dbfd2b.webp

          四、消息發(fā)送過(guò)程

          用戶是直接使用 producer.send() 發(fā)送的數(shù)據(jù),先看一下 send() 接口的實(shí)現(xiàn)

          //?異步向一個(gè)?topic?發(fā)送數(shù)據(jù)
          public?Future<RecordMetadata>?send(ProducerRecord<K,?V>?record)?{
          ????return?this.send(record,?(Callback)null);
          }

          //?向?topic?異步地發(fā)送數(shù)據(jù),當(dāng)發(fā)送確認(rèn)后喚起回調(diào)函數(shù)
          public?Future<RecordMetadata>?send(ProducerRecord<K,?V>?record,?Callback?callback)?{
          ????ProducerRecord<K,?V>?interceptedRecord?=?this.interceptors.onSend(record);
          ????return?this.doSend(interceptedRecord,?callback);
          }

          數(shù)據(jù)發(fā)送的最終實(shí)現(xiàn)還是調(diào)用了 Producer 的 doSend() 接口。

          4.1 攔截器

          首先方法會(huì)先進(jìn)入攔截器集合 ProducerInterceptors , onSend 方法是遍歷攔截器 onSend 方 法,攔截器的目的是將數(shù)據(jù)處理加工, Kafka 本身并沒(méi)有給出默認(rèn)的攔截器的實(shí)現(xiàn)。如果需要使用攔截器功能,必須自己實(shí)現(xiàn)接口。

          4.1.1 攔截器代碼

          1970175d586e56276629745d7787f3eb.webp
          4.1.2 攔截器核心邏輯
          5ecb83614333940196936fe659a5ffa8.webp
          ProducerInterceptor 接口包括三個(gè)方法:
          • onSend(ProducerRecordvar1),>:該方法封裝進(jìn) KafkaProducer.send 方法中,即它運(yùn)行在用戶主線程中的。確保在消息被序列化以計(jì)算分區(qū)前調(diào)用該方法。用戶可以在該方法中對(duì)消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區(qū),否則會(huì)影響目標(biāo)分區(qū)的計(jì)算。

          • onAcknowledgement(RecordMetadata var1, Exception var2):該方法會(huì)在消息被應(yīng)答之前或消息發(fā)送失敗時(shí)調(diào)用,并且通常都是在 producer 回調(diào)邏輯觸發(fā)之前。onAcknowledgement 運(yùn)行在 producer 的 IO 線程中,因此不要在該方法中放入很重的邏輯,否則會(huì)拖慢 producer 的消息發(fā)送效率。

          • close():關(guān)閉 interceptor,主要用于執(zhí)行一些資源清理工作。

          攔截器可能被運(yùn)行在多個(gè)線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全。另外倘若指定了多個(gè) interceptor,則 producer 將按照指定順序調(diào)用它們,并僅僅是捕獲每個(gè) interceptor 可能拋出的異常記錄到錯(cuò)誤日志中而非在向上傳遞。

          4.2 Producer 的 doSend 實(shí)現(xiàn)

          下面是 doSend() 的具體實(shí)現(xiàn):

          ee355e3ac7f41db4f7fedb1337be8815.webp
          在 doSend() 方法的實(shí)現(xiàn)上,一條 Record 數(shù)據(jù)的發(fā)送,主要分為以下五步:
          • 確認(rèn)數(shù)據(jù)要發(fā)送到的 topic 的 metadata 是可用的(如果該 partition 的 leader 存在則是可用的,如果開(kāi)啟權(quán)限時(shí),client 有相應(yīng)的權(quán)限),如果沒(méi)有 topic 的 metadata 信息,就需要獲取相應(yīng)的 metadata;

          • 序列化 record 的 key 和 value;

          • 獲取該 record 要發(fā)送到的 partition(可以指定,也可以根據(jù)算法計(jì)算);

          • 向 accumulator 中追加 record 數(shù)據(jù),數(shù)據(jù)會(huì)先進(jìn)行緩存;

          • 如果追加完數(shù)據(jù)后,對(duì)應(yīng)的 RecordBatch 已經(jīng)達(dá)到了 batch.size 的大小(或者 batch 的剩余空間不足以添加下一條 Record),則喚醒 sender 線程發(fā)送數(shù)據(jù)。

          數(shù)據(jù)的發(fā)送過(guò)程,可以簡(jiǎn)單總結(jié)為以上五點(diǎn),下面會(huì)這幾部分的具體實(shí)現(xiàn)進(jìn)行詳細(xì)分析。

          五、消息發(fā)送過(guò)程

          5.1 獲取 topic 的 metadata 信息

          Producer 通過(guò) waitOnMetadata() 方法來(lái)獲取對(duì)應(yīng) topic 的 metadata 信息,這塊內(nèi)容我下一篇再來(lái)講。

          5.2 key 和 value 的序列化

          Producer 端對(duì) record 的 key 和 value 值進(jìn)行序列化操作,在 Consumer 端再進(jìn)行相應(yīng)的反序列化,Kafka 內(nèi)部提供的序列化和反序列化算法如下圖所示:

          a8724edc0c50dd98a940209b358c77cc.webp
          當(dāng)然我們也是可以自定義序列化的具體實(shí)現(xiàn),不過(guò)一般情況下,Kafka 內(nèi)部提供的這些方法已經(jīng)足夠使用。

          5.3 獲取該 record 要發(fā)送到的 partition

          獲取 partition 值,具體分為下面三種情況:

          • 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;

          • 沒(méi)有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值;

          • 既沒(méi)有 partition 值又沒(méi)有 key 值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增),將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說(shuō)的 round-robin 算法。

          具體實(shí)現(xiàn)如下:

          //?當(dāng)?record?中有?partition?值時(shí),直接返回,沒(méi)有的情況下調(diào)用?partitioner?的類的?partition?方法去計(jì)算(KafkaProducer.class)
          private?int?partition(ProducerRecord<K,?V>?record,?byte[]?serializedKey,?byte[]?serializedValue,?Cluster?cluster)?{
          ????Integer?partition?=?record.partition();
          ????return?partition?!=?null???partition?:?this.partitioner.partition(record.topic(),?record.key(),?serializedKey,?record.value(),?serializedValue,?cluster);
          }

          Producer 默認(rèn)使用的 partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,用戶也可以自定義 partition 的策略,下面是默認(rèn)分區(qū)策略具體實(shí)現(xiàn):

          public?int?partition(String?topic,?Object?key,?byte[]?keyBytes,?Object?value,?byte[]?valueBytes,?Cluster?cluster)?{
          ????return?this.partition(topic,?key,?keyBytes,?value,?valueBytes,?cluster,?cluster.partitionsForTopic(topic).size());
          }

          public?int?partition(String?topic,?Object?key,?byte[]?keyBytes,?Object?value,?byte[]?valueBytes,?Cluster?cluster,?int?numPartitions)?{
          ????return?keyBytes?==?null???this.stickyPartitionCache.partition(topic,?cluster)?:?Utils.toPositive(Utils.murmur2(keyBytes))?%?numPartitions;
          }
          d46f699b8ebc6ed838cd5a0a275c68c5.webp
          上面這個(gè)默認(rèn)算法核心就是粘著分區(qū)緩存

          5.4 向 RecordAccmulator 中追加 record 數(shù)據(jù)

          我們講 RecordAccumulator 之前先看這張圖,這樣的話會(huì)對(duì)整個(gè)發(fā)送流程有個(gè)大局觀。

          be26a3cacb86bb034d90ac73bdd9d896.webp

          RecordAccmulator 承擔(dān)了緩沖區(qū)的角色。默認(rèn)是 32 MB。

          在 Kafka Producer 中,消息不是一條一條發(fā)給 broker 的,而是多條消息組成一個(gè) ProducerBatch,然后由 Sender 一次性發(fā)出去,這里的 batch.size 并不是消息的條數(shù)(湊滿多少條即發(fā)送),而是一個(gè)大小。默認(rèn)是 16 KB,可以根據(jù)具體情況來(lái)進(jìn)行優(yōu)化。

          在 RecordAccumulator 中,最核心的參數(shù)就是:

          private?final?ConcurrentMap<TopicPartition,?Deque<ProducerBatch>>?batches;

          它是一個(gè) ConcurrentMap,key 是 TopicPartition 類,代表一個(gè) topic 的一個(gè) partition。value 是一個(gè)包含 ProducerBatch 的雙端隊(duì)列。等待 Sender 線程發(fā)送給 broker。畫(huà)張圖來(lái)看下:

          ef728b1851f5cb7a5825ccc97fd9419b.webp
          4579e539f02828d858f1a83d6cbd3f46.webp
          上面的代碼不知道大家有沒(méi)有疑問(wèn)?分配內(nèi)存的代碼為啥不在 synchronized 同步塊中分配?導(dǎo)致下面的 synchronized 同步塊中還要 tryAppend 一下。

          因?yàn)檫@時(shí)候可能其他線程已經(jīng)創(chuàng)建好 RecordBatch 了,造成多余的內(nèi)存申請(qǐng)。

          如果把分配內(nèi)存放在 synchronized 同步塊會(huì)有什么問(wèn)題?

          內(nèi)存申請(qǐng)不到線程會(huì)一直等待,如果放在同步塊中會(huì)造成一直不釋放 Deque 隊(duì)列的鎖,那其他線程將無(wú)法對(duì) Deque 隊(duì)列進(jìn)行線程安全的同步操作。

          再跟下 tryAppend() 方法,這就比較簡(jiǎn)單了。

          e3026e053189873181a9b65393b6c076.webp
          以上代碼見(jiàn)圖解:


          6aeae62bddf59b55c305bfb6e7d50e82.webp
          5.5 喚醒 sender 線程發(fā)送 RecordBatch

          當(dāng) record 寫入成功后,如果發(fā)現(xiàn) RecordBatch 已滿足發(fā)送的條件(通常是 queue 中有多個(gè) batch,那么最先添加的那些 batch 肯定是可以發(fā)送了),那么就會(huì)喚醒 sender 線程,發(fā)送 RecordBatch。

          sender 線程對(duì) RecordBatch 的處理是在 run() 方法中進(jìn)行的,該方法具體實(shí)現(xiàn)如下:

          6ac7317b73899f14211d96fde27b19c9.webp

          be4b0c6f582a9b224dc81aba9a176159.webp

          其中比較核心的方法是 run() 方法中的 org.apache.kafka.clients.producer.internals.Sender#sendProducerData

          其中 pollTimeout 意思是最長(zhǎng)阻塞到至少有一個(gè)通道在你注冊(cè)的事件就緒了。返回 0 則表示走起發(fā)車了。

          62090b502c85c484e7c10a13434fd5cd.webp
          我們繼續(xù)跟下:org.apache.kafka.clients.producer.internals.RecordAccumulator#ready
          5e4de533a380bebe88b6965d016c4317.webp
          最后再來(lái)看下里面這個(gè)方法 org.apache.kafka.clients.producer.internals.RecordAccumulator#drain,從accumulator 緩沖區(qū)獲取要發(fā)送的數(shù)據(jù),最大一次性發(fā) max.request.size 大小的數(shù)據(jù)。
          2101019baa7c3e5eed562c7107120a6e.webp

          daef410f07c2bb928cb991fcaf5ddb77.webp

          六、總結(jié)

          最后為了讓你對(duì) Kafka Producer 有個(gè)宏觀的架構(gòu)理解,請(qǐng)看下圖:

          acff773065380a475058bfa8205aa869.webp

          簡(jiǎn)要說(shuō)明:
          • new KafkaProducer() 后創(chuàng)建一個(gè)后臺(tái)線程 KafkaThread (實(shí)際運(yùn)行線程是 Sender,KafkaThread 是對(duì) Sender 的封裝) 掃描 RecordAccumulator 中是否有消息。

          • 調(diào)用 KafkaProducer.send() 發(fā)送消息,實(shí)際是將消息保存到 RecordAccumulator 中,實(shí)際上就是保存到一個(gè) Map 中 (ConcurrentMap<topicpartition, deque

          • 后臺(tái)的獨(dú)立線程掃描到 RecordAccumulator 中有消息后,會(huì)將消息發(fā)送到 Kafka 集群中 (不是一有消息就發(fā)送,而是要看消息是否 ready)

          • 如果發(fā)送成功 (消息成功寫入 Kafka), 就返回一個(gè) RecordMetaData 對(duì)象,它包括了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。

          • 如果寫入失敗,就會(huì)返回一個(gè)錯(cuò)誤,生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息 (如果允許的話,此時(shí)會(huì)將消息在保存到 RecordAccumulator 中),幾次之后如果還是失敗就返回錯(cuò)誤消息。

          好了,本文對(duì) Kafka Producer 源碼進(jìn)行了解析,下一篇文章將會(huì)詳細(xì)介紹 metadata 的內(nèi)容以及在 Producer 端 metadata 的更新機(jī)制。敬請(qǐng)期待~



          歡迎大家關(guān)注我的公眾號(hào)【老周聊架構(gòu)】,Java后端主流技術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。

          喜歡的話,點(diǎn)贊、再看、分享三連。

          3cabcbe7e1e8dbe9d5c21ab00ea8d408.webp

          點(diǎn)個(gè)在看你最好看

          瀏覽 54
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美日韩国产无码 | 日韩性爱视频 | 天天爱,天天操 | 摸人妻精品导航 | 欧美爆操 |