Kafka超詳細(xì)學(xué)習(xí)筆記【概念理解,安裝配置】
官方文檔:http://kafka.apache.org/23/documentation.html#introduction
中文文檔:https://kafka.apachecn.org/
本篇要點(diǎn)
介紹kafka的特性、概念、API及專業(yè)術(shù)語。
介紹Windows環(huán)境下kafka的安裝配置,啟動(dòng)測(cè)試。
Java客戶端連接kafka的案例演示。
Kafka介紹
Apache Kafka 是一個(gè)分布式流處理平臺(tái):distributed streaming platform。
作為流處理平臺(tái)的三種特性
可發(fā)布和訂閱消息(流),這與消息隊(duì)列或企業(yè)消息系統(tǒng)類似。
以容錯(cuò)(故障轉(zhuǎn)移)的方式存儲(chǔ)消息(流)。
提供實(shí)時(shí)的流處理。
主要應(yīng)用
kafka主要應(yīng)用于兩大類應(yīng)用:
構(gòu)建實(shí)時(shí)的流數(shù)據(jù)通道,可靠地獲取系統(tǒng)和應(yīng)用程序之間的數(shù)據(jù)。
構(gòu)建實(shí)時(shí)流的應(yīng)用程序,對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)換或反應(yīng)。
四個(gè)核心API
Producer API:發(fā)布消息到一個(gè)或多個(gè)topic主題上。
Consumer API:訂閱一個(gè)或多個(gè)topic,處理產(chǎn)生的消息。
Streams API:流處理器,從一個(gè)或多個(gè)topic消費(fèi)輸入流,并產(chǎn)生一個(gè)輸出流到一個(gè)或多個(gè)輸出topic,有效地將輸入流轉(zhuǎn)換到輸出流。
Connector API:可構(gòu)建或運(yùn)行可重用地生產(chǎn)者或消費(fèi)者,將topic連接到現(xiàn)有地應(yīng)用程序或數(shù)據(jù)系統(tǒng)。

基本術(shù)語
Topic:kafka將消息分類,每一類的消息都有一個(gè)主題topic。
Producer:生產(chǎn)者,發(fā)布消息的對(duì)象。
Consumer:消費(fèi)者,訂閱消息的對(duì)象。
Broker:代理,已發(fā)布的消息保存在一組服務(wù)器中,稱之為kafka集群,集群中每個(gè)服務(wù)器都是一個(gè)代理(broker)。消費(fèi)者可以訂閱一個(gè)或多個(gè)主題,并從broker上拉取數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。
Partition:Topic物理上的分組,一個(gè)Topic可以分為多個(gè)partition,每個(gè)partition都是一個(gè)順序的、不可變的消息隊(duì)列,且可以持續(xù)添加。Partition中的每條消息都會(huì)被分配一個(gè)有序的序列號(hào),稱為偏移量(offset),因此每個(gè)分區(qū)中偏移量都是唯一的。

Consumer Group:每個(gè)Consumer屬于一個(gè)特定的Consumer Group,這是kafka用來實(shí)現(xiàn)一個(gè)Topic消息的廣播【發(fā)送給所有的consumer的發(fā)布訂閱式消息模型】和單播【發(fā)送給任意一個(gè)consumer隊(duì)列消息模型】的手段。一個(gè)topic可以有多個(gè)consumer group。
如果要實(shí)現(xiàn)廣播,只要每個(gè)consumer有獨(dú)立的consumer group就可以,此時(shí)就是發(fā)布訂閱模型。
如果要實(shí)現(xiàn)單播,只要所有的consumer在同一個(gè)consumer group中就可以,此時(shí)就是隊(duì)列模型。
關(guān)于Consumer group的補(bǔ)充:一般來說,我們可以創(chuàng)建一些consumer group作為邏輯上的訂閱者,每個(gè)組中包含數(shù)目不等的consumer,一個(gè)組內(nèi)的多個(gè)消費(fèi)者可以用來擴(kuò)展性能和容錯(cuò)。
關(guān)于partition分區(qū)的補(bǔ)充:
1、【負(fù)載均衡】處理更多的消息,不受單臺(tái)服務(wù)器的限制。
2、【順序保證】kafka不能保證并行的時(shí)候消息的有序性,但是可以保證一個(gè)partition分區(qū)之中,消息只能由消費(fèi)者組中的唯一一個(gè)消費(fèi)者處理,以保證一個(gè)分區(qū)的消息先后順序。
如下圖:2個(gè)kafka集群托管4個(gè)分區(qū)(p0-p3),2個(gè)消費(fèi)者組,組A有2個(gè)消費(fèi)者實(shí)例,組B有4個(gè)消費(fèi)者實(shí)例。

