mica-mqtt 1.1.4 發(fā)布,重構(gòu)自定義接口
一、簡介
mica-mqtt 基于 t-io 實現(xiàn)的簡單、低延遲、高性能 的 mqtt 物聯(lián)網(wǎng)開源組件。使用詳見 mica-mqtt gitee 源碼 mica-mqtt-example 模塊。
mica-mqtt 更加易于集成到已有服務和二次開發(fā),降低自研物聯(lián)網(wǎng)平臺開發(fā)成本。
二、功能
[x] 支持 MQTT v3.1、v3.1.1 以及 v5.0 協(xié)議。 [x] 支持 websocket mqtt 子協(xié)議(支持 mqtt.js)。 [x] 支持 http rest api,http api 文檔詳見。 [x] 支持 MQTT client 客戶端。 [x] 支持 MQTT server 服務端。 [x] 支持 MQTT 遺囑消息。 [x] 支持 MQTT 保留消息。 [x] 支持自定義消息(mq)處理轉(zhuǎn)發(fā)實現(xiàn)集群。 [x] MQTT 客戶端 阿里云 mqtt 連接 demo。 [x] 支持 GraalVM 編譯成本機可執(zhí)行程序。 [x] 支持 Spring boot 項目快速接入(mica-mqtt-spring-boot-starter)。 [x] mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。 [x] 基于 redis pub/sub 實現(xiàn)集群,詳見 mica-mqtt-broker 模塊。
三、待辦
[ ] 優(yōu)化處理 mqtt session,以及支持部分 mqtt v5.0 新特性。
四、更新記錄
1.1.4
? 添加 IMqttServerUniqueIdService 接口,用來處理 clientId 不唯一的場景。詳見:gitee #I4DXQU ? 微調(diào) IMqttServerAuthHandler 認證,添加 uniqueId 參數(shù)。
1.1.3
? 狀態(tài)事件接口 IMqttConnectStatusListener 添加 ChannelContext 參數(shù)。 ? 從認證中拆分 IMqttServerSubscribeValidator 訂閱校驗接口,添加 ChannelContext、clientId 參數(shù)。 ? 認證 IMqttServerAuthHandler 調(diào)整包、添加 ChannelContext 參數(shù)。 ? 完善文檔和示例,添加默認端口號說明。 ?? 依賴升級。
五、Spring boot 快速接入
5.1 添加依賴
<dependency>
????<groupId>net.dreamlugroupId>
????<artifactId>mica-mqtt-spring-boot-starterartifactId>
????<version>1.1.4version>
dependency>
5.2 服務端配置示例
mqtt:
??server:
????enabled:?true???????????????#?是否開啟,默認:true
????ip:?127.0.0.1???????????????#?服務端 ip 默認:127.0.0.1
????port:?5883??????????????????#?端口,默認:1883
????name:?Mica-Mqtt-Server??????#?名稱,默認:Mica-Mqtt-Server
????buffer-allocator:?HEAP??????#?堆內(nèi)存和堆外內(nèi)存,默認:堆內(nèi)存
????heartbeat-timeout:?120000???#?心跳超時,單位毫秒,默認:?1000?*?120
????read-buffer-size:?8092??????#?接收數(shù)據(jù)的 buffer size,默認:8092
????max-bytes-in-message:?8092??#?消息解析最大 bytes 長度,默認:8092
????debug:?true?????????????????#?如果開啟?prometheus?指標收集建議關(guān)閉
????websocket-enable:?true??????#?開啟?websocket?子協(xié)議,默認開啟
????websocket-port:?8083????????# websocket 端口,默認:8083
5.3 服務端可實現(xiàn)接口(注冊成 Spring Bean 即可)
| 接口 | 是否必須 | 說明 |
|---|---|---|
| IMqttServerUniqueIdService | 否 | 1.1.4 新增,用于 clientId 不唯一時,自定義實現(xiàn)唯一標識,后續(xù)接口使用它替代 clientId |
| IMqttServerAuthHandler | 是 | 用于服務端認證 |
| IMqttServerSubscribeValidator | 是 | 1.1.3 新增,用于服務端訂閱校驗 |
| IMqttMessageListener | 是 | 消息監(jiān)聽 |
| IMqttConnectStatusListener | 是 | 連接狀態(tài)監(jiān)聽 |
| IMqttSessionManager | 否 | session 管理 |
| IMqttMessageStore | 集群是,單機否 | 遺囑和保留消息存儲 |
| AbstractMqttMessageDispatcher | 集群是,單機否 | 消息轉(zhuǎn)發(fā),(遺囑、保留消息轉(zhuǎn)發(fā)) |
| IpStatListener | 否 | t-io ip 狀態(tài)監(jiān)聽 |
5.4 Prometheus + Grafana 監(jiān)控對接
得益于 t-io 良好的設計,監(jiān)控指標直接對接的 t-iostat,目前支持下列指標,后期會不斷完善。
| 支持得指標 | 說明 |
|---|---|
| mqtt_connections_accepted | 共接受過連接數(shù) |
| mqtt_connections_closed | 關(guān)閉過的連接數(shù) |
| mqtt_connections_size | 當前連接數(shù) |
| mqtt_messages_handled_packets | 已處理消息數(shù) |
| mqtt_messages_handled_bytes | 已處理消息字節(jié)數(shù) |
| mqtt_messages_received_packets | 已接收消息數(shù) |
| mqtt_messages_received_bytes | 已處理消息字節(jié)數(shù) |
| mqtt_messages_send_packets | 已發(fā)送消息數(shù) |
| mqtt_messages_send_bytes | 已發(fā)送消息字節(jié)數(shù) |

