Kafka 萬億級(jí)消息實(shí)戰(zhàn)
作者:vivo互聯(lián)網(wǎng)服務(wù)器團(tuán)隊(duì)-Yang Yijun
一、Kafka應(yīng)用



//未指定遷移目錄的遷移計(jì)劃{"version":1,"partitions":[{"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},{"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},{"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}]}
//指定遷移目錄的遷移計(jì)劃{"version":1,"partitions":[{"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},{"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},{"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}]}


/config/users/<user>/clients/<client-id> //根據(jù)用戶和客戶端ID組合限流/config/users/<user>/clients/<default>/config/users/<user>//根據(jù)用戶限流 這種限流方式是我們最常用的方式/config/users/<default>/clients/<client-id>/config/users/<default>/clients/<default>/config/users/<default>/config/clients/<client-id>/config/clients/<default>
(1)消費(fèi)流量指標(biāo):ObjectName:kafka.server:type=Fetch,user=acl認(rèn)證用戶名稱 屬性:byte-rate(用戶在當(dāng)前broker的出流量)、throttle-time(用戶在當(dāng)前broker的出流量被限制時(shí)間)(2)生產(chǎn)流量指標(biāo):ObjectName:kafka.server:type=Produce,user=acl認(rèn)證用戶名稱 屬性:byte-rate(用戶在當(dāng)前broker的入流量)、throttle-time(用戶在當(dāng)前broker的入流量被限制時(shí)間)


//副本同步限流配置共涉及以下4個(gè)參數(shù)leader.replication.throttled.ratefollower.replication.throttled.rateleader.replication.throttled.replicasfollower.replication.throttled.replicas
(1)副本同步出流量指標(biāo):ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec(2)副本同步入流量指標(biāo):ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec


Kafka Manager;
Kafka Eagle;
Kafka Monitor;
KafkaOffsetMonitor;









Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//ClientMetricsReporter類實(shí)現(xiàn)org.apache.kafka.common.metrics.MetricsReporter接口props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());...

維度:用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、brokerIP; 指標(biāo):連接數(shù)、IO等待時(shí)間、生產(chǎn)流量大小、生產(chǎn)記錄數(shù)、請(qǐng)求次數(shù)、請(qǐng)求延時(shí)、發(fā)送錯(cuò)誤/重試次數(shù)等。
維度:用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、消費(fèi)組、brokerIP、topic分區(qū); 指標(biāo):連接數(shù)、io等待時(shí)間、消費(fèi)流量大小、消費(fèi)記錄數(shù)、消費(fèi)延時(shí)、topic分區(qū)消費(fèi)延遲記錄等。
1) Zookeeper進(jìn)程監(jiān)控;
2) Zookeeper的leader切換監(jiān)控;
3) Zookeeper服務(wù)的錯(cuò)誤日志監(jiān)控;


必須保證topic分區(qū)leader與follower輪詢的分布在資源組內(nèi)所有broker上,讓流量分布更加均衡,同時(shí)需要考慮相同分區(qū)不同副本跨機(jī)架分布以提高容災(zāi)能力;
當(dāng)topic分區(qū)leader個(gè)數(shù)除以資源組節(jié)點(diǎn)個(gè)數(shù)有余數(shù)時(shí),需要把余數(shù)分區(qū)leader優(yōu)先考慮放入流量較低的broker。
擴(kuò)容智能評(píng)估:根據(jù)集群負(fù)載,把是否需要擴(kuò)容評(píng)估程序化、智能化;
智能擴(kuò)容:當(dāng)評(píng)估需要擴(kuò)容后,把擴(kuò)容流程以及流量均衡平臺(tái)化。
一些老化的服務(wù)器需要下線,實(shí)現(xiàn)節(jié)點(diǎn)下線平臺(tái)化;
服務(wù)器故障,broker故障無法恢復(fù),我們需要下線故障服務(wù)器,實(shí)現(xiàn)節(jié)點(diǎn)下線平臺(tái)化;
有更優(yōu)配置的服務(wù)器替換已有broker節(jié)點(diǎn),實(shí)現(xiàn)下線節(jié)點(diǎn)平臺(tái)化。





