為什么放棄Kafka,選擇Pulsar?
Spring Boot 作為主流微服務(wù)框架,擁有成熟的社區(qū)生態(tài)。市場應(yīng)用廣泛,為了方便大家,整理了一個基于spring boot的常用中間件快速集成入門系列手冊,涉及RPC、緩存、消息隊列、分庫分表、注冊中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會開放出來,感興趣同學(xué)請?zhí)崆瓣P(guān)注&收藏
Pulsar 介紹
Pulsar 是 Yahoo 在 2013 年創(chuàng)建的,2016年貢獻給了 Apache 基金會,目前已經(jīng)是 Apache 的頂級項目。Yahoo、Verizon、Twitter 等很多公司都在使用 Pulsar 來處理海量消息。
Pulsar 聲稱比 Kafka 更快、運行成本更低、解決了很多 Kafka 的痛點。
Pulsar 非常靈活,可以像Kafka 一樣作為分布式日志系統(tǒng),也可以作為類似RabbitMQ 這類簡單的消息系統(tǒng)。
Pulsar 有多種訂閱類型、傳遞保障、保存策略。
特性
內(nèi)置多租戶
不同的團隊可以使用同一個集群,互相隔離。支持隔離、認證授權(quán)、配額。
多層架構(gòu)
Pulsar 使用特定的數(shù)據(jù)層來存儲 topic 數(shù)據(jù),使用了 Apache BookKeeper 作為數(shù)據(jù)賬本。Broker 與存儲分離。
使用分隔機制可以解決集群的擴展、再平衡、維護等問題。也提升了可用性,不會丟失數(shù)據(jù)。
因為使用了多層架構(gòu),對于 topic 數(shù)量沒有限制,topic 與存儲是分離的,也可以創(chuàng)建非持久化的 topic。
多層存儲
Kafka 中存儲是很昂貴的,所以很少存儲冷數(shù)據(jù)。Pulsar 使用了多層存儲,可以自動把舊數(shù)據(jù)移動到專門的存儲設(shè)備,例如 Amazon S3,但是對于客戶端來講是透明的,還可以正常使用。
Functions
Pulsar Function 是一種部署簡單,輕量級計算、對開發(fā)人員友好的 API,無需像 Kafka 那樣運行自己的流處理引擎。
安全
內(nèi)置了代理、多租戶安全機制、可插入的身份驗證等功能。
快速再平衡
partition 被分為了小塊兒,所以再平衡時非常快。
多系統(tǒng)集成
例如 Kafka、RabbitMQ 等系統(tǒng)都可以輕松集成。
支持多種開發(fā)語言
例如 Go、Java、Scala、Node、Python 等等
為什么選擇 Pulsar
目前業(yè)界使用比較多的是 Kafka,主要場景是大數(shù)據(jù)日志處理,較少用于金融場景。RocketMQ 對 Topic 運營不太友好,特別是不支持按 Topic 刪除失效消息,以及不具備宕機 Failover 能力。選 Pulsar 是因為其原生的高一致性,基于 BookKeeper 提供高可用存儲服務(wù),采用了存儲和服務(wù)分離架構(gòu)方便擴容,同時還支持多種消費模式和多域部署模式。Kafka、RocketMQ 和 Pulsar 的對比如下:

Show me the code
外部依賴:
在 pom.xml 中添加 Pulsar 依賴:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.4.0</version>
</dependency>
配置文件:
在配置文件 application.yaml中配置 Pulsar 的相關(guān)參數(shù),具體內(nèi)容如下:
pulsar:
service:
url: pulsar://127.0.0.1:6650
Producer 發(fā)送消息:
生產(chǎn)端提供了一個restful接口,模擬發(fā)送一條創(chuàng)建新用戶消息。
Long id = Long.valueOf(new Random().nextInt(1000));
User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
userPulsarMsgProducer.send(user);
內(nèi)部通過 @PostConstruct 在應(yīng)用啟動時,初始化org.apache.pulsar.client.api.Producer實例,并交由spring 容器統(tǒng)一管理。
public void send(T msg) {
String msgBody = JSON.toJSONString(msg);
try {
MessageId messageId = producer.send(msgBody.getBytes(StandardCharsets.UTF_8));
log.info("pulsar msg send success, topic:{}, messageId:{}, msg:{}", getTopic(), messageId, msgBody);
} catch (Throwable e) {
log.error("pulsar msg send failed, topic:{}, msg:{}", getTopic(), msgBody);
}
}
Producer 發(fā)送延遲消息:
適用于一些有延遲處理要求的業(yè)務(wù)場景,比如電商交易的自動確認收貨,在賣家發(fā)出貨品后,有15天的觀察期,這期間如果買家沒有發(fā)起逆向流程/申請退款,將會由系統(tǒng)自動觸發(fā)超時確認收貨。
不同業(yè)務(wù)場景,設(shè)定不同的延遲時間值,可以讓消費端在延遲指定時間后才能拉取到消息并進行消費。借助于該框架特性,有效節(jié)省開發(fā)成本和難度。
producer.newMessage().deliverAfter(delay, unit)
.value(msgBody.getBytes(StandardCharsets.UTF_8))
.send();
Consumer 消費消息:
系統(tǒng)啟動時,自動創(chuàng)建consumer消費實例,并埋入org.apache.pulsar.client.api.MessageListener接口實現(xiàn),用于具體的消息消費處理邏輯。
@PostConstruct
void init() throws PulsarClientException {
consumer = client.createConsumer(getTopic(), getSubscriptionName(), new DefaultJsonMsgListener());
}
class DefaultJsonMsgListener implements MessageListener<byte[]> {
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
if (null != message && null != message.getData() && message.getData().length != 0) {
String msgBody = new String(message.getValue(), StandardCharsets.UTF_8);
log.warn("topic:{} receive message:{}", getTopic(), msgBody);
try {
T msg = JSON.parseObject(msgBody, clazzT);
handleMsg(msg);
} catch (Exception e) {
log.error("handle msg failed, topic:{}, message:{}", getTopic(), msgBody, e);
return;
}
}
try {
// 提交消費位移
consumer.acknowledge(message);
} catch (PulsarClientException e) {
log.error("topic:{} ack failed", getTopic(), e);
}
}
}演示代碼地址
https://github.com/aalansehaiyang/spring-boot-bulking
模塊:spring-boot-bulking-pulsar
歡迎關(guān)注微信公眾號:互聯(lián)網(wǎng)全棧架構(gòu),收取更多有價值的信息。