關(guān)于 mica-mqtt-spring-boot-starter 更多請查看文檔:https://gitee.com/596392912/mica-mqtt/tree/master/mica-mqtt-spring-boot-starter
六、普通 java 項目接入
6.1 maven 依賴
?<dependency>
???<groupId>net.dreamlugroupId>
???<artifactId>mica-mqtt-coreartifactId>
???<version>1.1.4version>
?dependency>
6.2 mica-mqtt 客戶端
?//?初始化?mqtt?客戶端
?MqttClient?client?=?MqttClient.create()
?????.ip("127.0.0.1")
?????.port(1883)?????????????????????//?默認:1883
?????.username("admin")
?????.password("123456")
?????.version(MqttVersion.MQTT_5)????//?默認:3_1_1
?????.clientId("xxxxxx")?????????????//?默認:MICA-MQTT-?前綴和 36進制的納秒數(shù)
?????.connect();?????????????????????//?連接
?
?????//?消息訂閱,同類方法?subxxx
?????client.subQos0("/test/#",?(topic,?payload)?->?{
?????????logger.info(topic?+?'\t'?+?ByteBufferUtil.toString(payload));
?????});
?????//?取消訂閱
?????client.unSubscribe("/test/#");
?
?????//?發(fā)送消息
?????client.publish("/test/client",?ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
?
?????//?斷開連接
?????client.disconnect();
?????//?重連
?????client.reconnect();
?????//?停止
?????client.stop();
6.3 mica-mqtt 服務端
?//?注意:為了能接受更多鏈接(降低內(nèi)存),請?zhí)砑?jvm 參數(shù)?-Xss129k
?MqttServer?mqttServer?=?MqttServer.create()
?????//?默認:127.0.0.1
?????.ip("127.0.0.1")
?????//?默認:1883
?????.port(1883)
?????//?默認為:8092(mqtt 默認最大消息大?。瑸榱私档蛢?nèi)存可以減小小此參數(shù),如果消息過大 t-io 會嘗試解析多次(建議根據(jù)實際業(yè)務情況而定)
?????.readBufferSize(512)
?????//?消息監(jiān)聽
?????.messageListener((clientId,?topic,?mqttQoS,?payload)?->?{
?????????logger.info("clientId:{}?topic:{}?mqttQoS:{}?message:{}",?clientId,?topic,?mqttQoS,?ByteBufferUtil.toString(payload));
?????})
?????.debug()?//?開啟?t-io?debug?信息日志
?????.start();
?
?//?發(fā)送給某個客戶端
?mqttServer.publish("clientId","/test/123",?ByteBuffer.wrap("mica最牛皮".getBytes()));
?
?//?發(fā)送給所有在線監(jiān)聽這個?topic?的客戶端
?mqttServer.publishAll("/test/123",?ByteBuffer.wrap("mica最牛皮".getBytes()));
?
?//?停止服務
?mqttServer.stop();
七、關(guān)注我們
更多精彩內(nèi)容每天推薦!
評論
圖片
表情
