<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka超詳細(xì)學(xué)習(xí)筆記【概念理解,安裝配置】

          共 8558字,需瀏覽 18分鐘

           ·

          2023-06-25 22:35

          走過路過不要錯(cuò)過

          點(diǎn)擊藍(lán)字關(guān)注我們

          官方文檔:http://kafka.apache.org/23/documentation.html#introduction

          中文文檔:https://kafka.apachecn.org/

          本篇要點(diǎn)

          1. 介紹kafka的特性、概念、API及專業(yè)術(shù)語。

          2. 介紹Windows環(huán)境下kafka的安裝配置,啟動(dòng)測(cè)試。

          3. Java客戶端連接kafka的案例演示。

          Kafka介紹

          Apache Kafka 是一個(gè)分布式流處理平臺(tái):distributed streaming platform。

          作為流處理平臺(tái)的三種特性

          1. 可發(fā)布和訂閱消息(流),這與消息隊(duì)列或企業(yè)消息系統(tǒng)類似。

          2. 以容錯(cuò)(故障轉(zhuǎn)移)的方式存儲(chǔ)消息(流)。

          3. 提供實(shí)時(shí)的流處理。

          主要應(yīng)用

          kafka主要應(yīng)用于兩大類應(yīng)用:

          1. 構(gòu)建實(shí)時(shí)的流數(shù)據(jù)通道,可靠地獲取系統(tǒng)和應(yīng)用程序之間的數(shù)據(jù)。

          2. 構(gòu)建實(shí)時(shí)流的應(yīng)用程序,對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)換或反應(yīng)。

          四個(gè)核心API

          1. Producer API:發(fā)布消息到一個(gè)或多個(gè)topic主題上。

          2. Consumer API:訂閱一個(gè)或多個(gè)topic,處理產(chǎn)生的消息。

          3. Streams API:流處理器,從一個(gè)或多個(gè)topic消費(fèi)輸入流,并產(chǎn)生一個(gè)輸出流到一個(gè)或多個(gè)輸出topic,有效地將輸入流轉(zhuǎn)換到輸出流。

          4. Connector API:可構(gòu)建或運(yùn)行可重用地生產(chǎn)者或消費(fèi)者,將topic連接到現(xiàn)有地應(yīng)用程序或數(shù)據(jù)系統(tǒng)。

          基本術(shù)語

          Topic:kafka將消息分類,每一類的消息都有一個(gè)主題topic。

          Producer:生產(chǎn)者,發(fā)布消息的對(duì)象。

          Consumer:消費(fèi)者,訂閱消息的對(duì)象。

          Broker:代理,已發(fā)布的消息保存在一組服務(wù)器中,稱之為kafka集群,集群中每個(gè)服務(wù)器都是一個(gè)代理(broker)。消費(fèi)者可以訂閱一個(gè)或多個(gè)主題,并從broker上拉取數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。

          Partition:Topic物理上的分組,一個(gè)Topic可以分為多個(gè)partition,每個(gè)partition都是一個(gè)順序的、不可變的消息隊(duì)列,且可以持續(xù)添加。Partition中的每條消息都會(huì)被分配一個(gè)有序的序列號(hào),稱為偏移量(offset),因此每個(gè)分區(qū)中偏移量都是唯一的。

          Consumer Group:每個(gè)Consumer屬于一個(gè)特定的Consumer Group,這是kafka用來實(shí)現(xiàn)一個(gè)Topic消息的廣播【發(fā)送給所有的consumer的發(fā)布訂閱式消息模型】和單播【發(fā)送給任意一個(gè)consumer隊(duì)列消息模型】的手段。一個(gè)topic可以有多個(gè)consumer group。

          • 如果要實(shí)現(xiàn)廣播,只要每個(gè)consumer有獨(dú)立的consumer group就可以,此時(shí)就是發(fā)布訂閱模型。

          • 如果要實(shí)現(xiàn)單播,只要所有的consumer在同一個(gè)consumer group中就可以,此時(shí)就是隊(duì)列模型。

          關(guān)于Consumer group的補(bǔ)充:一般來說,我們可以創(chuàng)建一些consumer group作為邏輯上的訂閱者,每個(gè)組中包含數(shù)目不等的consumer,一個(gè)組內(nèi)的多個(gè)消費(fèi)者可以用來擴(kuò)展性能和容錯(cuò)。

          關(guān)于partition分區(qū)的補(bǔ)充

          1、【負(fù)載均衡】處理更多的消息,不受單臺(tái)服務(wù)器的限制。

          2、【順序保證】kafka不能保證并行的時(shí)候消息的有序性,但是可以保證一個(gè)partition分區(qū)之中,消息只能由消費(fèi)者組中的唯一一個(gè)消費(fèi)者處理,以保證一個(gè)分區(qū)的消息先后順序。

          如下圖:2個(gè)kafka集群托管4個(gè)分區(qū)(p0-p3),2個(gè)消費(fèi)者組,組A有2個(gè)消費(fèi)者實(shí)例,組B有4個(gè)消費(fèi)者實(shí)例。

          關(guān)于偏移量的補(bǔ)充:kafka集群將會(huì)保持所有的消息,直到他們過期,無論他們是否被消費(fèi)。當(dāng)消費(fèi)者消費(fèi)消息時(shí),偏移量offset將會(huì)線性增加,但是消費(fèi)者其實(shí)可以控制實(shí)際的偏移量,可以重置偏移量為更早的位置,意為著重新讀取消息,且不會(huì)影響其他消費(fèi)者對(duì)此log的處理。

          快速開始

          安裝配置Zookeeper

          Kafka的安裝配置啟動(dòng)需要依賴于Zookeeper,Zookeeper的安裝配置可以參考我的前一篇文章。

          當(dāng)然,其實(shí)你下載kafka之后,就自動(dòng)已經(jīng)集成了Zookeeper,你可以通過修改配置,啟動(dòng)內(nèi)置的zookeeper。

          關(guān)于使用內(nèi)置的Zookeeper還是自己安裝的Zookeeper的區(qū)別,可以看看這篇文章:https://segmentfault.com/q/1010000021110446

          下載kafka

          下載地址:http://kafka.apache.org/downloads

          下載二進(jìn)制版本【Binary downloads】,下載完成之后,解壓到合適的目錄下。

          筆者目錄為:D:\dev\kafka_2.11-2.3.1。

          配置文件

          進(jìn)入config目錄下,找到server.properties文件并修改如下:

          log.dirs=D:\\dev\\kafka_2.11-2.3.1\\config\\kafka-logs
          zookeeper.connect=localhost:2182 # 默認(rèn)端口是2181,這里修改為2182


          找到zookeeper.properties文件,修改如下:

          dataDir=D:\\softs\\zookeeper-3.4.13\\datas 
          dataLogDir=D:\\softs\\zookeeper-3.4.13\\logs
          clientPort=2182


          Windows的命令

          在bin目錄下存放著所有可以使用的命令行指令,Linux和Windows的存放目錄需要注意:

          啟動(dòng)Zookeeper

          D:\dev\kafka_2.11-2.3.1> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties


          啟動(dòng)Kafka

          D:\dev\kafka_2.11-2.3.1> .\bin\windows\kafka-server-start.bat .\config\server.properties


          進(jìn)行測(cè)試

          創(chuàng)建topic

          創(chuàng)建1個(gè)分區(qū)1個(gè)副本,topic為test-topic

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test-topic
          Created topic test-topic.


          查看topic

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2182
          test-topic


          生產(chǎn)者

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic


          消費(fèi)者

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning


          生產(chǎn)者與消費(fèi)者消息傳遞

          刪除topic

          如果kafka啟動(dòng)時(shí)加載的配置文件中 server.properties 中沒有配置delete.topic.enable=true,則此刪除非真正刪除,而是僅僅將topic標(biāo)記為marked for deletion

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --delete --zookeeper localhost:2182 --topic test-topic

          Topic test-topic is marked for deletion.
          Note: This will have no impact if delete.topic.enable is not set to true.


          登錄內(nèi)置的zookeeper客戶端

          D:\dev\kafka_2.11-2.3.1\bin\windows>zookeeper-shell.bat localhost:2182

          Connecting to localhost:2182
          Welcome to ZooKeeper!
          JLine support is disabled


          物理刪除topic

          ls /brokers
          [ids, topics, seqid]
          ls /brokers/topics
          [test, test-topic, __consumer_offsets]
          rmr /brokers/topics/test-topic # 物理刪除 test-topic
          ls /brokers/topics
          [test, __consumer_offsets]


          Java客戶端使用

          引入依賴

                  <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.6.0</version>
          </dependency>


          生產(chǎn)者

          public class ProducerExample {

          public static void main(String[] args) {
          Map<String, Object> props = new HashMap<>();
          props.put("zk.connect", "localhost:2182");
          props.put("bootstrap.servers", "localhost:9092");
          props.put("acks", "all");
          props.put("retries", 0);
          props.put("batch.size", 16384);
          props.put("linger.ms", 1);
          props.put("buffer.memory", 33554432);
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          Producer<String, String> producer = new KafkaProducer<>(props);
          String topic = "test";
          for (int i = 1; i <= 100; i++) {
          // send方法是異步的 , 返回Future對(duì)象,如果調(diào)用get(),將阻塞,直到相關(guān)請(qǐng)求完成并返回消息的metadata或拋出異常
          producer.send(new ProducerRecord<>(topic, "key" + i, "msg" + i * 100));
          }
          // 生產(chǎn)者的傳沖空間池保留尚未發(fā)送到服務(wù)器的消息,后臺(tái)I/O線程負(fù)責(zé)將這些消息轉(zhuǎn)換程請(qǐng)求發(fā)送到集群
          // 如果使用后不關(guān)閉生產(chǎn)者,將會(huì)丟失這些消息。
          producer.close();
          }

          }


          • zk.connect:設(shè)置zookeeper的地址。

          • bootstrap.servers:用于建立與 kafka 集群連接的 host/port 組。

          • acks:判斷是不是成功發(fā)送,指定all將會(huì)阻塞消息,這種設(shè)置性能最低,但是是最可靠的。

          • retries:如果請(qǐng)求失敗,生產(chǎn)者會(huì)自動(dòng)重試,我們指定是0次,如果啟用重試,則會(huì)有重復(fù)消息的可能性。

          • batch.size:(生產(chǎn)者)緩存每個(gè)分區(qū)未發(fā)送的消息。緩存的大小是通過 batch.size 配置指定的。值較大的話將會(huì)產(chǎn)生更大的批。并需要更多的內(nèi)存(因?yàn)槊總€(gè)“活躍”的分區(qū)都有1個(gè)緩沖區(qū))。

          • linger.ms:默認(rèn)緩沖可立即發(fā)送,即便緩沖空間還沒有滿,但是,如果你想減少請(qǐng)求的數(shù)量,可以設(shè)置linger.ms大于0。這將指示生產(chǎn)者發(fā)送請(qǐng)求之前等待一段時(shí)間,希望更多的消息填補(bǔ)到未滿的批中。這類似于TCP的算法,例如上面的代碼段,可能100條消息在一個(gè)請(qǐng)求發(fā)送,因?yàn)槲覀冊(cè)O(shè)置了linger(逗留)時(shí)間為1毫秒,然后,如果我們沒有填滿緩沖區(qū),這個(gè)設(shè)置將增加1毫秒的延遲請(qǐng)求以等待更多的消息。需要注意的是,在高負(fù)載下,相近的時(shí)間一般也會(huì)組成批,即使是 linger.ms=0。在不處于高負(fù)載的情況下,如果設(shè)置比0大,以少量的延遲代價(jià)換取更少的,更有效的請(qǐng)求。

          • buffer.memory:控制生產(chǎn)者可用的緩存總量,如果消息發(fā)送速度比其傳輸?shù)椒?wù)器的快,將會(huì)耗盡這個(gè)緩存空間。當(dāng)緩存空間耗盡,其他發(fā)送調(diào)用將被阻塞,阻塞時(shí)間的閾值通過max.block.ms設(shè)定,之后它將拋出一個(gè)TimeoutException。

          • key.serializer:用于序列化。

          • value.serializer:用于序列化。

          消費(fèi)者

          public class ConsumerSample {

          public static void main(String[] args) {
          String topic = "test";// topic name

          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "testGroup1");
          props.put("enable.auto.commit", "true");
          props.put("auto.commit.interval.ms", "1000");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          Consumer<String, String> consumer = new KafkaConsumer(props);
          // 訂閱多個(gè)主題
          consumer.subscribe(Arrays.asList(topic));
          while (true) {
          // 訂閱一組topic之后,調(diào)用poll時(shí),消費(fèi)者將自動(dòng)加入到組中。
          // 只要持續(xù)調(diào)用poll,消費(fèi)者將一直保持可用,并繼續(xù)從分配的分區(qū)中接收消息。
          // 消費(fèi)者向服務(wù)器定時(shí)發(fā)送心跳,如果在session.timeout.ms配置的時(shí)間內(nèi)無法發(fā)送心跳,被視為死亡,分區(qū)將重新分配
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records)
          System.out.printf("*****************partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
          }
          }
          }


          • bootstrap.servers:用于建立與 kafka 集群連接的 host/port 組。

          • group.id:消費(fèi)者的組名,組名相同的消費(fèi)者被視為同一個(gè)消費(fèi)組。

          • enable.auto.commit:設(shè)置Consumer 的 offset 是否自動(dòng)提交。

          • auto.commit.interval.ms:上面屬性設(shè)置為true,由本屬性設(shè)置自動(dòng)提交 offset 到 zookeeper 的時(shí)間間隔,時(shí)間是毫秒

          • key.deserializer:用于反序列化。

          • value.deserializer:用于反序列化。

          Kafka通過進(jìn)程池瓜分消息并處理消息,這些進(jìn)程可以在同一臺(tái)機(jī)器運(yùn)行,也可以分布到多臺(tái)機(jī)器上,以增加可擴(kuò)展型和容錯(cuò)性,相同的group.id的消費(fèi)者將視為同一個(gè)消費(fèi)者組。

          組中的每個(gè)消費(fèi)者都通過subscribe API動(dòng)態(tài)的訂閱一個(gè)topic列表。kafka將已訂閱topic的消息發(fā)送到每個(gè)消費(fèi)者組中。并通過平衡分區(qū)在消費(fèi)者分組中所有成員之間來達(dá)到平均。因此每個(gè)分區(qū)恰好地分配1個(gè)消費(fèi)者(一個(gè)消費(fèi)者組中)。所有如果一個(gè)topic有4個(gè)分區(qū),并且一個(gè)消費(fèi)者分組有只有2個(gè)消費(fèi)者。那么每個(gè)消費(fèi)者將消費(fèi)2個(gè)分區(qū)。

          消費(fèi)者組的成員是動(dòng)態(tài)維護(hù)的:如果一個(gè)消費(fèi)者故障。分配給它的分區(qū)將重新分配給同一個(gè)分組中其他的消費(fèi)者。同樣的,如果一個(gè)新的消費(fèi)者加入到分組,將從現(xiàn)有消費(fèi)者中移一個(gè)給它。這被稱為重新平衡分組。

          啟動(dòng)Zookeeper和kafka

          創(chuàng)建topic

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test


          啟動(dòng)zookeeper

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties


          啟動(dòng)kafka

          D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-server-start.bat .\config\server.properties


          測(cè)試

          先啟動(dòng)消費(fèi)者ConsumerExample,再啟動(dòng)生產(chǎn)者ProducerExample,觀察控制臺(tái)。

          總結(jié)

          • kafka作為一個(gè)消息系統(tǒng),它設(shè)計(jì)了partition分區(qū),提供了負(fù)載均衡能力,保證了消息分區(qū)內(nèi)的順序。

          • kafka擁有消費(fèi)者組的概念,很好地實(shí)現(xiàn)發(fā)布訂閱和隊(duì)列式的消息模型。

          • kafka作為一個(gè)存儲(chǔ)系統(tǒng),高性能,低延遲。

          • kafka能夠提供實(shí)時(shí)的流處理,提供強(qiáng)大的StreamsAPI,而不是簡(jiǎn)單的讀寫和存儲(chǔ)。




          想進(jìn)大廠的小伙伴請(qǐng)注意,

          大廠面試的套路很神奇,

          早做準(zhǔn)備對(duì)大家更有好處,

          埋頭刷題效率低,

          看面經(jīng)會(huì)更有效率!

          小編準(zhǔn)備了一份大廠常問面經(jīng)匯總集

          剩下的就不會(huì)給大家一展出來了,以上資料按照一下操作即可獲得


          ——將文章進(jìn)行轉(zhuǎn)發(fā)評(píng)論,關(guān)注公眾號(hào)【Java烤豬皮】,關(guān)注后繼續(xù)后臺(tái)回復(fù)領(lǐng)取口令“ 666 ”即可免費(fèi)領(lǐng)文章取中所提供的資料。




          往期精品推薦



          騰訊、阿里、滴滴后臺(tái)試題匯集總結(jié) — (含答案)

          面試:史上最全多線程序面試題!

          最新阿里內(nèi)推Java后端試題

          JVM難學(xué)?那是因?yàn)槟銢]有真正看完整這篇文章


          結(jié)束


          關(guān)注作者微信公眾號(hào) — 《JAVA烤豬皮》


          了解了更多java后端架構(gòu)知識(shí)以及最新面試寶典



          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者來源不斷出文的動(dòng)力~

          瀏覽 56
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美黄色网址 | 成人爱爱网 | 豆花成人精品视频 | 2022国产精品 | 天天射天天射天天射 |