springboot整合kafka
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|??Jone_chen
來源 |? urlify.cn/yEF7v2
66套java從入門到精通實(shí)戰(zhàn)課程分享
相關(guān)環(huán)境搭建(centos7下搭建單機(jī)kafka)
1、官網(wǎng)下載[kafka][http://kafka.apache.org/]
tar?-xzf?kafka_2.12-2.6.0.tgz
cd?kafka_2.13-2.6.0
2、修改配置文件(conf下面service.properties中advertised.listeners)
#?允許外部端口連接????????????????????????????????????????????
listeners=PLAINTEXT://0.0.0.0:9092??
#?外部代理地址????????????????????????????????????????????????
advertised.listeners=PLAINTEXT://192.168.0.175:9092
3、通過守護(hù)進(jìn)程啟動(dòng)命令
bin/zookeeper-server-start.sh?-daemon?config/zookeeper.properties
bin/kafka-server-start.sh?-daemon?config/server.properties
環(huán)境配置好之后,下面進(jìn)入測(cè)試。
4、創(chuàng)建一個(gè)主題
bin/kafka-topics.sh?--create?--topic?test?--bootstrap-server?localhost:9092
5、將事件/消息寫入主題(創(chuàng)建生產(chǎn)者)
bin/kafka-console-producer.sh?--topic?test?--bootstrap-server?localhost:9092
>this?is?test!
按crtl+c可退出當(dāng)前輸入模式
6、消費(fèi)
bin/kafka-console-consumer.sh?--topic?test?--from-beginning?--bootstrap-server?localhost:9092
springboot集成kafka
1、新建工程,添加pom
????org.springframework.kafka
????spring-kafka
2、application.yml添加kafka相關(guān)配置:
spring:
??application:
????name:?cloud-kafka
??kafka:
????bootstrap-servers:?192.168.0.175:9092
????producer:?#?producer?生產(chǎn)者
??????retries:?0?#?重試次數(shù)
??????acks:?1?#?應(yīng)答級(jí)別:多少個(gè)分區(qū)副本備份完成時(shí)向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)
??????batch-size:?16384?#?批量大小
??????buffer-memory:?33554432?#?生產(chǎn)端緩沖區(qū)大小
??????key-serializer:?org.apache.kafka.common.serialization.StringSerializer
??????value-serializer:?org.apache.kafka.common.serialization.StringSerializer
????consumer:?#?consumer消費(fèi)者
??????group-id:?test-consumer-group?#?默認(rèn)的消費(fèi)組ID
??????enable-auto-commit:?true?#?是否自動(dòng)提交offset
??????auto-commit-interval:?100?#?提交offset延時(shí)(接收到消息后多久提交offset)
??????auto-offset-reset:?latest
??????#?當(dāng)kafka中沒有初始o(jì)ffset或offset超出范圍時(shí)將自動(dòng)重置offset;
??????#?earliest:重置為分區(qū)中最小的offset;
??????#?latest:重置為分區(qū)中最新的offset(消費(fèi)分區(qū)中新產(chǎn)生的數(shù)據(jù));
??????#?none:只要有一個(gè)分區(qū)不存在已提交的offset,就拋出異常;
??????key-deserializer:?org.apache.kafka.common.serialization.StringDeserializer
??????value-deserializer:?org.apache.kafka.common.serialization.StringDeserializer
3、生產(chǎn)者(發(fā)送者)
@RestController
public?class?KafkaProducer?{
????@Resource
????private?KafkaTemplate?kafkaTemplate;
????@GetMapping("/kafka/normal/{msg}")
????public?void?sendMessage(@PathVariable("msg")?String?msg)?{
????????Message?message?=?new?Message();
????????message.setId(UUID.randomUUID().toString());
????????message.setSendTime(new?Date());
????????message.setMessage(msg);
????????kafkaTemplate.send("test",?JSONUtil.toJsonStr(message));
????}
}
4、消費(fèi)者(接受者)
@Component
public?class?KafkaConsumer?{
????private?final?Logger?logger?=?LoggerFactory.getLogger(KafkaConsumer.class);
????@KafkaListener(topics?=?{"test"})
????public?void?onMessage(ConsumerRecord,??>?consumerRecord)?{
????????Optional>?optional?=?Optional.ofNullable(consumerRecord.value());
????????if?(optional.isPresent())?{
????????????Object?msg?=?optional.get();
????????????logger.info("record:{}",?consumerRecord);
????????????logger.info("message:{}",?msg);
????????}
????}
}
5、實(shí)體類
public?class?Message?{
????private?String?id;
????private?String?message;
????private?Date?sendTime;
????//?getter?setter?略??
}
上面示例創(chuàng)建了一個(gè)生產(chǎn)者,發(fā)送消息到test,消費(fèi)者監(jiān)聽test消費(fèi)消息。監(jiān)聽器用@KafkaListener注解,topics表示監(jiān)聽的topic,支持同時(shí)監(jiān)聽多個(gè),用英文逗號(hào)分隔。啟動(dòng)項(xiàng)目,postman調(diào)接口觸發(fā)生產(chǎn)者發(fā)送消息。
同時(shí)查看日志信息:
2020-11-09?17:28:08.530??INFO?15076?---?[ntainer#0-0-C-1]?com.example.service.KafkaConsumer????????:?record:ConsumerRecord(topic?=?test,?partition?=?0,?leaderEpoch?=?0,?offset?=?3,?CreateTime?=?1604914088509,?serialized?key?size?=?-1,?serialized?value?size?=?87,?headers?=?RecordHeaders(headers?=?[],?isReadOnly?=?false),?key?=?null,?value?=?{"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"})
2020-11-09?17:28:08.530??INFO?15076?---?[ntainer#0-0-C-1]?com.example.service.KafkaConsumer????????:?message:{"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"}
可以看到消費(fèi)成功。
更詳細(xì)內(nèi)容,請(qǐng)參考:https://blog.csdn.net/yuanlong122716/article/details/105160545/
粉絲福利:實(shí)戰(zhàn)springboot+CAS單點(diǎn)登錄系統(tǒng)視頻教程免費(fèi)領(lǐng)取
???
?長按上方微信二維碼?2 秒 即可獲取資料
感謝點(diǎn)贊支持下哈?