關(guān)于偏移量的補(bǔ)充:kafka集群將會(huì)保持所有的消息,直到他們過期,無論他們是否被消費(fèi)。當(dāng)消費(fèi)者消費(fèi)消息時(shí),偏移量offset將會(huì)線性增加,但是消費(fèi)者其實(shí)可以控制實(shí)際的偏移量,可以重置偏移量為更早的位置,意為著重新讀取消息,且不會(huì)影響其他消費(fèi)者對(duì)此log的處理。

快速開始
安裝配置Zookeeper
Kafka的安裝配置啟動(dòng)需要依賴于Zookeeper,Zookeeper的安裝配置可以參考我的前一篇文章。
當(dāng)然,其實(shí)你下載kafka之后,就自動(dòng)已經(jīng)集成了Zookeeper,你可以通過修改配置,啟動(dòng)內(nèi)置的zookeeper。
關(guān)于使用內(nèi)置的Zookeeper還是自己安裝的Zookeeper的區(qū)別,可以看看這篇文章:https://segmentfault.com/q/1010000021110446
下載kafka
下載地址:http://kafka.apache.org/downloads
下載二進(jìn)制版本【Binary downloads】,下載完成之后,解壓到合適的目錄下。
筆者目錄為:D:\dev\kafka_2.11-2.3.1。
配置文件
進(jìn)入config目錄下,找到server.properties文件并修改如下:
log.dirs=D:\\dev\\kafka_2.11-2.3.1\\config\\kafka-logs
zookeeper.connect=localhost:2182 # 默認(rèn)端口是2181,這里修改為2182
找到zookeeper.properties文件,修改如下:
dataDir=D:\\softs\\zookeeper-3.4.13\\datas
dataLogDir=D:\\softs\\zookeeper-3.4.13\\logs
clientPort=2182
Windows的命令
在bin目錄下存放著所有可以使用的命令行指令,Linux和Windows的存放目錄需要注意:

啟動(dòng)Zookeeper
D:\dev\kafka_2.11-2.3.1> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

啟動(dòng)Kafka
D:\dev\kafka_2.11-2.3.1> .\bin\windows\kafka-server-start.bat .\config\server.properties

進(jìn)行測(cè)試
創(chuàng)建topic
創(chuàng)建1個(gè)分區(qū)1個(gè)副本,topic為test-topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test-topic
Created topic test-topic.
查看topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2182
test-topic
生產(chǎn)者
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic
消費(fèi)者
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning
生產(chǎn)者與消費(fèi)者消息傳遞

