圖解Kafka消息發(fā)送者核心參數(shù)與工作機制
點擊上方“中間件興趣圈”,選擇“設為星標”
掌握一到兩門java主流中間件,是敲開BAT等大廠必備的技能,送給大家一個Java中間件學習路線,助力大家實現(xiàn)職場的蛻變。java中間件進階路線(三本電子書、12個免費專欄)
本文將從Kafka Producer的配置屬性為突破口,結(jié)合源碼深入提煉出Kafka Producer的工作機制,方便大家更好使用Kafka Producer,并且胸有成竹的進行性能調(diào)優(yōu)。
將Kafka Producer相關(guān)的參數(shù)分成如下幾個類型:
常規(guī)參數(shù)
工作原理(性能相關(guān))參數(shù)(圖解)
本文會結(jié)合圖解方式,重點闡述與Kafka生產(chǎn)者運作機制密切相關(guān)的參數(shù)。
1、常規(guī)參數(shù)
為了更好的使用Kafka Producer,首先介紹一下幾個基本參數(shù)。
bootstrap.servers
配置Kafka broker的服務器地址列表,多個用英文逗號分開,可以不必寫全,Kafka內(nèi)部有自動感知Kafka broker的機制。client.dns.lookup
客戶端尋找bootstrap地址的方式,支持如下兩種方式:resolve_canonical_bootstrap_servers_only
這種方式,會依據(jù)bootstrap.servers提供的主機名(hostname),根據(jù)主機上的名稱服務返回其IP地址的數(shù)組(InetAddress.getAllByName),然后依次獲取inetAddress.getCanonicalHostName(),再建立tcp連接。
一個主機可配置多個網(wǎng)卡,如果啟用該功能,應該可以有效利用多網(wǎng)卡的優(yōu)勢,降低Broker的網(wǎng)絡端負載壓力。use_all_dns_ips
這種方式會直接使用bootstrap.servers中提供的hostname、port創(chuàng)建tcp連接,默認選項。compression.type
消息的壓縮算法,目前可選值:none、gzip、snappy、lz4、zstd,默認不壓縮,建議與Kafka服務器配置的一樣,當然Kafka服務端可以配置的壓縮類型為 producer,即采用與發(fā)送方配置的壓縮類型。發(fā)送方與Broker 服務器采用相同的壓縮類型,可有效避免在Broker服務端進行消息的壓縮與解壓縮,大大降低Broker的CPU使用壓力。client.id
客戶端ID,如果不設置默認為producer-遞增,強烈建議設置該值,盡量包含ip,port,pid。send.buffer.bytes
網(wǎng)絡通道(TCP)的發(fā)送緩存區(qū)大小,默認為128K。receive.buffer.bytes
網(wǎng)絡通道(TCP)的接收緩存區(qū)大小,默認為32K。reconnect.backoff.ms
重新建立鏈接的等待時長,默認為50ms,屬于底層網(wǎng)絡參數(shù),基本無需關(guān)注。reconnect.backoff.max.ms
重新建立鏈接的最大等待時長,默認為1s,連續(xù)兩次對同一個連接建立重連,等待時間會在reconnect.backoff.ms的初始值上成指數(shù)級遞增,但超過max后,將不再指數(shù)級遞增。key.serializer
消息key的序列化策略,為org.apache.kafka.common.serialization接口的實現(xiàn)類。value.serializer
消息體的序列化策略partitioner.class
消息發(fā)送隊列負載算法,其默 DefaultPartitioner,路由算法如下:如果指定了 key ,則使用 key 的 hashcode 與分區(qū)數(shù)取模。
如果未指定 key,則輪詢所有的分區(qū)。
interceptor.classes
攔截器列表,kafka運行在消息真正發(fā)送到broker之前對消息進行攔截加工。enable.idempotence
是否開啟發(fā)送端的冪等,這個機制后續(xù)會重點剖析其實現(xiàn)原理,默認為false。transaction.timeout.ms
事務協(xié)調(diào)器等待客戶端的事務狀態(tài)反饋的最大超時時間,默認為60s。transactional.id
事務id,用于在一個事務中唯一標識一個客戶端。
2、工作原理相關(guān)參數(shù)
2.1 核心參數(shù)一覽
工作機制相關(guān)參數(shù),涉及到消息發(fā)送是如何工作的,本節(jié)首先將羅列參數(shù),做簡單說明,然后再給出運作圖,進一步闡述其工作機制。
buffer.memory
用于設置一個生產(chǎn)者(KafkaProducer)中緩存池的內(nèi)存大小,默認為32M。max.block.ms
當消息發(fā)送者申請空閑內(nèi)存時,如果可用內(nèi)存不足的等待時長,默認為60s,如果在指定時間內(nèi)未申請到內(nèi)存,消息發(fā)送端會直接報TimeoutException,這個時間包含了發(fā)送端用于查找元信息的時間。retries
重試次數(shù),Kafka Sender線程從緩存區(qū)嘗試發(fā)送到Broker端的重試次數(shù),默認為Integer.MAX_VALUE,為了避免無限重試,只針對可恢復的異常,例如Leader選舉中這種異常就是可恢復的,重試最終是能解決問題的。acks
用來定義消息“已提交”的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值如下:0
表示生產(chǎn)者不關(guān)心該條消息在 broker 端的處理結(jié)果,只要調(diào)用 KafkaProducer 的 send 方法返回后即認為成功,顯然這種方式是最不安全的,因為 Broker 端可能壓根都沒有收到該條消息或存儲失敗。all 或 -1
表示消息不僅需要 Leader 節(jié)點已存儲該消息,并且要求其副本(準確的來說是 ISR 中的節(jié)點)全部存儲才認為已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,當然性能也最低。1
表示消息只需要寫入 Leader 節(jié)點后就可以向客戶端返回提交成功。batch.size
在消息發(fā)送端Kafka引入了批的概念,發(fā)送到服務端的消息通常不是一條一條發(fā)送,而是一批一批發(fā)送,該值用于設置每一個批次的內(nèi)存大小,一個批次對應源碼層級為ProducerBatch對象,默認為16K。linger.ms
該參數(shù)與batch.size配合使用。Kafka希望一個批次一個批次去發(fā)送到Broker,應用程序往KafkaProducer中發(fā)送一條消息,首先會進入到內(nèi)部緩沖區(qū),具體是會進入到某一個批次中(ProducerBatch),等待該批次堆滿后一次發(fā)送到Broker,這樣能提高消息的吞吐量,但其消息發(fā)送的延遲也會相應提高,試想一下,如果在某一個時間端,應用端發(fā)送到broker的消息太少,不足以填滿一個批次,那豈不是消息一直無法發(fā)送到Broker端嗎?為了解決該問題,linger.ms參數(shù)應運而生。它的作用是控制在緩存區(qū)中未積滿時來控制消息發(fā)送線程的行為。如果linger.ms 設置為 0表示立即發(fā)送,如果設置為大于0,則消息發(fā)送線程會等待這個值后才會向broker發(fā)送。有點類似于 TCP 領(lǐng)域的 Nagle 算法。
delivery.timeout.ms
消息在客戶端緩存中的過期時間,在Kafka的消息發(fā)送模型中,消息先進入到消息發(fā)送端的雙端緩存隊列中,然后單獨一個線程將緩存區(qū)中的消息發(fā)送到Broker,該參數(shù)控制在雙端隊列中的過期時間,默認為120s,從進入雙端隊列開始計時,超過該值后會返回超時異常(TimeoutException)。request.timeout.ms
請求的超時時間,主要是Kafka消息發(fā)送線程(Sender)與Broker端的網(wǎng)絡通訊的請求超時時間。max.request.size
Send線程一次發(fā)送的最大字節(jié)數(shù)量,也就是Send線程向服務端一次消息發(fā)送請求的最大傳輸數(shù)據(jù),默認為1M。max.in.flight.requests.per.connection
設置每一個客戶端與服務端連接,在應用層一個通道的積壓消息數(shù)量,默認為5,有點類似Netty用高低水位線控制發(fā)送緩沖區(qū)中積壓的多少,避免內(nèi)存溢出。
2.2 圖解工作原理
上面的核心參數(shù)在表述上可能不夠直觀,接下來我想簡單通過兩張圖闡述一下Kafka消息發(fā)送相關(guān)的核心原理。
首先,我們來看一下消息發(fā)送者相關(guān)的數(shù)據(jù)結(jié)構(gòu):

