<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          mica-mqtt 1.0.3 發(fā)布,新增 websocket 子協(xié)議支持

          共 10703字,需瀏覽 22分鐘

           ·

          2021-08-21 11:28

          一、簡介

          mica-mqtt 基于 t-io 實(shí)現(xiàn)的簡單、低延遲、高性能 的 mqtt 物聯(lián)網(wǎng)開源組件。使用詳見 mica-mqtt gitee 源碼mica-mqtt-example 模塊。

          二、功能

          • [x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 協(xié)議。
          • [x] 支持 websocket mqtt 子協(xié)議(支持 mqtt.js)。
          • [x] 支持 MQTT client 客戶端。
          • [x] 支持 MQTT server 服務(wù)端。
          • [x] 支持 MQTT 遺囑消息。
          • [x] 支持 MQTT 保留消息。
          • [x] 支持自定義消息(mq)處理轉(zhuǎn)發(fā)實(shí)現(xiàn)集群。
          • [x] MQTT 客戶端 阿里云 mqtt 連接 demo。
          • [x] 支持 GraalVM 編譯成本機(jī)可執(zhí)行程序。
          • [x] 支持 Spring boot 項(xiàng)目快速接入(mica-mqtt-spring-boot-starter)。
          • [x] mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。

          三、待辦

          • [ ] 優(yōu)化處理 mqtt session,以及支持部分 mqtt v5.0 新特性。

          四、更新記錄

          • ?mica-mqtt server 添加 websocket mqtt 子協(xié)議支持(支持 mqtt.js)。
          • ?mica-mqtt server ip,默認(rèn)為空,可不設(shè)置。
          • ?mica-mqtt client去除 CountDownLatch 避免啟動時未連接上服務(wù)端卡住。
          • ?mica-mqtt client 添加最大包體長度字段,避免超過 8092 長度的包體導(dǎo)致解析異常。
          • ?mica-mqtt client 添加連接監(jiān)聽 IMqttClientConnectListener。
          • ?mica-mqtt 3.1 協(xié)議會校驗(yàn) clientId 長度,添加配置項(xiàng) maxClientIdLength。
          • ?mica-mqtt 優(yōu)化 mqtt 解碼異常處理。
          • ?mica-mqtt 日志優(yōu)化,方便查詢。
          • ?mica-mqtt 代碼優(yōu)化,部分 Tio.close 改為 Tio.remove。
          • ?mica-mqtt-spring-boot-example 添加 Dockerfile,支持spring-boot:build-image。
          • ?完善 mica-mqtt-spring-boot-starter,添加遺囑消息配置。
          • ?? 升級 t-io 到 3.7.4。

          五、Spring boot 快速接入

          5.1 添加依賴

          <dependency>
              <groupId>net.dreamlu</groupId>
              <artifactId>mica-mqtt-spring-boot-starter</artifactId>
              <version>1.0.3</version>
          </dependency>

          5.2 服務(wù)端配置示例

          mqtt:
            server:
              enabled: true               # 是否開啟,默認(rèn):true
              ip: 127.0.0.1               # 服務(wù)端 ip 默認(rèn):127.0.0.1
              port: 5883                  # 端口,默認(rèn):1883
              name: Mica-Mqtt-Server      # 名稱,默認(rèn):Mica-Mqtt-Server
              buffer-allocator: HEAP      # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
              heartbeat-timeout: 120000   # 心跳超時,單位毫秒,默認(rèn): 1000 * 120
              read-buffer-size: 8092      # 接收數(shù)據(jù)的 buffer size,默認(rèn):8092
              max-bytes-in-message: 8092  # 消息解析最大 bytes 長度,默認(rèn):8092
              debug: true                 # 如果開啟 prometheus 指標(biāo)收集建議關(guān)閉
              websocket-enable: true      # 開啟 websocket 子協(xié)議,默認(rèn)開啟
              websocket-port: 8083        # websocket 端口,默認(rèn):8083

          5.3 服務(wù)端可實(shí)現(xiàn)接口(注冊成 Spring Bean 即可)

          接口是否必須說明
          IMqttServerAuthHandler用于客戶端認(rèn)證
          IMqttMessageListener消息監(jiān)聽
          IMqttConnectStatusListener連接狀態(tài)監(jiān)聽
          IMqttSessionManagersession 管理
          IMqttMessageStore集群是,單機(jī)否遺囑和保留消息存儲
          AbstractMqttMessageDispatcher集群是,單機(jī)否消息轉(zhuǎn)發(fā),(遺囑、保留消息轉(zhuǎn)發(fā))
          IpStatListenert-io ip 狀態(tài)監(jiān)聽

          5.4 服務(wù)端自定義配置(可選)

          @Configuration(proxyBeanMethods = false)
          public class MqttServerCustomizerConfiguration {

           @Bean
           public MqttServerCustomizer activeRecordPluginCustomizer() {
            return new MqttServerCustomizer() {
             @Override
             public void customize(MqttServerCreator creator) {
              // 此處可自定義配置 creator,會覆蓋 yml 中的配置
              System.out.println("----------------MqttServerCustomizer-----------------");
             }
            };
           }

          }

          5.5 MqttServerTemplate 使用示例

          import net.dreamlu.iot.mqtt.codec.MqttQoS;
          import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.stereotype.Service;

          import java.nio.ByteBuffer;

          /**
           * @author wsq
           */

          @Service
          public class ServerService {
              @Autowired
              private MqttServerTemplate server;

              public boolean publish(String body) {
                  server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
                  return true;
              }
          }

          5.6 基于 mq 消息廣播集群處理

          • 實(shí)現(xiàn) IMqttConnectStatusListener 處理設(shè)備狀態(tài)存儲。
          • 實(shí)現(xiàn) IMqttMessageListener 將消息轉(zhuǎn)發(fā)到 mq,業(yè)務(wù)按需處理 mq 消息。
          • 實(shí)現(xiàn) IMqttMessageStore 存儲遺囑和保留消息。
          • 實(shí)現(xiàn) AbstractMqttMessageDispatcher 將消息發(fā)往 mq,mq 再廣播回 mqtt 集群,mqtt 將消息發(fā)送到設(shè)備。
          • 業(yè)務(wù)消息發(fā)送到 mq,mq 廣播到 mqtt 集群,mqtt 將消息發(fā)送到設(shè)備。

          5.7 Prometheus + Grafana 監(jiān)控對接

          得益于 t-io 良好的設(shè)計,監(jiān)控指標(biāo)直接對接的 t-iostat,目前支持下列指標(biāo),后期會不斷完善。

          支持得指標(biāo)說明
          mqtt_connections_accepted共接受過連接數(shù)
          mqtt_connections_closed關(guān)閉過的連接數(shù)
          mqtt_connections_size當(dāng)前連接數(shù)
          mqtt_messages_handled_packets已處理消息數(shù)
          mqtt_messages_handled_bytes已處理消息字節(jié)數(shù)
          mqtt_messages_received_packets已接收消息數(shù)
          mqtt_messages_received_bytes已處理消息字節(jié)數(shù)
          mqtt_messages_send_packets已發(fā)送消息數(shù)
          mqtt_messages_send_bytes已發(fā)送消息字節(jié)數(shù)
          mqtt監(jiān)控1.jpg

          關(guān)于mica-mqtt-spring-boot-starter 更多請查看文檔:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter

          六、普通 java 項(xiàng)目接入

          6.1 maven 依賴

           <dependency>
             <groupId>net.dreamlu</groupId>
             <artifactId>mica-mqtt-core</artifactId>
             <version>1.0.3</version>
           </dependency>

          6.2 mica-mqtt 客戶端

           // 初始化 mqtt 客戶端
           MqttClient client = MqttClient.create()
               .ip("127.0.0.1")
               .port(1883)                     // 默認(rèn):1883
               .username("admin")
               .password("123456")
               .version(MqttVersion.MQTT_5)    // 默認(rèn):3_1_1
               .clientId("xxxxxx")             // 默認(rèn):MICA-MQTT- 前綴和 36進(jìn)制的納秒數(shù)
               .connect();                     // 連接
           
               // 消息訂閱,同類方法 subxxx
               client.subQos0("/test/#", (topic, payload) -> {
                   logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
               });
               // 取消訂閱
               client.unSubscribe("/test/#");
           
               // 發(fā)送消息
               client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
           
               // 斷開連接
               client.disconnect();
               // 重連
               client.reconnect();
               // 停止
               client.stop();

          6.3 mica-mqtt 服務(wù)端

           // 注意:為了能接受更多鏈接(降低內(nèi)存),請?zhí)砑?jvm 參數(shù) -Xss129k
           MqttServer mqttServer = MqttServer.create()
               // 默認(rèn):127.0.0.1
               .ip("127.0.0.1")
               // 默認(rèn):1883
               .port(1883)
               // 默認(rèn)為:8092(mqtt 默認(rèn)最大消息大小),為了降低內(nèi)存可以減小小此參數(shù),如果消息過大 t-io 會嘗試解析多次(建議根據(jù)實(shí)際業(yè)務(wù)情況而定)
               .readBufferSize(512)
               // 自定義認(rèn)證
               .authHandler((clientId, userName, password) -> true)
               // 消息監(jiān)聽
               .messageListener((clientId, topic, mqttQoS, payload) -> {
                   logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
               })
               // ssl 配置
               .useSsl("""""")
               // 自定義客戶端上下線監(jiān)聽
               .connectStatusListener(new IMqttConnectStatusListener() {
                   @Override
                   public void online(String clientId) {
           
                   }
           
                   @Override
                   public void offline(String clientId) {
           
                   }
               })
               // 自定義消息轉(zhuǎn)發(fā),可用 mq 廣播實(shí)現(xiàn)集群化處理
               .messageDispatcher(new IMqttMessageDispatcher() {
                   @Override
                   public void config(MqttServer mqttServer) {
           
                   }
           
                   @Override
                   public boolean send(Message message) {
                       return false;
                   }
           
                   @Override
                   public boolean send(String clientId, Message message) {
                       return false;
                   }
               })
               .debug() // 開啟 t-io debug 信息日志
               .start();
           
           // 發(fā)送給某個客戶端
           mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
           
           // 發(fā)送給所有在線監(jiān)聽這個 topic 的客戶端
           mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
           
           // 停止服務(wù)
           mqttServer.stop();

          七、效果演示

          八、關(guān)注我們

          瀏覽 161
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色片毛片 | 欧美三级成人网站 | 蜜 桃 黄 片AV在线观看 91人妻人人澡人人爽人人精品 | 韩日精品在线观看 | 欧美美女操逼 |