1)生成副本遷移計(jì)劃以及執(zhí)行遷移任務(wù)平臺(tái)化、自動(dòng)化、智能化; 2)執(zhí)行均衡后broker間流量比較均勻,且單個(gè)topic分區(qū)均勻分布在所有broker節(jié)點(diǎn)上;
3)執(zhí)行均衡后broker內(nèi)部多塊磁盤間流量比較均衡;
2. Introduction to Kafka Cruise Control
3. Cloudera Cruise Control REST API Reference

1)選擇核心指標(biāo)作為生成遷移計(jì)劃的依據(jù),比如出流量、入流量、機(jī)架、單topic分區(qū)分散性等;
2)優(yōu)化用來生成遷移計(jì)劃的指標(biāo)樣本,比如過濾流量突增/突降/掉零等異常樣本;
3)各資源組的遷移計(jì)劃需要使用的樣本全部為資源組內(nèi)部樣本,不涉及其他資源組,無交叉;
4)治理單分區(qū)過大topic,讓topic分區(qū)分布更分散,流量不集中在部分broker,讓topic單分區(qū)數(shù)據(jù)量更小,這樣可以減少遷移的數(shù)據(jù)量,提升遷移速度;
5)已經(jīng)均勻分散在資源組內(nèi)的topic,加入遷移黑名單,不做遷移,這樣可以減少遷移的數(shù)據(jù)量,提升遷移速度;
6)做topic治理,排除長期無流量topic對(duì)均衡的干擾;
7)新建topic或者topic分區(qū)擴(kuò)容時(shí),應(yīng)讓所有分區(qū)輪詢分布在所有broker節(jié)點(diǎn),輪詢后余數(shù)分區(qū)優(yōu)先分布流量較低的broker;
8)擴(kuò)容broker節(jié)點(diǎn)后開啟負(fù)載均衡時(shí),優(yōu)先把同一broker分配了同一大流量(流量大而不是存儲(chǔ)空間大,這里可以認(rèn)為是每秒的吞吐量)topic多個(gè)分區(qū)leader的,遷移一部分到新broker節(jié)點(diǎn);
9)提交遷移任務(wù)時(shí),同一批遷移計(jì)劃中的分區(qū)數(shù)據(jù)大小偏差應(yīng)該盡可能小,這樣可以避免遷移任務(wù)中小分區(qū)遷移完成后長時(shí)間等待大分區(qū)的遷移,造成任務(wù)傾斜;
(1)生產(chǎn)者權(quán)限認(rèn)證;
(2)消費(fèi)者權(quán)限認(rèn)證;
(3)指定數(shù)據(jù)目錄遷移安全認(rèn)證;
GitHub地址:https://github.com 精確KIP地址 :https://cwiki.apache.org
num.network.threads創(chuàng)建Processor處理網(wǎng)絡(luò)請(qǐng)求線程個(gè)數(shù),建議設(shè)置為broker當(dāng)CPU核心數(shù)*2,這個(gè)值太低經(jīng)常出現(xiàn)網(wǎng)絡(luò)空閑太低而缺失副本。num.io.threads創(chuàng)建KafkaRequestHandler處理具體請(qǐng)求線程個(gè)數(shù),建議設(shè)置為broker磁盤個(gè)數(shù)*2num.replica.fetchers建議設(shè)置為CPU核心數(shù)/4,適當(dāng)提高可以提升CPU利用率及follower同步leader數(shù)據(jù)當(dāng)并行度。compression.type建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時(shí)可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量。queued.max.requests如果是生產(chǎn)環(huán)境,建議配置最少500以上,默認(rèn)為500。log.flush.scheduler.interval.mslog.flush.interval.mslog.flush.interval.messages這幾個(gè)參數(shù)表示日志數(shù)據(jù)刷新到磁盤的策略,應(yīng)該保持默認(rèn)配置,刷盤策略讓操作系統(tǒng)去完成,由操作系統(tǒng)來決定什么時(shí)候把數(shù)據(jù)刷盤;如果設(shè)置來這個(gè)參數(shù),可能對(duì)吞吐量影響非常大;auto.leader.rebalance.enable表示是否開啟leader自動(dòng)負(fù)載均衡,默認(rèn)true;我們應(yīng)該把這個(gè)參數(shù)設(shè)置為false,因?yàn)樽詣?dòng)負(fù)載均衡不可控,可能影響集群性能和穩(wěn)定;
linger.ms#客戶端生產(chǎn)消息等待多久時(shí)間才發(fā)送到服務(wù)端,單位:毫秒。和batch.size參數(shù)配合使用;適當(dāng)調(diào)大可以提升吞吐量,但是如果客戶端如果down機(jī)有丟失數(shù)據(jù)風(fēng)險(xiǎn);batch.size#客戶端發(fā)送到服務(wù)端消息批次大小,和linger.ms參數(shù)配合使用;適當(dāng)調(diào)大可以提升吞吐量,但是如果客戶端如果down機(jī)有丟失數(shù)據(jù)風(fēng)險(xiǎn);compression.type#建議采用lz4壓縮類型,具備較高的壓縮比及吞吐量;由于Kafka對(duì)CPU的要求并不高,所以,可以通過壓縮,充分利用CPU資源以提升網(wǎng)絡(luò)吞吐量;buffer.memory#客戶端緩沖區(qū)大小,如果topic比較大,且內(nèi)存比較充足,可以適當(dāng)調(diào)高這個(gè)參數(shù),默認(rèn)只為33554432(32MB)retries#生產(chǎn)失敗后的重試次數(shù),默認(rèn)0,可以適當(dāng)增加。當(dāng)重試超過一定次數(shù)后,如果業(yè)務(wù)要求數(shù)據(jù)準(zhǔn)確性較高,建議做容錯(cuò)處理。retry.backoff.ms#生產(chǎn)失敗后,重試時(shí)間間隔,默認(rèn)100ms,建議不要設(shè)置太大或者太小。
1)topic分區(qū)集中落在某幾個(gè)broker節(jié)點(diǎn)上,導(dǎo)致流量副本失衡;
2)導(dǎo)致broker節(jié)點(diǎn)內(nèi)部某幾塊磁盤讀寫超負(fù)載,存儲(chǔ)被寫爆;
當(dāng)topic數(shù)據(jù)量非常大時(shí),建議一個(gè)分區(qū)開啟一個(gè)線程去消費(fèi); 對(duì)topic消費(fèi)延時(shí)添加監(jiān)控告警,及時(shí)發(fā)現(xiàn)處理; 當(dāng)topic數(shù)據(jù)可以丟棄時(shí),遇到超大延時(shí),比如單個(gè)分區(qū)延遲記錄超過千萬甚至數(shù)億,那么可以重置topic的消費(fèi)點(diǎn)位進(jìn)行緊急處理;【此方案一般在極端場景才使用】 避免重置topic的分區(qū)offset到很早的位置,這可能造成拉取大量歷史數(shù)據(jù);
提升操作效率; 操作出錯(cuò)概率更小,集群更安全; 所有操作有跡可循,可以追溯;
在平臺(tái)上為用戶的topic提供生產(chǎn)樣例數(shù)據(jù)與消費(fèi)抽樣的功能,用戶可以不用自己寫代碼也可以測試topic是否可以使用,權(quán)限是否正常;
在平臺(tái)上為用戶的topic提供生產(chǎn)/消費(fèi)權(quán)限驗(yàn)證功能,讓用戶可以明確自己的賬號(hào)對(duì)某個(gè)topic有沒有讀寫權(quán)限;
1)無流量topic的治理,對(duì)集群中無流量topic進(jìn)行清理,減少過多無用元數(shù)據(jù)對(duì)集群造成的壓力;
2)topic分區(qū)數(shù)據(jù)大小治理,把topic分區(qū)數(shù)據(jù)量過大的topic(如單分區(qū)數(shù)據(jù)量超過100GB/天)進(jìn)行梳理,看看是否需要擴(kuò)容,避免數(shù)據(jù)集中在集群部分節(jié)點(diǎn)上;
3)topic分區(qū)數(shù)據(jù)傾斜治理,避免客戶端在生產(chǎn)消息的時(shí)候,指定消息的key,但是key過于集中,消息只集中分布在部分分區(qū),導(dǎo)致數(shù)據(jù)傾斜;
4)topic分區(qū)分散性治理,讓topic分區(qū)分布在集群盡可能多的broker上,這樣可以避免因topic流量突增,流量只集中到少數(shù)節(jié)點(diǎn)上的風(fēng)險(xiǎn),也可以避免某個(gè)broker異常對(duì)topic影響非常大;
5)topic分區(qū)消費(fèi)延時(shí)治理;一般有延時(shí)消費(fèi)較多的時(shí)候有兩種情況,一種是集群性能下降,另外一種是業(yè)務(wù)方的消費(fèi)并發(fā)度不夠,如果是消費(fèi)者并發(fā)不夠的化應(yīng)該與業(yè)務(wù)聯(lián)系增加消費(fèi)并發(fā)。
1)把所有指標(biāo)采集做成平臺(tái)可配置,提供統(tǒng)一的指標(biāo)采集和指標(biāo)展示及告警平臺(tái),實(shí)現(xiàn)一體化監(jiān)控; 2)把上下游業(yè)務(wù)進(jìn)行關(guān)聯(lián),做成全鏈路監(jiān)控; 3)用戶可以配置topic或者分區(qū)流量延時(shí)、突變等監(jiān)控告警;
1)為我們進(jìn)行資源申請(qǐng)?jiān)u估提供依據(jù); 2)讓我們更了解集群的讀寫能力及瓶頸在哪里,針對(duì)瓶頸進(jìn)行優(yōu)化; 3)為我們限流閾值設(shè)置提供依據(jù); 4)為我們?cè)u(píng)估什么時(shí)候應(yīng)該擴(kuò)容提供依據(jù);
1)為我們創(chuàng)建topic時(shí),評(píng)估應(yīng)該指定多少分區(qū)合理提供依據(jù); 2)為我們topic的分區(qū)擴(kuò)容評(píng)估提供依據(jù);
1)為我們了解磁盤的真正讀寫能力,為我們選擇更合適Kafka的磁盤類型提供依據(jù); 2)為我們做磁盤流量告警閾值設(shè)置提供依據(jù);
1)我們需要了解單個(gè)集群規(guī)模的上限或者是元數(shù)據(jù)規(guī)模的上限,探索相關(guān)信息對(duì)集群性能和穩(wěn)定性的影響; 2)根據(jù)摸底情況,評(píng)估集群節(jié)點(diǎn)規(guī)模的合理范圍,及時(shí)預(yù)測風(fēng)險(xiǎn),進(jìn)行超大集群的拆分等工作;
二、開源版本功能缺陷
無法實(shí)現(xiàn)增量遷移;【我們已經(jīng)基于2.1.1版本源碼改造,實(shí)現(xiàn)了增量遷移】
無法實(shí)現(xiàn)并發(fā)遷移;【開源版本直到2.6.0才實(shí)現(xiàn)了并發(fā)遷移】
無法實(shí)現(xiàn)終止遷移;【我們已經(jīng)基于2.1.1版本源碼改造,實(shí)現(xiàn)了終止副本遷移】【開源版本直到2.6.0才實(shí)現(xiàn)了暫停遷移,和終止遷移有些不一樣,不會(huì)回滾元數(shù)據(jù)】
當(dāng)指定遷移數(shù)據(jù)目錄時(shí),遷移過程中,如果把topic保留時(shí)間改短,topic保留時(shí)間針對(duì)正在遷移topic分區(qū)不生效,topic分區(qū)過期數(shù)據(jù)無法刪除;【開源版本bug,目前還沒有修復(fù)】
當(dāng)指定遷移數(shù)據(jù)目錄時(shí),當(dāng)遷移計(jì)劃為以下場景時(shí),整個(gè)遷移任務(wù)無法完成遷移,一直處于卡死狀態(tài);【開源版本bug,目前還沒有修復(fù)】
遷移過程中,如果有重啟broker節(jié)點(diǎn),那個(gè)broker節(jié)點(diǎn)上的所有l(wèi)eader分區(qū)無法切換回來,導(dǎo)致節(jié)點(diǎn)流量全部轉(zhuǎn)移到其他節(jié)點(diǎn),直到所有副本被遷移完畢后leader才會(huì)切換回來;【開源版本bug,目前還沒有修復(fù)】。
在原生的Kafka版本中存在以下指定數(shù)據(jù)目錄場景無法遷移完畢的情況,此版本我們也不決定修復(fù)次bug:1.針對(duì)同一個(gè)topic分區(qū),如果部分目標(biāo)副本相比原副本是所屬broker發(fā)生變化,部分目標(biāo)副本相比原副本是broker內(nèi)部所屬數(shù)據(jù)目錄發(fā)生變化;那么副本所屬broker發(fā)生變化的那個(gè)目標(biāo)副本可以正常遷移完畢,目標(biāo)副本是在broker內(nèi)部數(shù)據(jù)目錄發(fā)生變化的無法正常完成遷移;但是舊副本依然可以正常提供生產(chǎn)、消費(fèi)服務(wù),并且不影響下一次遷移任務(wù)的提交,下一次遷移任務(wù)只需要把此topic分區(qū)的副本列表所屬broker列表變更后提交依然可以正常完成遷移,并且可以清理掉之前未完成的目標(biāo)副本;這里假設(shè)topic yyj1的初始化副本分布情況如下:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}]}//遷移場景1:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}]}//遷移場景2:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}]}針對(duì)上述的topic yyj1的分布分布情況,此時(shí)如果我們的遷移計(jì)劃為“遷移場景1”或遷移場景2“,那么都將出現(xiàn)有副本無法遷移完畢的情況。但是這并不影響舊副本處理生產(chǎn)、消費(fèi)請(qǐng)求,并且我們可以正常提交其他的遷移任務(wù)。為了清理舊的未遷移完成的副本,我們只需要修改一次遷移計(jì)劃【新的目標(biāo)副本列表和當(dāng)前分區(qū)已分配副本列表完全不同即可】,再次提交遷移即可。這里,我們依然以上述的例子做遷移計(jì)劃修改如下:{"version":1,"partitions":[{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}]}這樣我們就可以正常完成遷移。
/config/users/<user>/clients/<client-id>/config/users/<user>/clients/<default>/config/users/<user>/config/users/<default>/clients/<client-id>/config/users/<default>/clients/<default>/config/users/<default>/config/clients/<client-id>/config/clients/<default>
(1)改造源碼,實(shí)現(xiàn)單個(gè)broker流量上限限制,只要流量達(dá)到broker上限立即進(jìn)行限流處理,所有往這個(gè)broker寫的用戶都可以被限制??;或者對(duì)用戶進(jìn)行優(yōu)先級(jí)處理,放過高優(yōu)先級(jí)的,限制低優(yōu)先級(jí)的;
(2)改造源碼,實(shí)現(xiàn)broker上單塊磁盤流量上限限制(很多時(shí)候都是流量集中到某幾塊磁盤上,導(dǎo)致沒有達(dá)到broker流量上限卻超過了單磁盤讀寫能力上限),只要磁盤流量達(dá)到上限,立即進(jìn)行限流處理,所有往這個(gè)磁盤寫的用戶都可以被限制??;或者對(duì)用戶進(jìn)行優(yōu)先級(jí)處理,放過高優(yōu)先級(jí)的,限制低優(yōu)先級(jí)的;
(3)改造源碼,實(shí)現(xiàn)topic維度限流以及對(duì)topic分區(qū)的禁寫功能;
(4)改造源碼,實(shí)現(xiàn)用戶、broker、磁盤、topic等維度組合精準(zhǔn)限流;
三、kafka發(fā)展趨勢
3.8 Kafka所有KIP地址
四、如何貢獻(xiàn)社區(qū)
2)https://issues.apache.org/jira/secure/BrowseProjects.jspa?selectedCategory=all
評(píng)論
圖片
表情
