<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          kafka初探

          共 7869字,需瀏覽 16分鐘

           ·

          2021-04-02 18:00

          Apache Kafka的流行歸功于它設(shè)計(jì)和操作簡單、存儲(chǔ)系統(tǒng)高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。

          初識(shí)kafka集群結(jié)構(gòu):


              kafaka集群的broker和Consumer都需要連接Zookeeper進(jìn)行集群配置管理,Producer 直接連接 Broker。Producer把數(shù)據(jù)上傳到Broker,Producer可以指定數(shù)據(jù)有幾個(gè)分區(qū)、幾個(gè)備份。

              上面的圖中,數(shù)據(jù)有兩個(gè)分區(qū) 0、1,每個(gè)分區(qū)都有自己的副本:0'、 1'。黃色的分區(qū)為leader,白色的為follower。

          和其他消息隊(duì)列相比,Kafka的優(yōu)勢在哪里?

          我們現(xiàn)在經(jīng)常提到 Kafka 的時(shí)候就已經(jīng)默認(rèn)它是一個(gè)非常優(yōu)秀的消息隊(duì)列了,我們也會(huì)經(jīng)常拿它給 RocketMQ、RabbitMQ 對(duì)比。我覺得 Kafka 相比其他消息隊(duì)列主要的優(yōu)勢如下:

          1. 極致的性能 :基于 Scala 和 Java 語言開發(fā),設(shè)計(jì)中大量使用了批量處理和異步的思想,最高可以每秒處理千萬級(jí)別的消息。

          2. 生態(tài)系統(tǒng)兼容性無可匹敵 :Kafka 與周邊生態(tài)系統(tǒng)的兼容性是最好的沒有之一,尤其在大數(shù)據(jù)和流計(jì)算領(lǐng)域。


          kafka的消息模型:發(fā)布-訂閱模型

          主要區(qū)別傳統(tǒng)的queue模型中,消息只能被一個(gè)consumer消費(fèi)的問題。

          發(fā)布訂閱模型(Pub-Sub)使用主題(Topic) 作為消息通信載體,類似于廣播模式;發(fā)布者發(fā)布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的(不包括beging這種情況)。在發(fā)布 - 訂閱模型中,如果只有一個(gè)訂閱者,那它和隊(duì)列模型就基本是一樣的了。所以說,發(fā)布 - 訂閱模型在功能層面上是可以兼容隊(duì)列模型的。

          kafka核心元件整理

          • producer:消息生產(chǎn)者

          • consumer:消息消費(fèi)者

          • broker(代理):可以看作是一個(gè)獨(dú)立的kafka實(shí)例。多個(gè)kafka broker組成一個(gè)kafka cluster。

          • topic(主題):Kafka 將生產(chǎn)者發(fā)布的消息發(fā)送到 Topic(主題) 中,需要這些消息的消費(fèi)者可以訂閱這些 Topic(主題),topic就是消息的分組。

          • partition(分區(qū)):Partition屬于Topic的一部分。一個(gè)Topic可以有多個(gè)Partition ,并且同一Topic下的Partition可以分布在不同的Broker上,這也就表明一個(gè)Topic可以橫跨多個(gè)Broker 。

          Kafka 中的 Partition(分區(qū)) 實(shí)際上可以對(duì)應(yīng)成為消息隊(duì)列中的隊(duì)列queue。

          kafka多副本機(jī)制

          還有一點(diǎn)我覺得比較重要的是 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機(jī)制。分區(qū)(Partition)中的多個(gè)副本之間會(huì)有一個(gè)叫做leader節(jié)點(diǎn),其他副本稱為follower。我們發(fā)送的消息會(huì)被發(fā)送到leader副本,然后follower 副本才能從 leader副本中拉取消息進(jìn)行同步。

          生產(chǎn)者和消費(fèi)者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲(chǔ)的安全性。當(dāng) leader 副本發(fā)生故障時(shí)會(huì)從 follower 中選舉出一個(gè) leader,但是 follower 中如果有和 leader 同步程度達(dá)不到要求的參加不了 leader 的競選。

          多分區(qū)(Partition)以及多副本(Replica)機(jī)制有什么好處

          • Kafka 通過給特定 Topic 指定多個(gè) Partition, 而各個(gè) Partition 可以分布在不同的 Broker 上, 這樣便能提供比較好的并發(fā)能力(負(fù)載均衡)。

          • Partition 可以指定對(duì)應(yīng)的 Replica 數(shù), 這也極大地提高了消息存儲(chǔ)的安全性, 提高了容災(zāi)能力,不過也相應(yīng)的增加了所需要的存儲(chǔ)空間。

          kafka能保證消息的順序性嗎?

          Kafka 中 Partition(分區(qū))是真正保存消息的地方,我們發(fā)送的消息都被放在了這里。每次添加消息到 Partition(分區(qū)) 的時(shí)候都會(huì)采用尾加法,Kafka 只能為我們保證 Partition(分區(qū)) 中的消息有序,而不能保證 Topic(主題) 中的 Partition(分區(qū)) 的有序。

          消息在被追加到 Partition(分區(qū))的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)。Kafka 通過偏移量(offset)來保證消息在分區(qū)內(nèi)的順序性。所以,我們就有一種很簡單的保證消息消費(fèi)順序的方法:1 個(gè) Topic 只對(duì)應(yīng)一個(gè) Partition。這樣當(dāng)然可以解決問題,但是破壞了 Kafka 的設(shè)計(jì)初衷。

          Kafka 中發(fā)送 1 條消息的時(shí)候,可以指定 topic, partition, key,data(數(shù)據(jù)) 4 個(gè)參數(shù)。如果在發(fā)送消息的時(shí)候指定了 Partition的話,所有消息都會(huì)被發(fā)送到指定的 Partition。并且,同一個(gè) key 的消息可以保證只發(fā)送到同一個(gè)partition,可以采用表/對(duì)象的 id 來作為key 。

          總結(jié)一下,對(duì)于如何保證 Kafka 中消息消費(fèi)的順序,有了下面兩種方法:

          • 1個(gè)Topic只對(duì)應(yīng)一個(gè) Partition。

          • 發(fā)送消息的時(shí)候指定 key/Partition。(推薦)

          當(dāng)然不僅僅只有上面兩種方法,比如如果producer單點(diǎn)固定,可以為每條消息綁定一個(gè)順序號(hào)等。需要根據(jù)具體場景適用。

          如何保證消息安全不丟失

          producer端消息丟失

          生產(chǎn)者(Producer) 調(diào)用send方法發(fā)送消息之后,消息可能因?yàn)榫W(wǎng)絡(luò)問題并沒有發(fā)送過去。

          所以,我們不能默認(rèn)在調(diào)用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結(jié)果。但是要注意的是 Kafka 生產(chǎn)者(Producer) 使用  send 方法發(fā)送消息實(shí)際上是異步的操作,我們可以通過 get()方法獲取調(diào)用結(jié)果,但是這樣也讓它變?yōu)榱送讲僮鳎?/p>

          SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();if (sendResult.getRecordMetadata() != null) {  logger.info("生產(chǎn)者成功發(fā)送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe              sult.getProducerRecord().value().toString());}

          但是一般不推薦這么做!可以采用為其添加回調(diào)函數(shù)的形式,示例代碼如下:

          ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);        future.addCallback(result -> logger.info("生產(chǎn)者成功發(fā)送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),                ex -> logger.error("生產(chǎn)者發(fā)送消失敗,原因:{}", ex.getMessage()));

          如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可.

          另外這里推薦為 Producer 的retries (重試次數(shù))設(shè)置一個(gè)比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會(huì)設(shè)置比較大一點(diǎn)。設(shè)置完成之后,當(dāng)出現(xiàn)網(wǎng)絡(luò)問題之后能夠自動(dòng)重試消息發(fā)送,避免消息丟失。另外,建議還要設(shè)置重試間隔,因?yàn)殚g隔太小的話重試的效果就不明顯了,網(wǎng)絡(luò)波動(dòng)一次你3次一下子就重試完了.

          Consumer端消息丟失

          消息在被追加到 Partition(分區(qū))的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)。偏移量(offset)表示 Consumer 當(dāng)前消費(fèi)到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內(nèi)的順序性。

          短讀風(fēng)險(xiǎn):當(dāng)消費(fèi)者拉取到了分區(qū)的某個(gè)消息之后,消費(fèi)者會(huì)自動(dòng)提交了 offset。自動(dòng)提交的話會(huì)有一個(gè)問題,試想一下,當(dāng)消費(fèi)者剛拿到這個(gè)消息準(zhǔn)備進(jìn)行真正消費(fèi)的時(shí)候,突然掛掉了,消息實(shí)際上并沒有被消費(fèi),但是 offset 卻被自動(dòng)提交了。

          長讀風(fēng)險(xiǎn):為了解決消息少消費(fèi)的問題,我們手動(dòng)關(guān)閉自動(dòng)提交 offset,每次在真正消費(fèi)完消息之后之后再自己手動(dòng)提交 offset 。 但是,這樣會(huì)帶來消息被重新消費(fèi)的問題。比如你剛剛消費(fèi)完消息之后,還沒提交 offset,結(jié)果自己掛掉了,那么這個(gè)消息理論上就會(huì)被消費(fèi)兩次。

          集群故障導(dǎo)致消息丟失

          由于partition中的消息都是通過leader節(jié)點(diǎn)復(fù)制到follower節(jié)點(diǎn)的,假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個(gè) leader ,但是 leader 的數(shù)據(jù)還有一些沒有被 follower 副本的同步的話,就會(huì)造成消息丟失。

          解決辦法就是我們設(shè)置 

          • acks = all

          acks 是 Kafka 生產(chǎn)者(Producer) 很重要的一個(gè)參數(shù)。acks 的默認(rèn)值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當(dāng)我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。

          • replication.factor >= 3

          為了保證 leader 副本能有 follower 副本能同步消息,我們一般會(huì)為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個(gè) 分區(qū)(partition) 至少有 3 個(gè)副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。

          • min.insync.replicas > 1

          一般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個(gè)副本才算是被成功發(fā)送。min.insync.replicas的默認(rèn)值為 1 ,在實(shí)際生產(chǎn)中應(yīng)盡量避免默認(rèn)值 1。但是,為了保證整個(gè) Kafka 服務(wù)的高可用性,需要確保 replication.factor > min.insync.replicas 。主要是為了應(yīng)對(duì)只要是有一個(gè)副本掛掉,整個(gè)分區(qū)就無法正常工作的情況。這明顯違反高可用性!一般推薦設(shè)置成 replication.factor = min.insync.replicas + 1。

          • unclean.leader.election.enable = false

          Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數(shù)的默認(rèn)值由原來的true 改為false

          我們最開始也說了我們發(fā)送的消息會(huì)被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步。多個(gè) follower副本之間的消息同步情況不一樣,當(dāng)我們配置了 unclean.leader.election.enable = false 的話,當(dāng) leader 副本發(fā)生故障時(shí)就不會(huì)從 follower 副本中和 leader 同步程度達(dá)不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。

          總結(jié)

          生產(chǎn)端

          消費(fèi)端

          集群配置

          將異步發(fā)送改為同步發(fā)送send().get()

          通過自動(dòng)提交offset,存在短讀風(fēng)險(xiǎn)

          acks=all,完成所有副本的寫

          通過添加回調(diào)函數(shù)future=send()

          通過手動(dòng)提交offset,存在長讀風(fēng)險(xiǎn)


          配置retries重試次數(shù)



          zookeeper在kafka中的作用

          zookeeper為kafka提供了配置元數(shù)據(jù)的管理。

          每個(gè)存儲(chǔ)節(jié)點(diǎn)的含義:

          broker注冊

          Broker是分布式部署并且相互之間相互獨(dú)立,但是需要有一個(gè)注冊系統(tǒng)能夠?qū)⒄麄€(gè)集群中的Broker管理起來,此時(shí)就使用到了Zookeeper。在Zookeeper上會(huì)有一個(gè)專門用來進(jìn)行Broker服務(wù)器列表記錄的節(jié)點(diǎn):

          /brokers/ids

          每個(gè)Broker在啟動(dòng)時(shí),都會(huì)到Zookeeper上進(jìn)行注冊,即到/brokers/ids下創(chuàng)建屬于自己的節(jié)點(diǎn),如/brokers/ids/[0...N]。Kafka使用了全局唯一的數(shù)字來指代每個(gè)Broker服務(wù)器,不同的Broker必須使用不同的Broker ID進(jìn)行注冊,創(chuàng)建完節(jié)點(diǎn)后,每個(gè)Broker就會(huì)將自己的IP地址和端口信息記錄到該節(jié)點(diǎn)中去。其中Broker創(chuàng)建的節(jié)點(diǎn)類型是臨時(shí)節(jié)點(diǎn),一旦Broker宕機(jī),則對(duì)應(yīng)的臨時(shí)節(jié)點(diǎn)也會(huì)被自動(dòng)刪除。

          topic注冊

          在Kafka中,同一個(gè)Topic的消息會(huì)被分成多個(gè)分區(qū)并將其分布在多個(gè)Broker上,這些分區(qū)信息及與Broker的對(duì)應(yīng)關(guān)系也都是由Zookeeper在維護(hù),由專門的節(jié)點(diǎn)來記錄,如:

          /borkers/topics

          Kafka中每個(gè)Topic都會(huì)以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker服務(wù)器啟動(dòng)后,會(huì)到對(duì)應(yīng)Topic節(jié)點(diǎn)(/brokers/topics)上注冊自己的Broker ID并寫入針對(duì)該Topic的分區(qū)總數(shù),如/brokers/topics/login/3->2,這個(gè)節(jié)點(diǎn)表示Broker ID為3的一個(gè)Broker服務(wù)器,對(duì)于"login"這個(gè)Topic的消息,提供了2個(gè)分區(qū)進(jìn)行消息存儲(chǔ),同樣,這個(gè)分區(qū)節(jié)點(diǎn)也是臨時(shí)節(jié)點(diǎn)。

          負(fù)載均衡

          生產(chǎn)者負(fù)載均衡

          由于同一個(gè)Topic消息會(huì)被分區(qū)并將其分布在多個(gè)Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。

          • 四層負(fù)載均衡

          根據(jù)生產(chǎn)者的IP地址和端口來為其確定一個(gè)相關(guān)聯(lián)的Broker。通常,一個(gè)生產(chǎn)者只會(huì)對(duì)應(yīng)單個(gè)Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡單,每個(gè)生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP連接,只需要和Broker維護(hù)單個(gè)TCP連接即可。但是,其無法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個(gè)生產(chǎn)者產(chǎn)生的消息量及每個(gè)Broker的消息存儲(chǔ)量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會(huì)導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時(shí),生產(chǎn)者也無法實(shí)時(shí)感知到Broker的新增和刪除。

          • 使用Zookeeper進(jìn)行負(fù)載均衡

          由于每個(gè)Broker啟動(dòng)時(shí),都會(huì)完成Broker注冊過程,生產(chǎn)者會(huì)通過該節(jié)點(diǎn)的變化來動(dòng)態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動(dòng)態(tài)的負(fù)載均衡機(jī)制。

          消費(fèi)者負(fù)載均衡

          與生產(chǎn)者類似,Kafka中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來實(shí)現(xiàn)多個(gè)消費(fèi)者合理地從對(duì)應(yīng)的Broker服務(wù)器上接收消息,每個(gè)消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會(huì)發(fā)送給分組中的一個(gè)消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。

          記錄partition與consumer之間的分組group關(guān)系

          消費(fèi)組 (Consumer Group):consumer group 下有多個(gè) Consumer(消費(fèi)者)。

          對(duì)于每個(gè)消費(fèi)者組 (Consumer Group),Kafka都會(huì)為其分配一個(gè)全局唯一的Group ID,Group 內(nèi)部的所有消費(fèi)者共享該 ID。訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè) group 下的一個(gè)consumer(當(dāng)然該分區(qū)還可以被分配給其他group)。同時(shí),Kafka為每個(gè)消費(fèi)者分配一個(gè)Consumer ID,通常用"Hostname:UUID"形式表示。

          在Kafka中,規(guī)定了每個(gè)消息分區(qū)只能被同組的一個(gè)消費(fèi)者進(jìn)行消費(fèi),因此,需要在 Zookeeper 上記錄 消息分區(qū) 與 Consumer 之間的關(guān)系,每個(gè)消費(fèi)者一旦確定了對(duì)一個(gè)消息分區(qū)的消費(fèi)權(quán)力,需要將其Consumer ID 寫入到 Zookeeper 對(duì)應(yīng)消息分區(qū)的臨時(shí)節(jié)點(diǎn)上,例如:

          /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]。其中,[broker_id-partition_id]就是一個(gè) 消息分區(qū) 的標(biāo)識(shí),節(jié)點(diǎn)內(nèi)容就是該 消息分區(qū) 上 消費(fèi)者的Consumer ID。

          記錄消息偏移量offset

          在消費(fèi)者對(duì)指定消息分區(qū)進(jìn)行消息消費(fèi)的過程中,需要定時(shí)地將分區(qū)消息的消費(fèi)進(jìn)度Offset記錄到Zookeeper上,以便在該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度開始繼續(xù)進(jìn)行消息消費(fèi)。Offset在Zookeeper中由一個(gè)專門節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:

          /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],節(jié)點(diǎn)內(nèi)容就是Offset的值。

          消費(fèi)者注冊信息

          消費(fèi)者服務(wù)器在初始化啟動(dòng)時(shí)加入消費(fèi)者分組的步驟如下

          1、注冊到消費(fèi)者分組。每個(gè)消費(fèi)者服務(wù)器啟動(dòng)時(shí),都會(huì)到Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個(gè)屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會(huì)將自己訂閱的Topic信息寫入該臨時(shí)節(jié)點(diǎn)。

          2、對(duì) 消費(fèi)者分組 中的 消費(fèi)者 的變化注冊監(jiān)聽。每個(gè) 消費(fèi)者 都需要關(guān)注所屬 消費(fèi)者分組 中其他消費(fèi)者服務(wù)器的變化情況,即對(duì)/consumers/[group_id]/ids節(jié)點(diǎn)注冊子節(jié)點(diǎn)變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。

          3、對(duì)Broker服務(wù)器變化注冊監(jiān)聽。消費(fèi)者需要對(duì)/broker/ids/[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。

          4、進(jìn)行消費(fèi)者負(fù)載均衡。為了讓同一個(gè)Topic下不同分區(qū)的消息盡量均衡地被多個(gè) 消費(fèi)者 消費(fèi)而進(jìn)行 消費(fèi)者 與 消息 分區(qū)分配的過程,通常,對(duì)于一個(gè)消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會(huì)發(fā)出消費(fèi)者負(fù)載均衡。



          瀏覽 71
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品成人自拍 | 操碰久久| 国产精品在线观看成人视频 | 欧美天天澡天天爽日日a | 操逼18p|