mica-mqtt 1.0.3 發(fā)布,新增 websocket 子協(xié)議支持
一、簡介
mica-mqtt 基于 t-io 實(shí)現(xiàn)的簡單、低延遲、高性能 的 mqtt 物聯(lián)網(wǎng)開源組件。使用詳見 mica-mqtt gitee 源碼mica-mqtt-example 模塊。
二、功能
[x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 協(xié)議。 [x] 支持 websocket mqtt 子協(xié)議(支持 mqtt.js)。 [x] 支持 MQTT client 客戶端。 [x] 支持 MQTT server 服務(wù)端。 [x] 支持 MQTT 遺囑消息。 [x] 支持 MQTT 保留消息。 [x] 支持自定義消息(mq)處理轉(zhuǎn)發(fā)實(shí)現(xiàn)集群。 [x] MQTT 客戶端 阿里云 mqtt 連接 demo。 [x] 支持 GraalVM 編譯成本機(jī)可執(zhí)行程序。 [x] 支持 Spring boot 項(xiàng)目快速接入(mica-mqtt-spring-boot-starter)。 [x] mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。
三、待辦
[ ] 優(yōu)化處理 mqtt session,以及支持部分 mqtt v5.0 新特性。
四、更新記錄
?mica-mqtt server 添加 websocket mqtt 子協(xié)議支持(支持 mqtt.js)。 ?mica-mqtt server ip,默認(rèn)為空,可不設(shè)置。 ?mica-mqtt client去除 CountDownLatch 避免啟動時未連接上服務(wù)端卡住。 ?mica-mqtt client 添加最大包體長度字段,避免超過 8092 長度的包體導(dǎo)致解析異常。 ?mica-mqtt client 添加連接監(jiān)聽 IMqttClientConnectListener。 ?mica-mqtt 3.1 協(xié)議會校驗(yàn) clientId 長度,添加配置項(xiàng) maxClientIdLength。 ?mica-mqtt 優(yōu)化 mqtt 解碼異常處理。 ?mica-mqtt 日志優(yōu)化,方便查詢。 ?mica-mqtt 代碼優(yōu)化,部分 Tio.close 改為 Tio.remove。 ?mica-mqtt-spring-boot-example 添加 Dockerfile,支持spring-boot:build-image。 ?完善 mica-mqtt-spring-boot-starter,添加遺囑消息配置。 ?? 升級 t-io 到 3.7.4。
五、Spring boot 快速接入
5.1 添加依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-spring-boot-starter</artifactId>
<version>1.0.3</version>
</dependency>
5.2 服務(wù)端配置示例
mqtt:
server:
enabled: true # 是否開啟,默認(rèn):true
ip: 127.0.0.1 # 服務(wù)端 ip 默認(rèn):127.0.0.1
port: 5883 # 端口,默認(rèn):1883
name: Mica-Mqtt-Server # 名稱,默認(rèn):Mica-Mqtt-Server
buffer-allocator: HEAP # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
heartbeat-timeout: 120000 # 心跳超時,單位毫秒,默認(rèn): 1000 * 120
read-buffer-size: 8092 # 接收數(shù)據(jù)的 buffer size,默認(rèn):8092
max-bytes-in-message: 8092 # 消息解析最大 bytes 長度,默認(rèn):8092
debug: true # 如果開啟 prometheus 指標(biāo)收集建議關(guān)閉
websocket-enable: true # 開啟 websocket 子協(xié)議,默認(rèn)開啟
websocket-port: 8083 # websocket 端口,默認(rèn):8083
5.3 服務(wù)端可實(shí)現(xiàn)接口(注冊成 Spring Bean 即可)
| 接口 | 是否必須 | 說明 |
|---|---|---|
| IMqttServerAuthHandler | 是 | 用于客戶端認(rèn)證 |
| IMqttMessageListener | 是 | 消息監(jiān)聽 |
| IMqttConnectStatusListener | 是 | 連接狀態(tài)監(jiān)聽 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,單機(jī)否 | 遺囑和保留消息存儲 |
| AbstractMqttMessageDispatcher | 集群是,單機(jī)否 | 消息轉(zhuǎn)發(fā),(遺囑、保留消息轉(zhuǎn)發(fā)) |
| IpStatListener | 否 | t-io ip 狀態(tài)監(jiān)聽 |
5.4 服務(wù)端自定義配置(可選)
@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
@Bean
public MqttServerCustomizer activeRecordPluginCustomizer() {
return new MqttServerCustomizer() {
@Override
public void customize(MqttServerCreator creator) {
// 此處可自定義配置 creator,會覆蓋 yml 中的配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
5.5 MqttServerTemplate 使用示例
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
/**
* @author wsq
*/
@Service
public class ServerService {
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
return true;
}
}
5.6 基于 mq 消息廣播集群處理
實(shí)現(xiàn) IMqttConnectStatusListener處理設(shè)備狀態(tài)存儲。實(shí)現(xiàn) IMqttMessageListener將消息轉(zhuǎn)發(fā)到 mq,業(yè)務(wù)按需處理 mq 消息。實(shí)現(xiàn) IMqttMessageStore存儲遺囑和保留消息。實(shí)現(xiàn) AbstractMqttMessageDispatcher將消息發(fā)往 mq,mq 再廣播回 mqtt 集群,mqtt 將消息發(fā)送到設(shè)備。業(yè)務(wù)消息發(fā)送到 mq,mq 廣播到 mqtt 集群,mqtt 將消息發(fā)送到設(shè)備。
5.7 Prometheus + Grafana 監(jiān)控對接
得益于 t-io 良好的設(shè)計,監(jiān)控指標(biāo)直接對接的 t-iostat,目前支持下列指標(biāo),后期會不斷完善。
| 支持得指標(biāo) | 說明 |
|---|---|
| mqtt_connections_accepted | 共接受過連接數(shù) |
| mqtt_connections_closed | 關(guān)閉過的連接數(shù) |
| mqtt_connections_size | 當(dāng)前連接數(shù) |
| mqtt_messages_handled_packets | 已處理消息數(shù) |
| mqtt_messages_handled_bytes | 已處理消息字節(jié)數(shù) |
| mqtt_messages_received_packets | 已接收消息數(shù) |
| mqtt_messages_received_bytes | 已處理消息字節(jié)數(shù) |
| mqtt_messages_send_packets | 已發(fā)送消息數(shù) |
| mqtt_messages_send_bytes | 已發(fā)送消息字節(jié)數(shù) |

關(guān)于mica-mqtt-spring-boot-starter 更多請查看文檔:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter
六、普通 java 項(xiàng)目接入
6.1 maven 依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-core</artifactId>
<version>1.0.3</version>
</dependency>
6.2 mica-mqtt 客戶端
// 初始化 mqtt 客戶端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883) // 默認(rèn):1883
.username("admin")
.password("123456")
.version(MqttVersion.MQTT_5) // 默認(rèn):3_1_1
.clientId("xxxxxx") // 默認(rèn):MICA-MQTT- 前綴和 36進(jìn)制的納秒數(shù)
.connect(); // 連接
// 消息訂閱,同類方法 subxxx
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// 取消訂閱
client.unSubscribe("/test/#");
// 發(fā)送消息
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
// 斷開連接
client.disconnect();
// 重連
client.reconnect();
// 停止
client.stop();
6.3 mica-mqtt 服務(wù)端
// 注意:為了能接受更多鏈接(降低內(nèi)存),請?zhí)砑?jvm 參數(shù) -Xss129k
MqttServer mqttServer = MqttServer.create()
// 默認(rèn):127.0.0.1
.ip("127.0.0.1")
// 默認(rèn):1883
.port(1883)
// 默認(rèn)為:8092(mqtt 默認(rèn)最大消息大小),為了降低內(nèi)存可以減小小此參數(shù),如果消息過大 t-io 會嘗試解析多次(建議根據(jù)實(shí)際業(yè)務(wù)情況而定)
.readBufferSize(512)
// 自定義認(rèn)證
.authHandler((clientId, userName, password) -> true)
// 消息監(jiān)聽
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
// ssl 配置
.useSsl("", "", "")
// 自定義客戶端上下線監(jiān)聽
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定義消息轉(zhuǎn)發(fā),可用 mq 廣播實(shí)現(xiàn)集群化處理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 開啟 t-io debug 信息日志
.start();
// 發(fā)送給某個客戶端
mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 發(fā)送給所有在線監(jiān)聽這個 topic 的客戶端
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 停止服務(wù)
mqttServer.stop();
七、效果演示
八、關(guān)注我們
評論
圖片
表情