刪除topic
如果kafka啟動(dòng)時(shí)加載的配置文件中 server.properties 中沒有配置delete.topic.enable=true,則此刪除非真正刪除,而是僅僅將topic標(biāo)記為marked for deletion
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --delete --zookeeper localhost:2182 --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
登錄內(nèi)置的zookeeper客戶端
D:\dev\kafka_2.11-2.3.1\bin\windows>zookeeper-shell.bat localhost:2182
Connecting to localhost:2182
Welcome to ZooKeeper!
JLine support is disabled
物理刪除topic
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[test, test-topic, __consumer_offsets]
rmr /brokers/topics/test-topic # 物理刪除 test-topic
ls /brokers/topics
[test, __consumer_offsets]
Java客戶端使用
引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
生產(chǎn)者
public class ProducerExample {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<>();
props.put("zk.connect", "localhost:2182");
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "test";
for (int i = 1; i <= 100; i++) {
// send方法是異步的 , 返回Future對(duì)象,如果調(diào)用get(),將阻塞,直到相關(guān)請(qǐng)求完成并返回消息的metadata或拋出異常
producer.send(new ProducerRecord<>(topic, "key" + i, "msg" + i * 100));
}
// 生產(chǎn)者的傳沖空間池保留尚未發(fā)送到服務(wù)器的消息,后臺(tái)I/O線程負(fù)責(zé)將這些消息轉(zhuǎn)換程請(qǐng)求發(fā)送到集群
// 如果使用后不關(guān)閉生產(chǎn)者,將會(huì)丟失這些消息。
producer.close();
}
}
zk.connect:設(shè)置zookeeper的地址。
bootstrap.servers:用于建立與 kafka 集群連接的 host/port 組。
acks:判斷是不是成功發(fā)送,指定
all將會(huì)阻塞消息,這種設(shè)置性能最低,但是是最可靠的。retries:如果請(qǐng)求失敗,生產(chǎn)者會(huì)自動(dòng)重試,我們指定是0次,如果啟用重試,則會(huì)有重復(fù)消息的可能性。
batch.size:(生產(chǎn)者)緩存每個(gè)分區(qū)未發(fā)送的消息。緩存的大小是通過
batch.size配置指定的。值較大的話將會(huì)產(chǎn)生更大的批。并需要更多的內(nèi)存(因?yàn)槊總€(gè)“活躍”的分區(qū)都有1個(gè)緩沖區(qū))。linger.ms:默認(rèn)緩沖可立即發(fā)送,即便緩沖空間還沒有滿,但是,如果你想減少請(qǐng)求的數(shù)量,可以設(shè)置linger.ms大于0。這將指示生產(chǎn)者發(fā)送請(qǐng)求之前等待一段時(shí)間,希望更多的消息填補(bǔ)到未滿的批中。這類似于TCP的算法,例如上面的代碼段,可能100條消息在一個(gè)請(qǐng)求發(fā)送,因?yàn)槲覀冊(cè)O(shè)置了linger(逗留)時(shí)間為1毫秒,然后,如果我們沒有填滿緩沖區(qū),這個(gè)設(shè)置將增加1毫秒的延遲請(qǐng)求以等待更多的消息。需要注意的是,在高負(fù)載下,相近的時(shí)間一般也會(huì)組成批,即使是
linger.ms=0。在不處于高負(fù)載的情況下,如果設(shè)置比0大,以少量的延遲代價(jià)換取更少的,更有效的請(qǐng)求。buffer.memory:控制生產(chǎn)者可用的緩存總量,如果消息發(fā)送速度比其傳輸?shù)椒?wù)器的快,將會(huì)耗盡這個(gè)緩存空間。當(dāng)緩存空間耗盡,其他發(fā)送調(diào)用將被阻塞,阻塞時(shí)間的閾值通過
max.block.ms設(shè)定,之后它將拋出一個(gè)TimeoutException。key.serializer:用于序列化。
value.serializer:用于序列化。
消費(fèi)者
public class ConsumerSample {
public static void main(String[] args) {
String topic = "test";// topic name
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testGroup1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer(props);
// 訂閱多個(gè)主題
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 訂閱一組topic之后,調(diào)用poll時(shí),消費(fèi)者將自動(dòng)加入到組中。
// 只要持續(xù)調(diào)用poll,消費(fèi)者將一直保持可用,并繼續(xù)從分配的分區(qū)中接收消息。
// 消費(fèi)者向服務(wù)器定時(shí)發(fā)送心跳,如果在session.timeout.ms配置的時(shí)間內(nèi)無法發(fā)送心跳,被視為死亡,分區(qū)將重新分配
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("*****************partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
bootstrap.servers:用于建立與 kafka 集群連接的 host/port 組。
group.id:消費(fèi)者的組名,組名相同的消費(fèi)者被視為同一個(gè)消費(fèi)組。
enable.auto.commit:設(shè)置Consumer 的 offset 是否自動(dòng)提交。
auto.commit.interval.ms:上面屬性設(shè)置為true,由本屬性設(shè)置自動(dòng)提交 offset 到 zookeeper 的時(shí)間間隔,時(shí)間是毫秒
key.deserializer:用于反序列化。
value.deserializer:用于反序列化。
Kafka通過進(jìn)程池瓜分消息并處理消息,這些進(jìn)程可以在同一臺(tái)機(jī)器運(yùn)行,也可以分布到多臺(tái)機(jī)器上,以增加可擴(kuò)展型和容錯(cuò)性,相同的group.id的消費(fèi)者將視為同一個(gè)消費(fèi)者組。
組中的每個(gè)消費(fèi)者都通過subscribe API動(dòng)態(tài)的訂閱一個(gè)topic列表。kafka將已訂閱topic的消息發(fā)送到每個(gè)消費(fèi)者組中。并通過平衡分區(qū)在消費(fèi)者分組中所有成員之間來達(dá)到平均。因此每個(gè)分區(qū)恰好地分配1個(gè)消費(fèi)者(一個(gè)消費(fèi)者組中)。所有如果一個(gè)topic有4個(gè)分區(qū),并且一個(gè)消費(fèi)者分組有只有2個(gè)消費(fèi)者。那么每個(gè)消費(fèi)者將消費(fèi)2個(gè)分區(qū)。
消費(fèi)者組的成員是動(dòng)態(tài)維護(hù)的:如果一個(gè)消費(fèi)者故障。分配給它的分區(qū)將重新分配給同一個(gè)分組中其他的消費(fèi)者。同樣的,如果一個(gè)新的消費(fèi)者加入到分組,將從現(xiàn)有消費(fèi)者中移一個(gè)給它。這被稱為重新平衡分組。
啟動(dòng)Zookeeper和kafka
創(chuàng)建topic
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test
啟動(dòng)zookeeper
D:\dev\kafka_2.11-2.3.1>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
啟動(dòng)kafka
D:\dev\kafka_2.11-2.3.1>.\bin\windows\kafka-server-start.bat .\config\server.properties
測(cè)試
先啟動(dòng)消費(fèi)者ConsumerExample,再啟動(dòng)生產(chǎn)者ProducerExample,觀察控制臺(tái)。
總結(jié)
kafka作為一個(gè)消息系統(tǒng),它設(shè)計(jì)了partition分區(qū),提供了負(fù)載均衡能力,保證了消息分區(qū)內(nèi)的順序。
kafka擁有消費(fèi)者組的概念,很好地實(shí)現(xiàn)發(fā)布訂閱和隊(duì)列式的消息模型。
kafka作為一個(gè)存儲(chǔ)系統(tǒng),高性能,低延遲。
kafka能夠提供實(shí)時(shí)的流處理,提供強(qiáng)大的StreamsAPI,而不是簡(jiǎn)單的讀寫和存儲(chǔ)。

剩下的就不會(huì)給大家一展出來了,以上資料按照一下操作即可獲得
——將文章進(jìn)行轉(zhuǎn)發(fā)和評(píng)論,關(guān)注公眾號(hào)【Java烤豬皮】,關(guān)注后繼續(xù)后臺(tái)回復(fù)領(lǐng)取口令“ 666 ”即可免費(fèi)領(lǐng)文章取中所提供的資料。

騰訊、阿里、滴滴后臺(tái)試題匯集總結(jié) — (含答案)
面試:史上最全多線程序面試題!
最新阿里內(nèi)推Java后端試題
JVM難學(xué)?那是因?yàn)槟銢]有真正看完整這篇文章

關(guān)注作者微信公眾號(hào) — 《JAVA烤豬皮》
了解了更多java后端架構(gòu)知識(shí)以及最新面試寶典
看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者來源不斷出文的動(dòng)力~
