【Kafka】使用Java實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)
點(diǎn)擊關(guān)注,與你共同成長(zhǎng)!

【Kafka】Java實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)
Kafka介紹
Kafka 是由 LinkedIn 公司開發(fā)的,它是一個(gè)分布式的,支持多分區(qū)、多副本,基于 Zookeeper 的分布式消息流平臺(tái),它同時(shí)也是一款開源的基于發(fā)布訂閱模式的消息引擎系統(tǒng)。
Kafka術(shù)語
Broker:消息中間件處理節(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)Broker,一個(gè)或者多個(gè)Broker可以組成一個(gè)Kafka集群; Topic:每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic。(物理上不同Topic的消息分開存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處); Partition:Partition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition; Producer:負(fù)責(zé)發(fā)布消息到Kafka Broker; Consumer:消息消費(fèi)者,向Kafka Broker讀取消息的客戶端; Consumer Group:每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定Groupname,若不指定Groupname則屬于默認(rèn)的Group); Consumer Offset:消費(fèi)者在消費(fèi)消息的過程中,記錄消費(fèi)者在分區(qū)中消費(fèi)進(jìn)度的字段,就是消息位移,它是一個(gè)偏移量,隨著消費(fèi)者不斷消費(fèi)分區(qū)中的消息而遞增; Replica:Kafka 中消息的備份又叫做 副本(Replica),副本的數(shù)量是可以配置的,Kafka 定義了兩類副本,領(lǐng)導(dǎo)者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對(duì)外提供服務(wù),后者只是被動(dòng)跟隨;Rebalance:當(dāng) Kafka 的某個(gè)主題的消費(fèi)者組中,有一個(gè)消費(fèi)者不可用后,其他消費(fèi)者會(huì)自動(dòng)重新分配訂閱的主題分區(qū),這個(gè)過程叫做 Rebalance,是 Kafka 實(shí)現(xiàn)消費(fèi)者端高可用的重要手段。
Kafka特性
高吞吐、低延遲:kakfa 最大的特點(diǎn)就是收發(fā)消息非常快,kafka 每秒可以處理幾十萬條消息,它的最低延遲只有幾毫秒;高伸縮性:每個(gè)主題(topic) 包含多個(gè)分區(qū)(partition),主題中的分區(qū)可以分布在不同的主機(jī)(broker)中;持久性、可靠性:Kafka 能夠允許數(shù)據(jù)的持久化存儲(chǔ),消息被持久化到磁盤,并支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失,Kafka 底層的數(shù)據(jù)存儲(chǔ)是基于 Zookeeper 存儲(chǔ)的,Zookeeper 的數(shù)據(jù)能夠持久存儲(chǔ);容錯(cuò)性:允許集群中的節(jié)點(diǎn)失敗,某個(gè)節(jié)點(diǎn)宕機(jī),Kafka 集群能夠正常工作;高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫。
Kafka應(yīng)用場(chǎng)景
活動(dòng)跟蹤:Kafka 可以用來 跟蹤用戶行為,比如你經(jīng)常回去App購(gòu)物,你打開App的那一刻,你的登陸信息,登陸次數(shù)都會(huì)作為消息傳輸?shù)?Kafka ,當(dāng)你瀏覽購(gòu)物的時(shí)候,你的瀏覽信息,你的搜索指數(shù),你的購(gòu)物愛好都會(huì)作為一個(gè)個(gè)消息傳遞給 Kafka ,這樣就可以生成報(bào)告,可以做智能推薦,購(gòu)買喜好等;傳遞消息:Kafka 另外一個(gè)基本用途是 傳遞消息,應(yīng)用程序向用戶發(fā)送通知就是通過傳遞消息來實(shí)現(xiàn)的,這些應(yīng)用組件可以生成消息,而不需要關(guān)心消息的格式,也不需要關(guān)心消息是如何發(fā)送的;度量指標(biāo):Kafka也經(jīng)常 用來記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告;日志記錄:Kafka 的基本概念來源于提交日志,比如可以把數(shù)據(jù)庫(kù)的更新發(fā)送到 Kafka 上,用來記錄數(shù)據(jù)庫(kù)的更新時(shí)間,通過Kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等; 流式處理:流式處理是有一個(gè)能夠提供多種應(yīng)用程序的領(lǐng)域; 限流削峰:Kafka 多用于互聯(lián)網(wǎng)領(lǐng)域某一時(shí)刻請(qǐng)求特別多的情況下,可以把請(qǐng)求寫入Kafka 中,避免直接請(qǐng)求后端程序?qū)е路?wù)崩潰。
以上介紹參考Kafka官方文檔。
Kafka核心API
Kafka有4個(gè)核心API
應(yīng)用程序使用Producer API發(fā)布消息到 1個(gè)或多個(gè)Topics中;應(yīng)用程序使用ConsumerAPI來訂閱 1個(gè)或多個(gè)Topics,并處理產(chǎn)生的消息;應(yīng)用程序使用Streams API充當(dāng)一個(gè)流處理器,從1個(gè)或多個(gè)Topics消費(fèi)輸入流,并產(chǎn)生一個(gè)輸出流到1個(gè)或多個(gè)Topics,有效地將輸入流轉(zhuǎn)換到輸出流; Connector API允許構(gòu)建或運(yùn)行可重復(fù)使用的生產(chǎn)者或消費(fèi)者,將Topic鏈接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。

