<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          為什么放棄Kafka,選擇Pulsar?

          共 6429字,需瀏覽 13分鐘

           ·

          2021-06-24 10:49

          Spring Boot 作為主流微服務(wù)框架,擁有成熟的社區(qū)生態(tài)。市場應(yīng)用廣泛,為了方便大家,整理了一個基于spring boot的常用中間件快速集成入門系列手冊,涉及RPC、緩存、消息隊列、分庫分表、注冊中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會開放出來,感興趣同學(xué)請?zhí)崆瓣P(guān)注&收藏

          Pulsar 介紹

          Pulsar 是 Yahoo 在 2013 年創(chuàng)建的,2016年貢獻給了 Apache 基金會,目前已經(jīng)是 Apache 的頂級項目。Yahoo、Verizon、Twitter 等很多公司都在使用 Pulsar 來處理海量消息。

          Pulsar 聲稱比 Kafka 更快、運行成本更低、解決了很多 Kafka 的痛點。

          Pulsar 非常靈活,可以像Kafka 一樣作為分布式日志系統(tǒng),也可以作為類似RabbitMQ 這類簡單的消息系統(tǒng)。

          Pulsar 有多種訂閱類型、傳遞保障、保存策略。

          特性

          • 內(nèi)置多租戶

          不同的團隊可以使用同一個集群,互相隔離。支持隔離、認證授權(quán)、配額。

          • 多層架構(gòu)

          Pulsar 使用特定的數(shù)據(jù)層來存儲 topic 數(shù)據(jù),使用了 Apache BookKeeper 作為數(shù)據(jù)賬本。Broker 與存儲分離。

          使用分隔機制可以解決集群的擴展、再平衡、維護等問題。也提升了可用性,不會丟失數(shù)據(jù)。

          因為使用了多層架構(gòu),對于 topic 數(shù)量沒有限制,topic 與存儲是分離的,也可以創(chuàng)建非持久化的 topic。

          • 多層存儲

          Kafka 中存儲是很昂貴的,所以很少存儲冷數(shù)據(jù)。Pulsar 使用了多層存儲,可以自動把舊數(shù)據(jù)移動到專門的存儲設(shè)備,例如 Amazon S3,但是對于客戶端來講是透明的,還可以正常使用。

          • Functions

          Pulsar Function 是一種部署簡單,輕量級計算、對開發(fā)人員友好的 API,無需像 Kafka 那樣運行自己的流處理引擎。

          • 安全

          內(nèi)置了代理、多租戶安全機制、可插入的身份驗證等功能。

          • 快速再平衡

          partition 被分為了小塊兒,所以再平衡時非常快。

          • 多系統(tǒng)集成

          例如 Kafka、RabbitMQ 等系統(tǒng)都可以輕松集成。

          • 支持多種開發(fā)語言

          例如 Go、Java、Scala、Node、Python 等等

          為什么選擇 Pulsar

          目前業(yè)界使用比較多的是 Kafka,主要場景是大數(shù)據(jù)日志處理,較少用于金融場景。RocketMQ 對 Topic 運營不太友好,特別是不支持按 Topic 刪除失效消息,以及不具備宕機 Failover 能力。選 Pulsar 是因為其原生的高一致性,基于 BookKeeper 提供高可用存儲服務(wù),采用了存儲和服務(wù)分離架構(gòu)方便擴容,同時還支持多種消費模式和多域部署模式。Kafka、RocketMQ 和 Pulsar 的對比如下:

          Show me the code

          外部依賴:

          在 pom.xml 中添加 Pulsar 依賴:

          <dependency>
              <groupId>org.apache.pulsar</groupId>
              <artifactId>pulsar-client</artifactId>
              <version>2.4.0</version>
          </dependency>

          配置文件:

          在配置文件 application.yaml中配置 Pulsar 的相關(guān)參數(shù),具體內(nèi)容如下:

          pulsar:
            service:
              url: pulsar://127.0.0.1:6650

          Producer 發(fā)送消息:

          生產(chǎn)端提供了一個restful接口,模擬發(fā)送一條創(chuàng)建新用戶消息。

          Long id = Long.valueOf(new Random().nextInt(1000));
          User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
          userPulsarMsgProducer.send(user);

          內(nèi)部通過 @PostConstruct 在應(yīng)用啟動時,初始化org.apache.pulsar.client.api.Producer實例,并交由spring 容器統(tǒng)一管理。

          public void send(T msg) {
              String msgBody = JSON.toJSONString(msg);
              try {
                  MessageId messageId = producer.send(msgBody.getBytes(StandardCharsets.UTF_8));
                  log.info("pulsar msg send success, topic:{}, messageId:{}, msg:{}", getTopic(), messageId, msgBody);
              } catch (Throwable e) {
                  log.error("pulsar msg send failed, topic:{}, msg:{}", getTopic(), msgBody);
              }
          }

          Producer 發(fā)送延遲消息:

          適用于一些有延遲處理要求的業(yè)務(wù)場景,比如電商交易的自動確認收貨,在賣家發(fā)出貨品后,有15天的觀察期,這期間如果買家沒有發(fā)起逆向流程/申請退款,將會由系統(tǒng)自動觸發(fā)超時確認收貨

          不同業(yè)務(wù)場景,設(shè)定不同的延遲時間值,可以讓消費端在延遲指定時間后才能拉取到消息并進行消費。借助于該框架特性,有效節(jié)省開發(fā)成本和難度。

          producer.newMessage().deliverAfter(delay, unit)
                  .value(msgBody.getBytes(StandardCharsets.UTF_8))
                  .send();

          Consumer 消費消息:

          系統(tǒng)啟動時,自動創(chuàng)建consumer消費實例,并埋入org.apache.pulsar.client.api.MessageListener接口實現(xiàn),用于具體的消息消費處理邏輯。

          @PostConstruct
          void init() throws PulsarClientException {
              consumer = client.createConsumer(getTopic(), getSubscriptionName(), new DefaultJsonMsgListener());
          }

           class DefaultJsonMsgListener implements MessageListener<byte[]> {

                  @Override
                  public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
                      if (null != message && null != message.getData() && message.getData().length != 0) {
                          String msgBody = new String(message.getValue(), StandardCharsets.UTF_8);

                          log.warn("topic:{} receive message:{}", getTopic(), msgBody);
                          try {
                              T msg = JSON.parseObject(msgBody, clazzT);
                              handleMsg(msg);
                          } catch (Exception e) {
                              log.error("handle msg failed, topic:{}, message:{}", getTopic(), msgBody, e);
                              return;
                          }
                      }

                      try {
                          // 提交消費位移
                          consumer.acknowledge(message);
                      } catch (PulsarClientException e) {
                          log.error("topic:{} ack failed", getTopic(), e);
                      }
                  }
              }

          演示代碼地址

          https://github.com/aalansehaiyang/spring-boot-bulking  

          模塊:spring-boot-bulking-pulsar




           

          推薦閱讀:
          為什么MySQL選擇RR作為默認隔離級別?
          故事篇:數(shù)據(jù)庫架構(gòu)演變之路
          35 張圖帶你 MySQL 調(diào)優(yōu)

          關(guān)互聯(lián)網(wǎng)全棧架構(gòu)

          瀏覽 155
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  水蜜桃视频高清 | 操逼网视频在线 | 97香蕉久久夜色精品国产 | 一区二区三视频 | 欧美性爱少妇性爱 |