<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka面試題系列(基礎(chǔ)篇)

          共 8128字,需瀏覽 17分鐘

           ·

          2020-07-28 13:23

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)

          回復(fù)”資源“獲取更多資源

          大數(shù)據(jù)技術(shù)與架構(gòu)
          點(diǎn)擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強(qiáng)公眾號(hào)!

          暴走大數(shù)據(jù)
          點(diǎn)擊右側(cè)關(guān)注,暴走大數(shù)據(jù)!

          Kafka的用途有哪些?使用場景如何?

          • 消息系統(tǒng):Kafka 和傳統(tǒng)的消息系統(tǒng)(也稱作消息中間件)都具備系統(tǒng)解耦、冗余存儲(chǔ)、流量削峰、緩沖、異步通信、擴(kuò)展性、可恢復(fù)性等功能。與此同時(shí),Kafka 還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障及回溯消費(fèi)的功能。

          • 存儲(chǔ)系統(tǒng):Kafka 把消息持久化到磁盤,相比于其他基于內(nèi)存存儲(chǔ)的系統(tǒng)而言,有效地降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。也正是得益于 Kafka 的消息持久化功能和多副本機(jī)制,我們可以把 Kafka 作為長期的數(shù)據(jù)存儲(chǔ)系統(tǒng)來使用,只需要把對(duì)應(yīng)的數(shù)據(jù)保留策略設(shè)置為“永久”或啟用主題的日志壓縮功能即可。

          • 流式處理平臺(tái):Kafka 不僅為每個(gè)流行的流式處理框架提供了可靠的數(shù)據(jù)來源,還提供了一個(gè)完整的流式處理類庫,比如窗口、連接、變換和聚合等各類操作。

          Kafka中的ISR、AR又代表什么?ISR的伸縮又指什么

          分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Replicas)。所有與 leader 副本保持一定程度同步的副本(包括 leader 副本在內(nèi))組成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一個(gè)子集。

          ISR的伸縮:
          leader 副本負(fù)責(zé)維護(hù)和跟蹤 ISR 集合中所有 follower 副本的滯后狀態(tài),當(dāng) follower 副本落后太多或失效時(shí),leader 副本會(huì)把它從 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本會(huì)把它從 OSR 集合轉(zhuǎn)移至 ISR 集合。默認(rèn)情況下,當(dāng) leader 副本發(fā)生故障時(shí),只有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒有任何機(jī)會(huì)(不過這個(gè)原則也可以通過修改相應(yīng)的參數(shù)配置來改變)。

          replica.lag.time.max.ms :這個(gè)參數(shù)的含義是 Follower 副本能夠落后 Leader 副本的最長時(shí)間間隔,當(dāng)前默認(rèn)值是 10 秒。

          unclean.leader.election.enable:是否允許 Unclean 領(lǐng)導(dǎo)者選舉。開啟 Unclean 領(lǐng)導(dǎo)者選舉可能會(huì)造成數(shù)據(jù)丟失,但好處是,它使得分區(qū) Leader 副本一直存在,不至于停止對(duì)外提供服務(wù),因此提升了高可用性。

          Kafka中的HW、LEO、LSO、LW等分別代表什么?

          HW 是 High Watermark 的縮寫,俗稱高水位,它標(biāo)識(shí)了一個(gè)特定的消息偏移量(offset),消費(fèi)者只能拉取到這個(gè) offset 之前的消息。

          LSO是LogStartOffset,一般情況下,日志文件的起始偏移量 logStartOffset 等于第一個(gè)日志分段的 baseOffset,但這并不是絕對(duì)的,logStartOffset 的值可以通過 DeleteRecordsRequest 請(qǐng)求(比如使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 腳本、日志的清理和截?cái)嗟炔僮鬟M(jìn)行修改。

          如上圖所示,它代表一個(gè)日志文件,這個(gè)日志文件中有9條消息,第一條消息的 offset(LogStartOffset)為0,最后一條消息的 offset 為8,offset 為9的消息用虛線框表示,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費(fèi)者只能拉取到 offset 在0至5之間的消息,而 offset 為6的消息對(duì)消費(fèi)者而言是不可見的。

          LEO 是 Log End Offset 的縮寫,它標(biāo)識(shí)當(dāng)前日志文件中下一條待寫入消息的 offset,上圖中 offset 為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的 offset 值加1。分區(qū) ISR 集合中的每個(gè)副本都會(huì)維護(hù)自身的 LEO,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對(duì)消費(fèi)者而言只能消費(fèi) HW 之前的消息。

          LW 是 Low Watermark 的縮寫,俗稱“低水位”,代表 AR 集合中最小的 logStartOffset 值。副本的拉取請(qǐng)求(FetchRequest,它有可能觸發(fā)新建日志分段而舊的被清理,進(jìn)而導(dǎo)致 logStartOffset 的增加)和刪除消息請(qǐng)求(DeleteRecordRequest)都有可能促使 LW 的增長。

          Kafka中是怎么體現(xiàn)消息順序性的?

          可以通過分區(qū)策略體現(xiàn)消息順序性。
          分區(qū)策略有輪詢策略、隨機(jī)策略、按消息鍵保序策略。

          按消息鍵保序策略:一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略

          Copy
          List partitions = cluster.partitionsForTopic(topic);
          return Math.abs(key.hashCode()) % partitions.size();

          Kafka中的分區(qū)器、序列化器、攔截器是否了解?它們之間的處理順序是什么?

          • 序列化器:生產(chǎn)者需要用序列化器(Serializer)把對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡(luò)發(fā)送給 Kafka。而在對(duì)側(cè),消費(fèi)者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節(jié)數(shù)組轉(zhuǎn)換成相應(yīng)的對(duì)象。

          • 分區(qū)器:分區(qū)器的作用就是為消息分配分區(qū)。如果消息 ProducerRecord 中沒有指定 partition 字段,那么就需要依賴分區(qū)器,根據(jù) key 這個(gè)字段來計(jì)算 partition 的值。

          • Kafka 一共有兩種攔截器:生產(chǎn)者攔截器和消費(fèi)者攔截器。

            • 生產(chǎn)者攔截器既可以用來在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計(jì)類工作。

            • 消費(fèi)者攔截器主要在消費(fèi)到消息或在提交消費(fèi)位移時(shí)進(jìn)行一些定制化的操作。

          消息在通過 send() 方法發(fā)往 broker 的過程中,有可能需要經(jīng)過攔截器(Interceptor)、序列化器(Serializer)和分區(qū)器(Partitioner)的一系列作用之后才能被真正地發(fā)往 broker。攔截器(下一章會(huì)詳細(xì)介紹)一般不是必需的,而序列化器是必需的。消息經(jīng)過序列化之后就需要確定它發(fā)往的分區(qū),如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分區(qū)器的作用,因?yàn)?partition 代表的就是所要發(fā)往的分區(qū)號(hào)。

          處理順序 :攔截器->序列化器->分區(qū)器

          KafkaProducer 在將消息序列化和計(jì)算分區(qū)之前會(huì)調(diào)用生產(chǎn)者攔截器的 onSend() 方法來對(duì)消息進(jìn)行相應(yīng)的定制化操作。
          然后生產(chǎn)者需要用序列化器(Serializer)把對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡(luò)發(fā)送給 Kafka。
          最后可能會(huì)被發(fā)往分區(qū)器為消息分配分區(qū)。

          Kafka生產(chǎn)者客戶端的整體結(jié)構(gòu)是什么樣子的?

          整個(gè)生產(chǎn)者客戶端由兩個(gè)線程協(xié)調(diào)運(yùn)行,這兩個(gè)線程分別為主線程和 Sender 線程(發(fā)送線程)。
          在主線程中由 KafkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。
          Sender 線程負(fù)責(zé)從 RecordAccumulator 中獲取消息并將其發(fā)送到 Kafka 中。
          RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。

          Kafka生產(chǎn)者客戶端中使用了幾個(gè)線程來處理?分別是什么?

          整個(gè)生產(chǎn)者客戶端由兩個(gè)線程協(xié)調(diào)運(yùn)行,這兩個(gè)線程分別為主線程和 Sender 線程(發(fā)送線程)。在主線程中由 KafkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender 線程負(fù)責(zé)從 RecordAccumulator 中獲取消息并將其發(fā)送到 Kafka 中。

          Kafka的舊版Scala的消費(fèi)者客戶端的設(shè)計(jì)有什么缺陷?

          老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一個(gè)分布式的協(xié)調(diào)服務(wù)框架,Kafka 重度依賴它實(shí)現(xiàn)各種各樣的協(xié)調(diào)管理。將位移保存在 ZooKeeper 外部系統(tǒng)的做法,最顯而易見的好處就是減少了 Kafka Broker 端的狀態(tài)保存開銷。

          ZooKeeper 這類元框架其實(shí)并不適合進(jìn)行頻繁的寫更新,而 Consumer Group 的位移更新卻是一個(gè)非常頻繁的操作。這種大吞吐量的寫操作會(huì)極大地拖慢 ZooKeeper 集群的性能

          “消費(fèi)組中的消費(fèi)者個(gè)數(shù)如果超過topic的分區(qū),那么就會(huì)有消費(fèi)者消費(fèi)不到數(shù)據(jù)”這句話是否正確?如果正確,那么有沒有什么hack的手段?

          一般來說如果消費(fèi)者過多,出現(xiàn)了消費(fèi)者的個(gè)數(shù)大于分區(qū)個(gè)數(shù)的情況,就會(huì)有消費(fèi)者分配不到任何分區(qū)。

          開發(fā)者可以繼承AbstractPartitionAssignor實(shí)現(xiàn)自定義消費(fèi)策略,從而實(shí)現(xiàn)同一消費(fèi)組內(nèi)的任意消費(fèi)者都可以消費(fèi)訂閱主題的所有分區(qū):

          Copy
          public class BroadcastAssignor extends AbstractPartitionAssignor{
          @Override
          public String name() {
          return "broadcast";
          }

          private Map> consumersPerTopic(
          Map consumerMetadata) {
          (具體實(shí)現(xiàn)請(qǐng)參考RandomAssignor中的consumersPerTopic()方法)
          }

          @Override
          public Map> assign(
          Map partitionsPerTopic,
          Map subscriptions) {
          Map> consumersPerTopic =
          consumersPerTopic(subscriptions);
          Map> assignment = new HashMap<>();
          //Java8
          subscriptions.keySet().forEach(memberId ->
          assignment.put(memberId, new ArrayList<>()));
          //針對(duì)每一個(gè)主題,為每一個(gè)訂閱的消費(fèi)者分配所有的分區(qū)
          consumersPerTopic.entrySet().forEach(topicEntry->{
          String topic = topicEntry.getKey();
          List members = topicEntry.getValue();

          Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
          if (numPartitionsForTopic == null || members.isEmpty())
          return;
          List partitions = AbstractPartitionAssignor
          .partitions(topic, numPartitionsForTopic);
          if (!partitions.isEmpty()) {
          members.forEach(memberId ->
          assignment.get(memberId).addAll(partitions));
          }
          });
          return assignment;
          }
          }

          注意組內(nèi)廣播的這種實(shí)現(xiàn)方式會(huì)有一個(gè)嚴(yán)重的問題—默認(rèn)的消費(fèi)位移的提交會(huì)失效。

          消費(fèi)者提交消費(fèi)位移時(shí)提交的是當(dāng)前消費(fèi)到的最新消息的offset還是offset+1?

          在舊消費(fèi)者客戶端中,消費(fèi)位移是存儲(chǔ)在 ZooKeeper 中的。而在新消費(fèi)者客戶端中,消費(fèi)位移存儲(chǔ)在 Kafka 內(nèi)部的主題__consumer_offsets 中。
          當(dāng)前消費(fèi)者需要提交的消費(fèi)位移是offset+1

          有哪些情形會(huì)造成重復(fù)消費(fèi)?

          1. Rebalance
            一個(gè)consumer正在消費(fèi)一個(gè)分區(qū)的一條消息,還沒有消費(fèi)完,發(fā)生了rebalance(加入了一個(gè)consumer),從而導(dǎo)致這條消息沒有消費(fèi)成功,rebalance后,另一個(gè)consumer又把這條消息消費(fèi)一遍。

          2. 消費(fèi)者端手動(dòng)提交
            如果先消費(fèi)消息,再更新offset位置,導(dǎo)致消息重復(fù)消費(fèi)。

          3. 消費(fèi)者端自動(dòng)提交
            設(shè)置offset為自動(dòng)提交,關(guān)閉kafka時(shí),如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會(huì)重復(fù)消費(fèi)。

          4. 生產(chǎn)者端
            生產(chǎn)者因?yàn)闃I(yè)務(wù)問題導(dǎo)致的宕機(jī),在重啟之后可能數(shù)據(jù)會(huì)重發(fā)

          那些情景下會(huì)造成消息漏消費(fèi)?

          1. 自動(dòng)提交
            設(shè)置offset為自動(dòng)定時(shí)提交,當(dāng)offset被自動(dòng)定時(shí)提交時(shí),數(shù)據(jù)還在內(nèi)存中未處理,此時(shí)剛好把線程kill掉,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理,導(dǎo)致這部分內(nèi)存中的數(shù)據(jù)丟失。

          2. 生產(chǎn)者發(fā)送消息
            發(fā)送消息設(shè)置的是fire-and-forget(發(fā)后即忘),它只管往 Kafka 中發(fā)送消息而并不關(guān)心消息是否正確到達(dá)。不過在某些時(shí)候(比如發(fā)生不可重試異常時(shí))會(huì)造成消息的丟失。這種發(fā)送方式的性能最高,可靠性也最差。

          3. 消費(fèi)者端
            先提交位移,但是消息還沒消費(fèi)完就宕機(jī)了,造成了消息沒有被消費(fèi)。自動(dòng)位移提交同理

          4. acks沒有設(shè)置為all
            如果在broker還沒把消息同步到其他broker的時(shí)候宕機(jī)了,那么消息將會(huì)丟失

          KafkaConsumer是非線程安全的,那么怎么樣實(shí)現(xiàn)多線程消費(fèi)?

          1. 線程封閉,即為每個(gè)線程實(shí)例化一個(gè) KafkaConsumer 對(duì)象

          一個(gè)線程對(duì)應(yīng)一個(gè) KafkaConsumer 實(shí)例,我們可以稱之為消費(fèi)線程。一個(gè)消費(fèi)線程可以消費(fèi)一個(gè)或多個(gè)分區(qū)中的消息,所有的消費(fèi)線程都隸屬于同一個(gè)消費(fèi)組。

          1. 消費(fèi)者程序使用單或多線程獲取消息,同時(shí)創(chuàng)建多個(gè)消費(fèi)線程執(zhí)行消息處理邏輯。
            獲取消息的線程可以是一個(gè),也可以是多個(gè),每個(gè)線程維護(hù)專屬的 KafkaConsumer 實(shí)例,處理消息則交由特定的線程池來做,從而實(shí)現(xiàn)消息獲取與消息處理的真正解耦。具體架構(gòu)如下圖所示:

          兩個(gè)方案對(duì)比:

          簡述消費(fèi)者與消費(fèi)組之間的關(guān)系

          1. Consumer Group 下可以有一個(gè)或多個(gè) Consumer 實(shí)例。這里的實(shí)例可以是一個(gè)單獨(dú)的進(jìn)程,也可以是同一進(jìn)程下的線程。在實(shí)際場景中,使用進(jìn)程更為常見一些。

          2. Group ID 是一個(gè)字符串,在一個(gè) Kafka 集群中,它標(biāo)識(shí)唯一的一個(gè) Consumer Group。

          3. Consumer Group 下所有實(shí)例訂閱的主題的單個(gè)分區(qū),只能分配給組內(nèi)的某個(gè) Consumer 實(shí)例消費(fèi)。這個(gè)分區(qū)當(dāng)然也可以被其他的 Group 消費(fèi)。

          當(dāng)你使用kafka-topics.sh創(chuàng)建(刪除)了一個(gè)topic之后,Kafka背后會(huì)執(zhí)行什么邏輯?

          在執(zhí)行完腳本之后,Kafka 會(huì)在 log.dir 或 log.dirs 參數(shù)所配置的目錄下創(chuàng)建相應(yīng)的主題分區(qū),默認(rèn)情況下這個(gè)目錄為/tmp/kafka-logs/。

          在 ZooKeeper 的/brokers/topics/目錄下創(chuàng)建一個(gè)同名的實(shí)節(jié)點(diǎn),該節(jié)點(diǎn)中記錄了該主題的分區(qū)副本分配方案。示例如下:

          Copy
          [zk: localhost:2181/kafka(CONNECTED) 2] get /brokers/topics/topic-create
          {"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}

          topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?

          可以增加,使用 kafka-topics 腳本,結(jié)合 --alter 參數(shù)來增加某個(gè)主題的分區(qū)數(shù),命令如下:

          Copy
          bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic --partitions <新分區(qū)數(shù)>

          當(dāng)分區(qū)數(shù)增加時(shí),就會(huì)觸發(fā)訂閱該主題的所有 Group 開啟 Rebalance。
          首先,Rebalance 過程對(duì) Consumer Group 消費(fèi)過程有極大的影響。在 Rebalance 過程中,所有 Consumer 實(shí)例都會(huì)停止消費(fèi),等待 Rebalance 完成。這是 Rebalance 為人詬病的一個(gè)方面。
          其次,目前 Rebalance 的設(shè)計(jì)是所有 Consumer 實(shí)例共同參與,全部重新分配所有分區(qū)。其實(shí)更高效的做法是盡量減少分配方案的變動(dòng)。
          最后,Rebalance 實(shí)在是太慢了。

          topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?

          不支持,因?yàn)閯h除的分區(qū)中的消息不好處理。如果直接存儲(chǔ)到現(xiàn)有分區(qū)的尾部,消息的時(shí)間戳就不會(huì)遞增,如此對(duì)于 Spark、Flink 這類需要消息時(shí)間戳(事件時(shí)間)的組件將會(huì)受到影響;如果分散插入現(xiàn)有的分區(qū),那么在消息量很大的時(shí)候,內(nèi)部的數(shù)據(jù)復(fù)制會(huì)占用很大的資源,而且在復(fù)制期間,此主題的可用性又如何得到保障?與此同時(shí),順序性問題、事務(wù)性問題,以及分區(qū)和副本的狀態(tài)機(jī)切換問題都是不得不面對(duì)的。

          創(chuàng)建topic時(shí)如何選擇合適的分區(qū)數(shù)?

          在 Kafka 中,性能與分區(qū)數(shù)有著必然的關(guān)系,在設(shè)定分區(qū)數(shù)時(shí)一般也需要考慮性能的因素。對(duì)不同的硬件而言,其對(duì)應(yīng)的性能也會(huì)不太一樣。
          可以使用Kafka 本身提供的用于生產(chǎn)者性能測試的 kafka-producer- perf-test.sh 和用于消費(fèi)者性能測試的 kafka-consumer-perf-test.sh來進(jìn)行測試。
          增加合適的分區(qū)數(shù)可以在一定程度上提升整體吞吐量,但超過對(duì)應(yīng)的閾值之后吞吐量不升反降。如果應(yīng)用對(duì)吞吐量有一定程度上的要求,則建議在投入生產(chǎn)環(huán)境之前對(duì)同款硬件資源做一個(gè)完備的吞吐量相關(guān)的測試,以找到合適的分區(qū)數(shù)閾值區(qū)間。
          分區(qū)數(shù)的多少還會(huì)影響系統(tǒng)的可用性。如果分區(qū)數(shù)非常多,如果集群中的某個(gè) broker 節(jié)點(diǎn)宕機(jī),那么就會(huì)有大量的分區(qū)需要同時(shí)進(jìn)行 leader 角色切換,這個(gè)切換的過程會(huì)耗費(fèi)一筆可觀的時(shí)間,并且在這個(gè)時(shí)間窗口內(nèi)這些分區(qū)也會(huì)變得不可用。
          分區(qū)數(shù)越多也會(huì)讓 Kafka 的正常啟動(dòng)和關(guān)閉的耗時(shí)變得越長,與此同時(shí),主題的分區(qū)數(shù)越多不僅會(huì)增加日志清理的耗時(shí),而且在被刪除時(shí)也會(huì)耗費(fèi)更多的時(shí)間。

          歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連


          文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??

          瀏覽 41
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  学生妹一级内射 | 日韩性爱小视频 | 日日碰日日摸 | 色五月播五月丁香综合 | 俺去也在线www色官网 |