mica-mqtt client,最好用的 java mqtt 客戶端
簡介
mica-mqtt 基于 t-io 實現(xiàn)的簡單、低延遲、高性能 的 mqtt 物聯(lián)網(wǎng)開源組件。
mica-mqtt server 更加易于集成到已有服務(wù)和二次開發(fā),降低自研物聯(lián)網(wǎng)平臺開發(fā)成本。
mica-mqtt client 是簡單、易用的 java mqtt 客戶端,更加容易集成到自己的業(yè)務(wù)代碼中。今天筆者主要要介紹的就是 mica-mqtt client 的使用。
使用
mica-mqtt-client Spring boot starter
添加依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
<version>1.3.7</version>
</dependency>
配置項說明
mqtt:
client:
enabled: true # 是否開啟客戶端,默認(rèn):false 使用到的場景有限,非必要請不要啟用
ip: 127.0.0.1 # 連接的服務(wù)端 ip ,默認(rèn):127.0.0.1
port: 1883 # 端口:默認(rèn):1883
name: Mica-Mqtt-Client # 名稱,默認(rèn):Mica-Mqtt-Client
clientId: 000001 # 客戶端Id(非常重要,一般為設(shè)備 sn,不可重復(fù))
user-name: mica # 認(rèn)證的用戶名
password: 123456 # 認(rèn)證的密碼
timeout: 5 # 超時時間,單位:秒,默認(rèn):5秒
reconnect: true # 是否重連,默認(rèn):true
re-interval: 5000 # 重連時間,默認(rèn) 5000 毫秒
version: MQTT_5 # mqtt 協(xié)議版本,默認(rèn):3.1.1
read-buffer-size: 8KB # 接收數(shù)據(jù)的 buffer size,默認(rèn):8k
max-bytes-in-message: 10MB # 消息解析最大 bytes 長度,默認(rèn):10M
buffer-allocator: heap # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
keep-alive-secs: 60 # keep-alive 時間,單位:秒
clean-session: true # mqtt clean session,默認(rèn):true
use-ssl: false # 是否啟用 ssl,默認(rèn):false
連接狀態(tài)監(jiān)聽
@Service
public class MqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Autowired
private MqttClientCreator mqttClientCreator;
@EventListener
public void onConnected(MqttConnectedEvent event) {
logger.info("MqttConnectedEvent:{}", event);
}
@EventListener
public void onDisconnect(MqttDisconnectEvent event) {
// 離線時更新重連時的密碼,適用于類似阿里云 mqtt clientId 連接帶時間戳的方式
logger.info("MqttDisconnectEvent:{}", event);
// 在斷線時更新 clientId、username、password
mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
.username("newUserName")
.password("newPassword");
}
}
自定義配置java(可選)
@Configuration(proxyBeanMethods = false)
public class MqttClientCustomizerConfiguration {
@Bean
public MqttClientCustomizer mqttClientCustomizer() {
return new MqttClientCustomizer() {
@Override
public void customize(MqttClientCreator creator) {
// 此處可自定義配置 creator,會覆蓋 yml 中的配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
訂閱示例
@Service
public class MqttClientSubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
@MqttClientSubscribe("/test/#")
public void subQos0(String topic, ByteBuffer payload) {
logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
}
@MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)
public void subQos1(String topic, ByteBuffer payload) {
logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
}
}
MqttClientTemplate 使用示例
@Service
public class MainService {
private static final Logger logger = LoggerFactory.getLogger(MainService.class);
@Autowired
private MqttClientTemplate client;
public boolean publish() {
// 發(fā)布消息示例
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
return true;
}
public boolean sub() {
// 訂閱消息示例
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
return true;
}
}
共享訂閱 topic 說明
mica-mqtt client 支持兩種共享訂閱方式:
共享訂閱:訂閱前綴
$queue/,多個客戶端訂閱了$queue/topic,發(fā)布者發(fā)布到topic,則只有一個客戶端會接收到消息。分組訂閱:訂閱前綴
$share/<group>/,組客戶端訂閱了$share/group1/topic、$share/group2/topic..,發(fā)布者發(fā)布到topic,則消息會發(fā)布到每個group中,但是每個group中只有一個客戶端會接收到消息。
jfinal mica-mqtt client(1.3.7 開始支持)
添加依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>jfinal-mica-mqtt-client</artifactId>
<version>1.3.7</version>
</dependency>
刪除 jfinal-demo 中的 slf4j-nop 依賴
添加 slf4j-log4j12
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.33</version>
</dependency>
在 jfinal Config configPlugin 中添加 mica-mqtt client 插件
MqttClientPlugin mqttClientPlugin = new MqttClientPlugin();
mqttClientPlugin.config(mqttClientCreator -> {
// 設(shè)置 mqtt 連接配置信息
mqttClientCreator
.clientId("clientId") // 按需配置,相同的會互踢
.ip("mqtt.dreamlu.net")
.port(1883)
.connectListener(Aop.get(MqttClientConnectListener.class));
});
me.add(mqttClientPlugin);
在 jfinal Config onStart 啟動完成之后添加 mqtt 訂閱
@Override
public void onStart() {
IMqttClientMessageListener clientMessageListener = Aop.get(TestMqttClientMessageListener.class);
MqttClientKit.subQos0("#", clientMessageListener);
}
使用 MqttClientKit 發(fā)送消息
MqttClientKit.publish("mica", "hello".getBytes(StandardCharsets.UTF_8));
示例代碼 MqttClientConnectListener
public class MqttClientConnectListener implements IMqttClientConnectListener {
@Override
public void onConnected(ChannelContext channelContext, boolean isReconnect) {
if (isReconnect) {
System.out.println("重連 mqtt 服務(wù)器重連成功...");
} else {
System.out.println("連接 mqtt 服務(wù)器成功...");
}
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
System.out.println("mqtt 鏈接斷開 remark:" + remark + " isRemove:" + isRemove);
}
}
示例 TestMqttClientMessageListener
public class TestMqttClientMessageListener implements IMqttClientMessageListener {
@Override
public void onMessage(String topic, MqttPublishMessage mqttPublishMessage, ByteBuffer byteBuffer) {
System.out.println("收到消息 topic:" + topic + "內(nèi)容:\n" + ByteBufferUtil.toString(byteBuffer));
}
}
其它 java 項目
添加依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-core</artifactId>
<version>1.3.7</version>
<exclusions>
<exclusion>
<groupId>org.t-io</groupId>
<artifactId>tio-websocket-server</artifactId>
</exclusion>
<exclusion>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-model</artifactId>
</exclusion>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
</exclusions>
</dependency>
使用
// 初始化 mqtt 客戶端
MqttClient client = MqttClient.create()
.ip("127.0.0.1") // mqtt 服務(wù)端 ip 地址
.port(1883) // 默認(rèn):1883
.username("admin") // 賬號
.password("123456") // 密碼
.version(MqttVersion.MQTT_5) // 默認(rèn):3_1_1
.clientId("xxxxxx") // 非常重要務(wù)必手動設(shè)置,一般設(shè)備 sn 號,默認(rèn):MICA-MQTT- 前綴和 36進(jìn)制的納秒數(shù)
.bufferAllocator(ByteBufferAllocator.DIRECT) // 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
.readBufferSize(512) // 消息一起解析的長度,默認(rèn):為 8092 (mqtt 消息最大長度)
.maxBytesInMessage(1024 * 10) // 最大包體長度,如果包體過大需要設(shè)置此參數(shù),默認(rèn)為:10M (10*1024*1024)
.keepAliveSecs(120) // 默認(rèn):60s
.timeout(10) // 超時時間,t-io 配置,可為 null,為 null 時,t-io 默認(rèn)為 5
.reconnect(true) // 是否重連,默認(rèn):true
.reInterval(5000) // 重連重試時間,reconnect 為 true 時有效,t-io 默認(rèn)為:5000
.willMessage(builder -> {
builder.topic("/test/offline").messageText("down"); // 遺囑消息
})
.connectListener(new IMqttClientConnectListener() {
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
logger.info("鏈接服務(wù)器成功...");
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
logger.info("與鏈接服務(wù)器斷開連接...");
}
})
.properties() // mqtt5 properties
.connect(); // 異步連接,還支持同步 connectSync()
// 消息訂閱,同類方法 subxxx
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// 取消訂閱
client.unSubscribe("/test/#");
// 發(fā)送消息
client.publish("/test/client", ByteBuffer.wrap("mica-mqtt 牛皮".getBytes(StandardCharsets.UTF_8)));
// 斷開連接
client.disconnect();
// 重連
client.reconnect();
鳴謝
mica-mqtt 從一個試驗性的項目逐漸完善,目前 gitee 上已有 800 多顆星。

mica-mqtt 的成長也離不開大伙使用和積極反饋,感謝 @冷月宮主、@willianfu、@hjkJOJO、@Symous、@hongfeng11、@胡蘿博、@楊釗、@一醉化千愁、@toskeyfine 、@亡羊補(bǔ)牛等同學(xué),謝謝大家?。?!
