<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          mica-mqtt client,最好用的 java mqtt 客戶端

          共 16003字,需瀏覽 33分鐘

           ·

          2022-07-27 06:49

          簡介

          mica-mqtt 基于 t-io 實現(xiàn)的簡單、低延遲、高性能 的 mqtt 物聯(lián)網(wǎng)開源組件。

          mica-mqtt server 更加易于集成到已有服務(wù)和二次開發(fā),降低自研物聯(lián)網(wǎng)平臺開發(fā)成本。

          mica-mqtt client 是簡單、易用的 java mqtt 客戶端,更加容易集成到自己的業(yè)務(wù)代碼中。今天筆者主要要介紹的就是 mica-mqtt client 的使用。

          使用

          mica-mqtt-client Spring boot starter

          添加依賴

          <dependency>
            <groupId>net.dreamlu</groupId>
            <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
            <version>1.3.7</version>
          </dependency>

          配置項說明

          mqtt:
            client:
              enabled: true               # 是否開啟客戶端,默認(rèn):false 使用到的場景有限,非必要請不要啟用
              ip: 127.0.0.1               # 連接的服務(wù)端 ip ,默認(rèn):127.0.0.1
              port: 1883                  # 端口:默認(rèn):1883
              name: Mica-Mqtt-Client      # 名稱,默認(rèn):Mica-Mqtt-Client
              clientId: 000001            # 客戶端Id(非常重要,一般為設(shè)備 sn,不可重復(fù))
              user-name: mica             # 認(rèn)證的用戶名
              password: 123456            # 認(rèn)證的密碼
              timeout: 5                  # 超時時間,單位:秒,默認(rèn):5秒
              reconnect: true             # 是否重連,默認(rèn):true
              re-interval: 5000           # 重連時間,默認(rèn) 5000 毫秒
              version: MQTT_5             # mqtt 協(xié)議版本,默認(rèn):3.1.1
              read-buffer-size: 8KB       # 接收數(shù)據(jù)的 buffer size,默認(rèn):8k
              max-bytes-in-message: 10MB  # 消息解析最大 bytes 長度,默認(rèn):10M
              buffer-allocator: heap      # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
              keep-alive-secs: 60         # keep-alive 時間,單位:秒
              clean-session: true         # mqtt clean session,默認(rèn):true
              use-ssl: false              # 是否啟用 ssl,默認(rèn):false

          連接狀態(tài)監(jiān)聽

          @Service
          public class MqttClientConnectListener {
              private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);

              @Autowired
              private MqttClientCreator mqttClientCreator;

              @EventListener
              public void onConnected(MqttConnectedEvent event) {
                  logger.info("MqttConnectedEvent:{}", event);
              }

              @EventListener
              public void onDisconnect(MqttDisconnectEvent event) {
                  // 離線時更新重連時的密碼,適用于類似阿里云 mqtt clientId 連接帶時間戳的方式 
                  logger.info("MqttDisconnectEvent:{}", event);
                  // 在斷線時更新 clientId、username、password
                  mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
                      .username("newUserName")
                      .password("newPassword");
              }

          }

          自定義配置java(可選)

          @Configuration(proxyBeanMethods = false)
          public class MqttClientCustomizerConfiguration {

           @Bean
           public MqttClientCustomizer mqttClientCustomizer() {
            return new MqttClientCustomizer() {
             @Override
             public void customize(MqttClientCreator creator) {
              // 此處可自定義配置 creator,會覆蓋 yml 中的配置
              System.out.println("----------------MqttServerCustomizer-----------------");
             }
            };
           }

          }

          訂閱示例

          @Service
          public class MqttClientSubscribeListener {
           private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);

           @MqttClientSubscribe("/test/#")
           public void subQos0(String topic, ByteBuffer payload) {
            logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
           }

           @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)
           public void subQos1(String topic, ByteBuffer payload) {
            logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
           }

          }

          MqttClientTemplate 使用示例

          @Service
          public class MainService {
              private static final Logger logger = LoggerFactory.getLogger(MainService.class);
              @Autowired
              private MqttClientTemplate client;

              public boolean publish() {
                  // 發(fā)布消息示例
                  client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
                  return true;
              }

              public boolean sub() {
                  // 訂閱消息示例
                  client.subQos0("/test/#", (topic, payload) -> {
                      logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
                  });
                  return true;
              }

          }

          共享訂閱 topic 說明

          mica-mqtt client 支持兩種共享訂閱方式:

          • 共享訂閱:訂閱前綴 $queue/,多個客戶端訂閱了 $queue/topic,發(fā)布者發(fā)布到topic,則只有一個客戶端會接收到消息。

          • 分組訂閱:訂閱前綴 $share/<group>/,組客戶端訂閱了$share/group1/topic、$share/group2/topic..,發(fā)布者發(fā)布到topic,則消息會發(fā)布到每個group中,但是每個group中只有一個客戶端會接收到消息。

          jfinal mica-mqtt client(1.3.7 開始支持)

          添加依賴

          <dependency>
           <groupId>net.dreamlu</groupId>
           <artifactId>jfinal-mica-mqtt-client</artifactId>
           <version>1.3.7</version>
          </dependency>

          刪除 jfinal-demo 中的 slf4j-nop 依賴

          添加 slf4j-log4j12

          <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
           <version>1.7.33</version>
          </dependency>

          在 jfinal Config configPlugin 中添加 mica-mqtt client 插件

          MqttClientPlugin mqttClientPlugin = new MqttClientPlugin();
          mqttClientPlugin.config(mqttClientCreator -> {
           // 設(shè)置 mqtt 連接配置信息
           mqttClientCreator
             .clientId("clientId"// 按需配置,相同的會互踢
             .ip("mqtt.dreamlu.net")
             .port(1883)
             .connectListener(Aop.get(MqttClientConnectListener.class));
          });
          me.add(mqttClientPlugin);

          在 jfinal Config onStart 啟動完成之后添加 mqtt 訂閱

          @Override
          public void onStart() {
              IMqttClientMessageListener clientMessageListener = Aop.get(TestMqttClientMessageListener.class);
              MqttClientKit.subQos0("#", clientMessageListener);
          }

          使用 MqttClientKit 發(fā)送消息

          MqttClientKit.publish("mica""hello".getBytes(StandardCharsets.UTF_8));

          示例代碼 MqttClientConnectListener

          public class MqttClientConnectListener implements IMqttClientConnectListener {

              @Override
              public void onConnected(ChannelContext channelContext, boolean isReconnect) {
                  if (isReconnect) {
                      System.out.println("重連 mqtt 服務(wù)器重連成功...");
                  } else {
                      System.out.println("連接 mqtt 服務(wù)器成功...");
                  }
              }

              @Override
              public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
                  System.out.println("mqtt 鏈接斷開 remark:" + remark + " isRemove:" + isRemove);
              }
          }

          示例 TestMqttClientMessageListener

          public class TestMqttClientMessageListener implements IMqttClientMessageListener {
              @Override
              public void onMessage(String topic, MqttPublishMessage mqttPublishMessage, ByteBuffer byteBuffer) {
                  System.out.println("收到消息 topic:" + topic + "內(nèi)容:\n" + ByteBufferUtil.toString(byteBuffer));
              }
          }

          其它 java 項目

          添加依賴

          <dependency>
              <groupId>net.dreamlu</groupId>
              <artifactId>mica-mqtt-core</artifactId>
              <version>1.3.7</version>
              <exclusions>
                  <exclusion>
                      <groupId>org.t-io</groupId>
                      <artifactId>tio-websocket-server</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>net.dreamlu</groupId>
                      <artifactId>mica-mqtt-model</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>com.alibaba</groupId>
                      <artifactId>fastjson</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>

          使用

          // 初始化 mqtt 客戶端
          MqttClient client = MqttClient.create()
              .ip("127.0.0.1")                // mqtt 服務(wù)端 ip 地址
              .port(1883)                     // 默認(rèn):1883
              .username("admin")              // 賬號
              .password("123456")             // 密碼
              .version(MqttVersion.MQTT_5)    // 默認(rèn):3_1_1
              .clientId("xxxxxx")             // 非常重要務(wù)必手動設(shè)置,一般設(shè)備 sn 號,默認(rèn):MICA-MQTT- 前綴和 36進(jìn)制的納秒數(shù)
              .bufferAllocator(ByteBufferAllocator.DIRECT) // 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
              .readBufferSize(512)            // 消息一起解析的長度,默認(rèn):為 8092 (mqtt 消息最大長度)
              .maxBytesInMessage(1024 * 10)   // 最大包體長度,如果包體過大需要設(shè)置此參數(shù),默認(rèn)為:10M (10*1024*1024)
              .keepAliveSecs(120)             // 默認(rèn):60s
              .timeout(10)                    // 超時時間,t-io 配置,可為 null,為 null 時,t-io 默認(rèn)為 5
              .reconnect(true)                // 是否重連,默認(rèn):true
              .reInterval(5000)               // 重連重試時間,reconnect 為 true 時有效,t-io 默認(rèn)為:5000
              .willMessage(builder -> {
                  builder.topic("/test/offline").messageText("down");    // 遺囑消息
              })
              .connectListener(new IMqttClientConnectListener() {
                  @Override
                  public void onConnected(ChannelContext context, boolean isReconnect) {
                      logger.info("鏈接服務(wù)器成功...");
                  }
                  
                  @Override
                  public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
                      logger.info("與鏈接服務(wù)器斷開連接...");
                  }
              })
              .properties()                   // mqtt5 properties
              .connect();                     // 異步連接,還支持同步 connectSync()

              // 消息訂閱,同類方法 subxxx
              client.subQos0("/test/#", (topic, payload) -> {
                  logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
              });
              // 取消訂閱
              client.unSubscribe("/test/#");

              // 發(fā)送消息
              client.publish("/test/client", ByteBuffer.wrap("mica-mqtt 牛皮".getBytes(StandardCharsets.UTF_8)));

              // 斷開連接
              client.disconnect();
              // 重連
              client.reconnect();

          鳴謝

          mica-mqtt 從一個試驗性的項目逐漸完善,目前 gitee 上已有 800 多顆星。

          mica-mqtt 的成長也離不開大伙使用和積極反饋,感謝 @冷月宮主、@willianfu@hjkJOJO、@Symous@hongfeng11、@胡蘿博、@楊釗@一醉化千愁、@toskeyfine 、@亡羊補(bǔ)牛等同學(xué),謝謝大家?。?!


          瀏覽 81
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线播放,日韩专区 | 乱伦一及 | 69视频免费在线 | 美女国产网站 | 在线免费日韩视频 |