Kafka的每一個消息發(fā)送者,也就是KafkaProducer對象內(nèi)部會有一塊緩存區(qū),其總大小由buffer.memory指定,默認為32M,但內(nèi)存的組織會按照topic+parition構(gòu)建雙端隊列,隊列中的每一個元素為一個ProducerBatch對象,表示一個消息發(fā)送批次,但發(fā)送線程將消息發(fā)送到Broker端時,一次可以包含多個批次。一次允許發(fā)送的消息總大小受max.request.size控制,默認為1M。
在了解了核心數(shù)據(jù)結(jié)構(gòu)后,我們再看一下各個核心參數(shù)在消息發(fā)送的各個階段是如何工作的。
2.3 性能優(yōu)化
從Kafka Producer 的工作原理來看,在客戶端所謂的性能優(yōu)化,其實就是延遲、吞吐率、數(shù)據(jù)完整性的一個權(quán)衡。在具體的實踐中通常可以調(diào)整的參數(shù)主要如下:
acks 這個只能是根據(jù)業(yè)務的特點,對數(shù)據(jù)丟失的容忍度,通常該參數(shù)在實踐過程中遇到性能瓶頸后,調(diào)整該參數(shù)的可能性幾乎沒有,因為需要犧牲數(shù)據(jù)的完整性,此舉并不是一個好的方案。
batch.size 與 linger.ms
通??梢赃m當修改batch.size與linger.ms的值,特別是linger.ms值,犧牲一定的延時,方便更多數(shù)據(jù)進入到Batch,從而提高Sender線程一次發(fā)送的數(shù)據(jù)大小,提高帶寬,顯著提高吞吐率,但犧牲延時。當然如果是想提高響應延遲,則采取的手段則恰恰相反。buffer.memory、max.request.size
如果需要進一步提高吞吐量,可以適當提高buffer.memory的大小,讓客戶端能緩存更多數(shù)據(jù),并且調(diào)高max.request.size,進一步提高單次消息發(fā)送的消息量。
從工作原理圖中要得到上述的方式并不難,但我們有沒有辦法正確的評估到底是需要調(diào)整batch.size,還是要調(diào)整uffer.memory呢?
有沒有科學的指導,如何判斷一個客戶端當前的瓶頸到底在什么地方,如何針對性的進行調(diào)優(yōu)呢?請持續(xù)關(guān)注本公眾號,該部分將在下一篇文章中與大家詳細剖析,我們下期再會。
好了,本文就介紹到這里了,三連(關(guān)注、點贊、留言)是對我最大的鼓勵
歡迎關(guān)注微信公眾號:互聯(lián)網(wǎng)全棧架構(gòu),收取更多有價值的信息。

