SpringBoot整合Netty并使用Protobuf進行數(shù)據(jù)傳輸(附工程)
點擊上方藍色“程序猿DD”,選擇“設為星標”
回復“資源”獲取獨家整理的學習資料!

前言
本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行數(shù)據(jù)傳輸?shù)南嚓P內容。Protobuf會簡單的介紹下用法,至于Netty在之前的文章中已經簡單的介紹過了,這里就不再過多細說了。
Protobuf
介紹
protocolbuffer(以下簡稱PB)是google 的一種數(shù)據(jù)交換的格式,它獨立于語言,獨立于平臺。google 提供了多種語言的實現(xiàn):java、c#、c++、go 和python,每一種實現(xiàn)都包含了相應語言的編譯器以及庫文件。由于它是一種二進制的格式,比使用 xml進行數(shù)據(jù)交換快許多??梢园阉糜诜植际綉弥g的數(shù)據(jù)通信或者異構環(huán)境下的數(shù)據(jù)交換。作為一種效率和兼容性都很優(yōu)秀的二進制數(shù)據(jù)傳輸格式,可以用于諸如網絡傳輸、配置文件、數(shù)據(jù)存儲等諸多領域。
官方地址:https://github.com/google/protobuf
使用
這里的使用就只介紹Java相關的使用。首先我們需要建立一個proto文件,在該文件定義我們需要傳輸?shù)奈募?。例如我們需要定義一個用戶的信息,包含的字段主要有編號、名稱、年齡。那么該protobuf文件的格式如下:注:這里使用的是proto3,相關的注釋我已寫了,這里便不再過多講述了。需要注意一點的是proto文件和生成的Java文件名稱不能一致!
syntax?=?"proto3";
//?生成的包名
option?java_package="com.pancm.protobuf";
//生成的java名
option?java_outer_classname?=?"UserInfo";
message?UserMsg?{??
??????
?????//?ID??
?????int32?id?=?1;??
??????
????//?姓名??
?????string?name?=?2;??
??????
????//?年齡??
??????int32?age?=?3;??
???
??//?狀態(tài)?
?????int32?state?=?4;??
}?
創(chuàng)建好該文件之后,我們把該文件和protoc.exe(生成Java文件的軟件)放到E盤目錄下的protobuf文件夾下,然后再到該目錄的dos界面下輸入:protoc.exe --java_out=文件絕對路徑名稱。例如:
protoc.exe?--java_out=E:\protobuf?User.proto
輸入完之后,回車即可在同級目錄看到已經生成好的Java文件,然后將該文件放到項目中該文件指定的路徑下即可。
注:生成protobuf的文件軟件和測試的protobuf文件我也整合到該項目中了,可以直接獲取的。
Java文件生成好之后,我們再來看怎么使用。這里我就直接貼代碼了,并且將注釋寫在代碼中,應該更容易理解些吧。。。代碼示例:
??//?按照定義的數(shù)據(jù)結構,創(chuàng)建一個對象??
?????UserInfo.UserMsg.Builder?userInfo?=?UserInfo.UserMsg.newBuilder();??
?????userInfo.setId(1);
?????userInfo.setName("xuwujing");
?????userInfo.setAge(18);
?????UserInfo.UserMsg?userMsg?=?userInfo.build();??
????????//?將數(shù)據(jù)寫到輸出流?
????????ByteArrayOutputStream?output?=?new?ByteArrayOutputStream();??
????????userMsg.writeTo(output);??
????????//?將數(shù)據(jù)序列化后發(fā)送?
????????byte[]?byteArray?=?output.toByteArray();??
????????//?接收到流并讀取
????????ByteArrayInputStream?input?=?new?ByteArrayInputStream(byteArray);??
????????//?反序列化??
????????UserInfo.UserMsg?userInfo2?=?UserInfo.UserMsg.parseFrom(input);??
????????System.out.println("id:"?+?userInfo2.getId());??
????????System.out.println("name:"?+?userInfo2.getName());??
????????System.out.println("age:"?+?userInfo2.getAge());??
注:這里說明一點,因為protobuf是通過二進制進行傳輸,所以需要注意下相應的編碼。還有使用protobuf也需要注意一下一次傳輸?shù)淖畲笞止?jié)長度。
輸出結果:
id:1
name:xuwujing
age:18
SpringBoot整合Netty
說明:如果想直接獲取工程那么可以直接跳到底部,通過鏈接下載工程代碼。
開發(fā)準備
環(huán)境要求JDK::1.8Netty::4.0或以上(不包括5)Protobuf:3.0或以上
如果對Netty不熟的話,可以看看我之前寫的一些文章。大神請無視~。~ 地址:https://blog.csdn.net/column/details/17640.html
首先還是Maven的相關依賴:
??UTF-8
??1.8
??4.1.22.Final
??3.5.1
??1.5.9.RELEASE
??1.2.41
??1.8
?????1.8
?
?
??
???org.springframework.boot
???spring-boot-starter
???${springboot}
??
??
???org.springframework.boot
???spring-boot-starter-test
???${springboot}
???test
??
??
???org.springframework.boot
???spring-boot-devtools
???${springboot}
???true
??
??
??
???io.netty
???netty-all
???${netty.version}
??
??
???com.google.protobuf
???protobuf-java
???${protobuf.version}
??
??
???com.alibaba
???fastjson
???${fastjson}
??
??
?
???junit
???junit
???4.12
???test
?? ?
添加了相應的maven依賴之后,配置文件這塊暫時沒有什么可以添加的,因為暫時就一個監(jiān)聽的端口而已。
代碼編寫
代碼模塊主要分為服務端和客戶端。主要實現(xiàn)的業(yè)務邏輯:服務端啟動成功之后,客戶端也啟動成功,這時服務端會發(fā)送一條protobuf格式的信息給客戶端,然后客戶端給予相應的應答??蛻舳伺c服務端連接成功之后,客戶端每個一段時間會發(fā)送心跳指令給服務端,告訴服務端該客戶端還存過中,如果客戶端沒有在指定的時間發(fā)送信息,服務端會關閉與該客戶端的連接。當客戶端無法連接到服務端之后,會每隔一段時間去嘗試重連,只到重連成功!
服務端
首先是編寫服務端的啟動類,相應的注釋在代碼中寫得很詳細了,這里也不再過多講述了。不過需要注意的是,在之前的我寫的Netty文章中,是通過main方法直接啟動服務端,因此是直接new一個對象的。而在和SpringBoot整合之后,我們需要將Netty交給springBoot去管理,所以這里就用了相應的注解。代碼如下:
@Service("nettyServer")
public?class?NettyServer?{
?private?static?final?int?port?=?9876;?//?設置服務端端口
?private?static?EventLoopGroup?boss?=?new?NioEventLoopGroup();?//?通過nio方式來接收連接和處理連接
?private?static?EventLoopGroup?work?=?new?NioEventLoopGroup();?//?通過nio方式來接收連接和處理連接
?private?static?ServerBootstrap?b?=?new?ServerBootstrap();
?
?@Autowired
?private?NettyServerFilter?nettyServerFilter;
?
?
?public?void?run()?{
??try?{
???b.group(boss,?work);
???b.channel(NioServerSocketChannel.class);
???b.childHandler(nettyServerFilter);?//?設置過濾器
???//?服務器綁定端口監(jiān)聽
???ChannelFuture?f?=?b.bind(port).sync();
???System.out.println("服務端啟動成功,端口是:"?+?port);
???//?監(jiān)聽服務器關閉監(jiān)聽
???f.channel().closeFuture().sync();
??}?catch?(InterruptedException?e)?{
???e.printStackTrace();
??}?finally?{
???//?關閉EventLoopGroup,釋放掉所有資源包括創(chuàng)建的線程
???work.shutdownGracefully();
???boss.shutdownGracefully();
??}
?}
}
服務端主類編寫完畢之后,我們再來設置下相應的過濾條件。這里需要繼承Netty中ChannelInitializer類,然后重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協(xié)議設置,以及相應的業(yè)務實現(xiàn)類。代碼如下:
?@Component
??public?class?NettyServerFilter?extends?ChannelInitializer?{
?
?@Autowired
?private?NettyServerHandler?nettyServerHandler;
?
?????@Override
?????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
?????????ChannelPipeline?ph?=?ch.pipeline();
??????
?????????//入?yún)⒄f明:?讀超時時間、寫超時時間、所有類型的超時時間、時間格式
?????????ph.addLast(new?IdleStateHandler(5,?0,?0,?TimeUnit.SECONDS));
?????????//?解碼和編碼,應和客戶端一致
?????????//傳輸?shù)膮f(xié)議?Protobuf
?????????ph.addLast(new?ProtobufVarint32FrameDecoder());
?????????ph.addLast(new?ProtobufDecoder(UserMsg.getDefaultInstance()));
?????????ph.addLast(new?ProtobufVarint32LengthFieldPrepender());
?????????ph.addLast(new?ProtobufEncoder());
?????????
?????????//業(yè)務邏輯實現(xiàn)類
?????????ph.addLast("nettyServerHandler",?nettyServerHandler);
???????}
?????}
服務相關的設置的代碼寫完之后,我們再來編寫主要的業(yè)務代碼。使用Netty編寫業(yè)務層的代碼,我們需要繼承ChannelInboundHandlerAdapter?或SimpleChannelInboundHandler類,在這里順便說下它們兩的區(qū)別吧。繼承SimpleChannelInboundHandler類之后,會在接收到數(shù)據(jù)后會自動release掉數(shù)據(jù)占用的Bytebuffer資源。并且繼承該類需要指定數(shù)據(jù)格式。而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動調用ReferenceCountUtil.release()等方法進行釋放。繼承該類不需要指定數(shù)據(jù)格式。所以在這里,個人推薦服務端繼承ChannelInboundHandlerAdapter,手動進行釋放,防止數(shù)據(jù)未處理完就自動釋放了。而且服務端可能有多個客戶端進行連接,并且每一個客戶端請求的數(shù)據(jù)格式都不一致,這時便可以進行相應的處理。客戶端根據(jù)情況可以繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸?shù)臄?shù)據(jù)格式,就不需要再進行格式的轉換了。
代碼如下:
@Service("nettyServerHandler")
public?class?NettyServerHandler?extends?ChannelInboundHandlerAdapter?{
?/**?空閑次數(shù)?*/
?private?int?idle_count?=?1;
?/**?發(fā)送次數(shù)?*/
?private?int?count?=?1;
?/**
??*?建立連接時,發(fā)送一條消息
??*/
?@Override
?public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
??System.out.println("連接的客戶端地址:"?+?ctx.channel().remoteAddress());
??UserInfo.UserMsg?userMsg?=?UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
????.build();
??ctx.writeAndFlush(userMsg);
??super.channelActive(ctx);
?}
?/**
??*?超時處理?如果5秒沒有接受客戶端的心跳,就觸發(fā);?如果超過兩次,則直接關閉;
??*/
?@Override
?public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?obj)?throws?Exception?{
??if?(obj?instanceof?IdleStateEvent)?{
???IdleStateEvent?event?=?(IdleStateEvent)?obj;
???if?(IdleState.READER_IDLE.equals(event.state()))?{?//?如果讀通道處于空閑狀態(tài),說明沒有接收到心跳命令
????System.out.println("已經5秒沒有接收到客戶端的信息了");
????if?(idle_count?>?1)?{
?????System.out.println("關閉這個不活躍的channel");
?????ctx.channel().close();
????}
????idle_count++;
???}
??}?else?{
???super.userEventTriggered(ctx,?obj);
??}
?}
?/**
??*?業(yè)務邏輯處理
??*/
?@Override
?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
??System.out.println("第"?+?count?+?"次"?+?",服務端接受的消息:"?+?msg);
??try?{
???//?如果是protobuf類型的數(shù)據(jù)
????if?(msg?instanceof?UserMsg)?{
????UserInfo.UserMsg?userState?=?(UserInfo.UserMsg)?msg;
????if?(userState.getState()?==?1)?{
?????System.out.println("客戶端業(yè)務處理成功!");
????}?else?if(userState.getState()?==?2){
?????System.out.println("接受到客戶端發(fā)送的心跳!");
????}else{
?????System.out.println("未知命令!");
????}
???}?else?{
????System.out.println("未知數(shù)據(jù)!"?+?msg);
????return;
???}
??}?catch?(Exception?e)?{
???e.printStackTrace();
??}?finally?{
???ReferenceCountUtil.release(msg);
??}
??count++;
?}
?/**
??*?異常處理
??*/
?@Override
?public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
??cause.printStackTrace();
??ctx.close();
?}
}
還有個服務端的啟動類,之前是通過main方法直接啟動, 不過這里改成了通過springBoot進行啟動,差別不大。代碼如下:
@SpringBootApplication
public?class?NettyServerApp?{
?public?static?void?main(String[]?args)?{
??//?啟動嵌入式的?Tomcat?并初始化?Spring?環(huán)境及其各?Spring?組件
??ApplicationContext?context?=?SpringApplication.run(NettyServerApp.class,?args);
??NettyServer?nettyServer?=?context.getBean(NettyServer.class);
??nettyServer.run();
?}
}
到這里服務端相應的代碼就編寫完畢了。
客戶端
客戶端這邊的代碼和服務端的很多地方都類似,我就不再過多細說了,主要將一些不同的代碼拿出來簡單的講述下。首先是客戶端的主類,基本和服務端的差不多,也就是多了監(jiān)聽的端口和一個監(jiān)聽器(用來監(jiān)聽是否和服務端斷開連接,用于重連)。主要實現(xiàn)的代碼邏輯如下:
?public?void?doConnect(Bootstrap?bootstrap,?EventLoopGroup?eventLoopGroup)?{
??ChannelFuture?f?=?null;
??try?{
???if?(bootstrap?!=?null)?{
????bootstrap.group(eventLoopGroup);
????bootstrap.channel(NioSocketChannel.class);
????bootstrap.option(ChannelOption.SO_KEEPALIVE,?true);
????bootstrap.handler(nettyClientFilter);
????bootstrap.remoteAddress(host,?port);
????f?=?bootstrap.connect().addListener((ChannelFuture?futureListener)?->?{
?????final?EventLoop?eventLoop?=?futureListener.channel().eventLoop();
?????if?(!futureListener.isSuccess())?{
??????System.out.println("與服務端斷開連接!在10s之后準備嘗試重連!");
??????eventLoop.schedule(()?->?doConnect(new?Bootstrap(),?eventLoop),?10,?TimeUnit.SECONDS);
?????}
????});
????if(initFalg){
?????System.out.println("Netty客戶端啟動成功!");
?????initFalg=false;
????}
????//?阻塞
????f.channel().closeFuture().sync();
???}
??}?catch?(Exception?e)?{
???System.out.println("客戶端連接失敗!"+e.getMessage());
??}
?}
注:監(jiān)聽器這塊的實現(xiàn)用的是JDK1.8的寫法。
客戶端過濾其這塊基本和服務端一直。不過需要注意的是,傳輸協(xié)議、編碼和解碼應該一致,還有心跳的讀寫時間應該小于服務端所設置的時間。改動的代碼如下:
?ChannelPipeline?ph?=?ch.pipeline();
????????/*
?????????*?解碼和編碼,應和服務端一致
?????????*?*/
????????//入?yún)⒄f明:?讀超時時間、寫超時時間、所有類型的超時時間、時間格式
????????ph.addLast(new?IdleStateHandler(0,?4,?0,?TimeUnit.SECONDS));?
客戶端的業(yè)務代碼邏輯。主要實現(xiàn)的幾點邏輯是心跳按時發(fā)送以及解析服務發(fā)送的protobuf格式的數(shù)據(jù)。這里比服務端多個個注解, 該注解Sharable主要是為了多個handler可以被多個channel安全地共享,也就是保證線程安全。廢話就不多說了,代碼如下:
?@Service("nettyClientHandler")
[email protected]
?public?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{
?@Autowired
?private?NettyClient?nettyClient;
?
?/**?循環(huán)次數(shù)?*/
?private?int?fcount?=?1;
?
?/**
??*?建立連接時
??*/
?@Override
?public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
??System.out.println("建立連接時:"?+?new?Date());
??ctx.fireChannelActive();
?}
?/**
??*?關閉連接時
??*/
?@Override
?public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
??System.out.println("關閉連接時:"?+?new?Date());
??final?EventLoop?eventLoop?=?ctx.channel().eventLoop();
??nettyClient.doConnect(new?Bootstrap(),?eventLoop);
??super.channelInactive(ctx);
?}
?/**
??*?心跳請求處理?每4秒發(fā)送一次心跳請求;
??*?
??*/
?@Override
?public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?obj)?throws?Exception?{
??System.out.println("循環(huán)請求的時間:"?+?new?Date()?+?",次數(shù)"?+?fcount);
??if?(obj?instanceof?IdleStateEvent)?{
???IdleStateEvent?event?=?(IdleStateEvent)?obj;
???if?(IdleState.WRITER_IDLE.equals(event.state()))?{?//?如果寫通道處于空閑狀態(tài),就發(fā)送心跳命令
????UserMsg.Builder?userState?=?UserMsg.newBuilder().setState(2);
????ctx.channel().writeAndFlush(userState);
????fcount++;
???}
??}
?}
?/**
??*?業(yè)務邏輯處理
??*/
?@Override
?public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception?{
??//?如果不是protobuf類型的數(shù)據(jù)
??if?(!(msg?instanceof?UserMsg))?{
???System.out.println("未知數(shù)據(jù)!"?+?msg);
???return;
??}
??try?{
???//?得到protobuf的數(shù)據(jù)
???UserInfo.UserMsg?userMsg?=?(UserInfo.UserMsg)?msg;
???//?進行相應的業(yè)務處理。。。
???//?這里就從簡了,只是打印而已
???System.out.println(
?????"客戶端接受到的用戶信息。編號:"?+?userMsg.getId()?+?",姓名:"?+?userMsg.getName()?+?",年齡:"?+?userMsg.getAge());
???//?這里返回一個已經接受到數(shù)據(jù)的狀態(tài)
???UserMsg.Builder?userState?=?UserMsg.newBuilder().setState(1);
???ctx.writeAndFlush(userState);
???System.out.println("成功發(fā)送給服務端!");
??}?catch?(Exception?e)?{
???e.printStackTrace();
??}?finally?{
???ReferenceCountUtil.release(msg);
??}
??}
?}
那么到這里客戶端的代碼也編寫完畢了。
功能測試
首先啟動服務端,然后再啟動客戶端。我們來看看結果是否如上述所說。
服務端輸出結果:
服務端啟動成功,端口是:9876
連接的客戶端地址:/127.0.0.1:53319
第1次,服務端接受的消息:state:?1
客戶端業(yè)務處理成功!
第2次,服務端接受的消息:state:?2
接受到客戶端發(fā)送的心跳!
第3次,服務端接受的消息:state:?2
接受到客戶端發(fā)送的心跳!
第4次,服務端接受的消息:state:?2
接受到客戶端發(fā)送的心跳!
客戶端輸入結果:
Netty客戶端啟動成功!
建立連接時:Mon Jul 16 23:31:58 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發(fā)送給服務端!
循環(huán)請求的時間:Mon Jul 16 23:32:02 CST 2018,次數(shù)1
循環(huán)請求的時間:Mon Jul 16 23:32:06 CST 2018,次數(shù)2
循環(huán)請求的時間:Mon Jul 16 23:32:10 CST 2018,次數(shù)3
循環(huán)請求的時間:Mon Jul 16 23:32:14 CST 2018,次數(shù)4
通過打印信息可以看出如上述所說。
接下來我們再來看看客戶端是否能夠實現(xiàn)重連。先啟動客戶端,再啟動服務端。
客戶端輸入結果:
Netty客戶端啟動成功!
與服務端斷開連接!在10s之后準備嘗試重連!
客戶端連接失敗!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立連接時:Mon Jul 16 23:41:33 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發(fā)送給服務端!
循環(huán)請求的時間:Mon Jul 16 23:41:38 CST 2018,次數(shù)1
循環(huán)請求的時間:Mon Jul 16 23:41:42 CST 2018,次數(shù)2
循環(huán)請求的時間:Mon Jul 16 23:41:46 CST 2018,次數(shù)3
服務端輸出結果:
服務端啟動成功,端口是:9876
連接的客戶端地址:/127.0.0.1:53492
第1次,服務端接受的消息:state:?1
客戶端業(yè)務處理成功!
第2次,服務端接受的消息:state:?2
接受到客戶端發(fā)送的心跳!
第3次,服務端接受的消息:state:?2
接受到客戶端發(fā)送的心跳!
第4次,服務端接受的消息:state:?2
結果也如上述所說!
其它
關于SpringBoot整合Netty使用Protobuf進行數(shù)據(jù)傳輸?shù)竭@里就結束了。SpringBoot整合Netty使用Protobuf進行數(shù)據(jù)傳輸?shù)捻椖抗こ痰刂? https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
對了,也有不使用springBoot整合的Netty項目工程地址: https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
往期推薦


我的星球是否適合你?
點擊閱讀原文看看我們都聊過啥?