Kafka為何如此之快
Kafka 實(shí)現(xiàn)了零拷貝原理來快速移動(dòng)數(shù)據(jù),避免了內(nèi)核之間的切換。Kafka 可以將數(shù)據(jù)記錄分批發(fā)送,從生產(chǎn)者到文件系統(tǒng)(Kafka 主題日志)到消費(fèi)者,可以端到端的查看這些批次的數(shù)據(jù)。批處理能夠進(jìn)行更有效的數(shù)據(jù)壓縮并減少 I/O 延遲,Kafka 采取順序?qū)懭氪疟P的方式,避免了隨機(jī)磁盤尋址的浪費(fèi)。
總結(jié)一下其實(shí)就是四個(gè)要點(diǎn):
順序讀寫; 零拷貝; 消息壓縮; 分批發(fā)送。
案例
項(xiàng)目創(chuàng)建:

Dependencies:

構(gòu)建工具為Maven,Maven的依賴如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
Kafka Producer
package cn.com.codingce.module;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
// 定義主題
public static String topic = "codingce_test";
public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
// bootstrap.servers: kafka的地址, 多個(gè)地址用逗號(hào)分割
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.150:9092");
// acks:消息的確認(rèn)機(jī)制,默認(rèn)值是0. acks=0: 如果設(shè)置為0,生產(chǎn)者不會(huì)等待kafka的響應(yīng); acks=1: 這個(gè)配置意味著kafka會(huì)把這條消息寫到本地日志文件中,但是不會(huì)等待集群中其他機(jī)器的成功響應(yīng)
// acks=all: 這個(gè)配置意味著leader會(huì)等待所有的follower同步完成. 這個(gè)確保消息不會(huì)丟失, 除非kafka集群中所有機(jī)器掛掉. 這是最強(qiáng)的可用性保證.
p.put("acks", "all");
// retries: 配置為大于0的值的話, 客戶端會(huì)在消息發(fā)送失敗時(shí)重新發(fā)送.
p.put("retries", 0);
// batch.size: 當(dāng)多條消息需要發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)嘗試合并網(wǎng)絡(luò)請(qǐng)求. 這會(huì)提高client和生產(chǎn)者的效率.
p.put("batch.size", 16384);
// key.serializer: 鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value.deserializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
try {
do {
String msg = "后端碼匠, " + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
kafkaProducer.send(record);
System.out.println("======消息發(fā)送成功: " + msg + " ======");
Thread.sleep(1000L);
} while (true);
} finally {
kafkaProducer.close();
}
}
}
output
======消息發(fā)送成功: 后端碼匠, 97 ======
======消息發(fā)送成功: 后端碼匠, 35 ======
======消息發(fā)送成功: 后端碼匠, 81 ======
======消息發(fā)送成功: 后端碼匠, 46 ======
======消息發(fā)送成功: 后端碼匠, 62 ======
======消息發(fā)送成功: 后端碼匠, 53 ======
======消息發(fā)送成功: 后端碼匠, 42 ======
======消息發(fā)送成功: 后端碼匠, 56 ======
======消息發(fā)送成功: 后端碼匠, 99 ======
======消息發(fā)送成功: 后端碼匠, 46 ======
======消息發(fā)送成功: 后端碼匠, 49 ======
======消息發(fā)送成功: 后端碼匠, 35 ======
======消息發(fā)送成功: 后端碼匠, 17 ======
======消息發(fā)送成功: 后端碼匠, 78 ======
======消息發(fā)送成功: 后端碼匠, 66 ======
======消息發(fā)送成功: 后端碼匠, 4 ======
======消息發(fā)送成功: 后端碼匠, 9 ======
======消息發(fā)送成功: 后端碼匠, 69 ======
======消息發(fā)送成功: 后端碼匠, 52 ======
======消息發(fā)送成功: 后端碼匠, 2 ======
======消息發(fā)送成功: 后端碼匠, 8 ======
======消息發(fā)送成功: 后端碼匠, 86 ======
======消息發(fā)送成功: 后端碼匠, 12 ======
======消息發(fā)送成功: 后端碼匠, 67 ======
======消息發(fā)送成功: 后端碼匠, 91 ======
======消息發(fā)送成功: 后端碼匠, 8 ======
======消息發(fā)送成功: 后端碼匠, 56 ======
======消息發(fā)送成功: 后端碼匠, 89 ======
======消息發(fā)送成功: 后端碼匠, 37 ======
======消息發(fā)送成功: 后端碼匠, 39 ======
======消息發(fā)送成功: 后端碼匠, 71 ======
Kafka Consumer
package cn.com.codingce.module;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
private static final String GROUPID = "codingce_consumer_a";
public static void main(String[] args) {
Properties p = new Properties();
// bootstrap.servers: kafka的地址, 多個(gè)地址用逗號(hào)分割
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.150:9092");
// 消費(fèi)者所屬的分組id, 組名 不同組名可以重復(fù)消費(fèi).例如你先使用了組名A消費(fèi)了Kafka的1000條數(shù)據(jù), 但是你還想再次進(jìn)行消費(fèi)這1000條數(shù)據(jù),
// 并且不想重新去產(chǎn)生, 那么這里你只需要更改組名就可以重復(fù)消費(fèi)了.
p.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
// 是否自動(dòng)提交, 默認(rèn)為true.
p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 從poll(拉)的回話處理時(shí)長(zhǎng)
p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 超時(shí)時(shí)間
p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// 一次最大拉取的條數(shù)
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// 消費(fèi)規(guī)則, 默認(rèn)earliest
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// key.serializer: 鍵序列化, 默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// value.deserializer:值序列化, 默認(rèn)org.apache.kafka.common.serialization.StringDeserializer.
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
// 訂閱消息
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));
do {
// 訂閱之后, 再?gòu)膋afka中拉取數(shù)據(jù)
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("-----topic:%s, offset:%d, 消息:%s-----\n", record.topic(), record.offset(), record.value());
}
} while (true);
}
}
output
-----topic:codingce_test, offset:289, 消息:后端碼匠, 97-----
-----topic:codingce_test, offset:290, 消息:后端碼匠, 35-----
-----topic:codingce_test, offset:291, 消息:后端碼匠, 81-----
-----topic:codingce_test, offset:292, 消息:后端碼匠, 46-----
-----topic:codingce_test, offset:293, 消息:后端碼匠, 62-----
-----topic:codingce_test, offset:294, 消息:后端碼匠, 53-----
-----topic:codingce_test, offset:295, 消息:后端碼匠, 42-----
-----topic:codingce_test, offset:296, 消息:后端碼匠, 56-----
-----topic:codingce_test, offset:297, 消息:后端碼匠, 99-----
-----topic:codingce_test, offset:298, 消息:后端碼匠, 46-----
-----topic:codingce_test, offset:299, 消息:后端碼匠, 49-----
-----topic:codingce_test, offset:300, 消息:后端碼匠, 35-----
-----topic:codingce_test, offset:301, 消息:后端碼匠, 17-----
-----topic:codingce_test, offset:302, 消息:后端碼匠, 78-----
-----topic:codingce_test, offset:303, 消息:后端碼匠, 66-----
-----topic:codingce_test, offset:304, 消息:后端碼匠, 4-----
-----topic:codingce_test, offset:305, 消息:后端碼匠, 9-----
-----topic:codingce_test, offset:306, 消息:后端碼匠, 69-----
-----topic:codingce_test, offset:307, 消息:后端碼匠, 52-----
-----topic:codingce_test, offset:308, 消息:后端碼匠, 2-----
-----topic:codingce_test, offset:309, 消息:后端碼匠, 8-----
-----topic:codingce_test, offset:310, 消息:后端碼匠, 86-----
-----topic:codingce_test, offset:311, 消息:后端碼匠, 12-----
-----topic:codingce_test, offset:312, 消息:后端碼匠, 67-----
-----topic:codingce_test, offset:313, 消息:后端碼匠, 91-----
-----topic:codingce_test, offset:314, 消息:后端碼匠, 8-----
-----topic:codingce_test, offset:315, 消息:后端碼匠, 56-----
-----topic:codingce_test, offset:316, 消息:后端碼匠, 89-----
-----topic:codingce_test, offset:317, 消息:后端碼匠, 37-----
-----topic:codingce_test, offset:318, 消息:后端碼匠, 39-----
-----topic:codingce_test, offset:319, 消息:后端碼匠, 71-----
本次采用Docker 搭建的單機(jī) Kafka、Zookeeper,Kafka介紹參考官方文檔:http://kafka.apache.org/intro
項(xiàng)目地址:https://gitee.com/codingce/codingce-leetcode

以上,便是今天的分享,希望大家喜歡,覺得內(nèi)容不錯(cuò)的,歡迎「分享」「贊」或者點(diǎn)擊「在看」支持,謝謝各位。
評(píng)論
圖片
表情
