Netty 入門 — ChannelHandler, Netty 的數(shù)據(jù)加工廠
通過上篇文章(Netty入門 — Channel,把握 Netty 通信的命門),我們知道 Channel 是傳輸數(shù)據(jù)的通道,但是有了數(shù)據(jù),也有數(shù)據(jù)通道,沒有數(shù)據(jù)加工也是沒有意義的,所以今天學(xué)習(xí) Netty 的第四個(gè)組件:ChannelHandler ,它是 Netty 的數(shù)據(jù)加工廠。
ChannelHandler
在上篇文章(Netty入門 — Channel,把握 Netty 通信的命門)中,大明哥提到:EventLoop 接收到 Channel 的 I/O 事件后會(huì)將該事件轉(zhuǎn)交給 Handler 來處理,這個(gè) Handler 就是 ChannelHandler。ChannelHandler 它是對 Netty 輸入輸出數(shù)據(jù)進(jìn)行加工處理的載體,它包含了我們應(yīng)用程序業(yè)務(wù)處理的邏輯。
在整個(gè) Netty 生產(chǎn)線上,它就是那個(gè)埋頭苦干,一心干活的工人。
Channel 的狀態(tài)處理
在介紹 Channel 的時(shí)候(Netty入門 — Channel,把握 Netty 通信的命門)我們知道,Channel 是有狀態(tài)的,而且 Channel 也提供了判斷 Channel 當(dāng)前狀態(tài)的 API,如下:
-
isOpen():檢查 Channel 是否為 open 狀態(tài)。 -
isRegistered():檢查 Channel 是否為 registered 狀態(tài)。 -
isActive():檢查 Channel 是否為 active 狀態(tài)。
上面三個(gè) API 對應(yīng)了 Channel 四個(gè)狀態(tài):
| 狀態(tài) | 描述 |
|---|---|
| ChannelUnregistered | Channel 已經(jīng)被創(chuàng)建,但還未注冊到 EventLoop。此時(shí) isOpen() 返回 true,但 isRegistered() 返回 false。 |
| ChannelRegistered | Channel 已經(jīng)被注冊到 EventLoop。此時(shí) isRegistered() 返回 true,但 isActive() 返回 false。 |
| ChannelActive | Channel 已經(jīng)處理活動(dòng)狀態(tài)并可以接收與發(fā)送數(shù)據(jù)。此時(shí) isActive() 返回 true。 |
| ChannelInactive | Channel 沒有連接到遠(yuǎn)程節(jié)點(diǎn) |
狀態(tài)變更如下:
當(dāng) Channel 的狀態(tài)發(fā)生改變時(shí),會(huì)生成相對應(yīng)的事件,這些事件會(huì)被轉(zhuǎn)發(fā)給 ChannelHandler,而 ChannelHandler 中會(huì)有相對應(yīng)的方法來對其進(jìn)行響應(yīng)。在 ChannelHandler 中定義一些與這生命周期相關(guān)的 API,如 channelRegistered() 、channelUnregistered() 、channelActive() 、channelInactive()等等,后面大明哥會(huì)詳細(xì)介紹這些 API。
ChannelHandler 的家族
ChannelHandler 的大家族如下圖:
ChannelHandler 是整個(gè)家族中的頂層接口,它有兩大子接口,ChannelInBoundHandler 和 ChannelOutBoundHandler,其中 ChannelInBoundHandler 用于處理入站數(shù)據(jù)(讀請求),ChannelOutBoundHandler 用于處理出站數(shù)據(jù)(寫請求),他們兩個(gè)接口都有一個(gè)相對應(yīng)的默認(rèn)實(shí)現(xiàn),即 XxxxAdapter。
ChannelHandler
ChannelHandler 作為頂層接口,它并不具備太多功能,它僅僅只提供了三個(gè) API:
| API | 描述 |
|---|---|
handlerAdded() |
當(dāng)ChannelHandler 添加到 ChannelPipeline 中時(shí)被調(diào)用 |
handlerRemoved() |
當(dāng) ChannelHandler 被從 ChannelPipeline 移除時(shí)調(diào)用 |
exceptionCaught() |
當(dāng) ChannelHandler 在處理過程中出現(xiàn)異常時(shí)調(diào)用 |
從 ChannelHandler 提供的 API 中我們可以看出,它并不直接參與 Channel 的數(shù)據(jù)加工過程,而是用來響應(yīng) ChannelPipeline 鏈和異常處 理的,對于 Channel 的數(shù)據(jù)加工則由它的子接口處理:
-
ChannelInboundHandler:攔截&處理各種入站的 I/O 事件 -
ChannelOutboundHandler:攔截&處理各種出站的 I/O 事件
這里解釋下出站和入站的概念:
-
接收對方傳輸?shù)臄?shù)據(jù)并處理,即為入站 -
向?qū)Ψ綄憯?shù)據(jù)時(shí),即為出站 -
如:客戶端 —> 服務(wù)端:服務(wù)端讀取客戶端消息為入站,服務(wù)端處理消息完后給客戶端返回消息為出站
ChannelInboundHandler
ChannelInboundHandler 用來處理和攔截各種入站的 I/O 事件,它可以處理的事件如下:
| 響應(yīng)方法 | 觸發(fā)事件 |
| channelRegistered | Channel 被注冊到EventLoop 時(shí) |
| channelUnregistered | Channel 從 EventLoop 中取消時(shí) |
| channelActive | Channel 處理活躍狀態(tài),可以讀寫時(shí) |
| channelInactive | Channel 不再是活動(dòng)狀態(tài)且不再連接它的遠(yuǎn)程節(jié)點(diǎn)時(shí) |
| channelReadComplete | Channel 從上一個(gè)讀操作完成時(shí) |
| channelRead | Channel 讀取數(shù)據(jù)時(shí) |
| channelWritabilityChanged | Channel 的可寫狀態(tài)發(fā)生改變時(shí) |
| userEventTriggered | ChannelInboundHandler.fireUserEventTriggered()方法被調(diào)用時(shí) |
一般情況我們不直接使用 ChannelInboundHandler,而是使用它的實(shí)現(xiàn)類 ChannelInboundHandlerAdapter。我們寫業(yè)務(wù)處理時(shí)直接繼承 ChannelInboundHandlerAdapter ,然后重寫你感興趣的 I/O 事件響應(yīng)方法即可,比如你想處理 Channel 讀數(shù)據(jù),那重寫 channelRead() 即可,代碼如下:
public class ChannelHandlerTest_1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// do something
}
}
但是在使用 ChannelInboundHandlerAdapter 的時(shí)候,需要注意的是我們需要顯示地釋放與池化 ByteBuf 實(shí)例相關(guān)的內(nèi)存,Netty 為此專門提供了一個(gè)方法 ReferenceCountUtil.release(),即我們需要在 ChannelInboundHandler 的鏈的末尾需要使用該方法來釋放內(nèi)存,如下:
public class ByteBufReleaseHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
//釋放msg
ReferenceCountUtil.release(msg);
}
}
但是有些小伙伴有時(shí)候會(huì)忘記這點(diǎn),會(huì)帶來不必要的麻煩,那有沒有更好的方法呢?Netty 提供了一個(gè)類來幫助我們簡化這個(gè)過程: SimpleChannelInboundHandler,對于我們業(yè)務(wù)處理的類,采用繼承 SimpleChannelInboundHandler 而不是 ChannelInboundHandlerAdapter 就可以解決了。
public class ChannelHandlerTest_1 extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// do something
}
}
使用 SimpleChannelInboundHandler 我們就不需要顯示釋放資源了,是不是非常人性化。
ChannelOutboundHandler
ChannelOutboundHandler 是攔截和處理出站的各種 I/O 事件的。處理的事件如下:
| 響應(yīng)方法 | 觸發(fā)事件 |
|---|---|
| bind | 請求將 Channel 綁定到本地地址時(shí) |
| connect | 請求將 Channel 連接到遠(yuǎn)程節(jié)點(diǎn)時(shí) |
| disconnect | 請求將 Channel 從遠(yuǎn)程節(jié)點(diǎn)斷開時(shí) |
| close | 請求關(guān)閉 Channel 時(shí) |
| dderegister | 請求將 Channel 從它的 EventLoop 注銷時(shí) |
| read | 請求從 Channel 中讀取數(shù)據(jù)時(shí) |
| flush | 請求通過 Channel 將入隊(duì)數(shù)據(jù)刷入遠(yuǎn)程節(jié)點(diǎn)時(shí) |
| write | 請求通過 Channel 將數(shù)據(jù)寫入遠(yuǎn)程節(jié)點(diǎn)時(shí) |
與 ChannelInboundHandler 一樣,我們也不直接使用 ChannelOutboundHandler 接口,而是使用它的默認(rèn)實(shí)現(xiàn)類 ChannelOutboundHandlerAdapter,重寫我們想處理的 I/O 事件的響應(yīng)方法就可以了。
ChannelHandler 的示例
ChannelHandler 生命周期
大明哥通過一個(gè)示例來告訴你 ChannelHandler 在整個(gè)執(zhí)行過程中的生命周期是怎么樣的,響應(yīng)方法調(diào)用的順序是如何的。我們只有了解了整個(gè)生命周期的運(yùn)行,才能在合適的生命周期響應(yīng)方法里面擴(kuò)展我們自己的業(yè)務(wù)功能。
-
服務(wù)端代碼
public class ChannelHandlerTest_1_server extends ChannelInboundHandlerAdapter {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerTest());
}
})
.bind(8081);
}
private static class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--handlerAdded:handler 被添加到 ChannelPipeline");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--channelRegistered:Channel 注冊到 EventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--channelActive:Channel 準(zhǔn)備就緒");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(LocalTime.now().toString() + "--channelRead:Channel 中有可讀數(shù)據(jù)");
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--channelReadComplete:Channel 讀取數(shù)據(jù)完成");
super.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--channelInactive:Channel 被關(guān)閉,不在活躍");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--channelUnregistered:Channe 從 EventLoop 中被取消");
super.channelUnregistered(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalTime.now().toString() + "--handlerRemoved:handler 從 ChannelPipeline 中移除");
super.handlerRemoved(ctx);
}
}
}
服務(wù)端重寫整個(gè)生命周期中的各個(gè)方法。
-
客戶端
public class ChannelHandlerTest_1_client extends ChannelInboundHandlerAdapter {
public static void main(String[] args) throws Exception {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1",8081)
.sync()
.channel();
for (int i = 1 ; i <= 2 ; i++) {
channel.writeAndFlush("hello,i am ChannelHander client -" + i);
TimeUnit.SECONDS.sleep(2);
}
channel.close();
}
}
客戶端連接服務(wù)端后,每個(gè) 2 秒向服務(wù)端發(fā)送一次消息,共發(fā)兩次。發(fā)送完消息后,再過 2 秒關(guān)閉連接。
-
運(yùn)行結(jié)果
11:45:09.456--handlerAdded:handler 被添加到 ChannelPipeline
11:45:09.457--channelRegistered:Channel 注冊到 EventLoop
11:45:09.457--channelActive:Channel 準(zhǔn)備就緒
11:45:09.474--channelRead:Channel 中有可讀數(shù)據(jù)
11:45:09.475--channelReadComplete:Channel 讀取數(shù)據(jù)完成
11:45:11.397--channelRead:Channel 中有可讀數(shù)據(jù)
11:45:11.397--channelReadComplete:Channel 讀取數(shù)據(jù)完成
11:45:13.405--channelReadComplete:Channel 讀取數(shù)據(jù)完成
11:45:13.407--channelInactive:Channel 被關(guān)閉,不在活躍
11:45:13.407--channelUnregistered:Channe 從 EventLoop 中被取消
11:45:13.407--handlerRemoved:handler 從 ChannelPipeline 中移除
結(jié)果分析如下:
-
服務(wù)端檢測到客戶端發(fā)起連接后,會(huì)將要處理的 Handler 添加到 ChannelPipeline 中,然后將 Channel 注冊到 EventLoop,注冊完成后,Channel 準(zhǔn)備就緒處于活躍狀態(tài),可以接收消息了 -
客戶端向服務(wù)端發(fā)送消息,服務(wù)端讀取消息 -
當(dāng)服務(wù)端檢測到客戶端已關(guān)閉連接后,該 Channel 就被關(guān)閉了,不再活躍,然后將該 Channel 從 EventLoop 取消,并將 Handler 從 ChannelPipeline 中移除。
在整個(gè)生命周期中,響應(yīng)方法執(zhí)行順序如下:
-
建立連接: handlerAdded()->channelRegistered()->channelActive () -
數(shù)據(jù)請求: channelRead()->channelReadComplete() -
關(guān)閉連接: channelReadComplete()->channelInactive()->channelUnregistered()->handlerRemoved()
這里有一點(diǎn)需要注意,為什么關(guān)閉連接會(huì)響應(yīng) channelReadComplete() 呢?這里埋個(gè)點(diǎn),后續(xù)大明哥在來說明,有興趣的小伙伴可以先研究研究。
這里大明哥對 ChannelHandler 生命周期的方法做一個(gè)總結(jié):
-
handlerAdded():ChannelHandler 被加入到 Pipeline 時(shí)觸發(fā)。當(dāng)服務(wù)端檢測到新鏈接后,會(huì)將 ChannelHandler 構(gòu)建成一個(gè)雙向鏈表(下篇文章介紹),該方法被觸發(fā)表示在當(dāng)前 Channel 中已經(jīng)添加了一個(gè) ChannelHandler 業(yè)務(wù)處理鏈了》。 -
channelRegistered():當(dāng) Channel 注冊到 EventLoop 中時(shí)被觸發(fā)。該方法被觸發(fā)了,表明當(dāng)前 Channel 已經(jīng)綁定到了某一個(gè) EventLoop 中了。 -
channelActive():Channel 連接就緒時(shí)觸發(fā)。該方法被觸發(fā),說明當(dāng)前 Channel 已經(jīng)處于活躍狀態(tài)了,可以進(jìn)行數(shù)據(jù)讀寫了。 -
channelRead():當(dāng) Channel 有數(shù)據(jù)可讀時(shí)觸發(fā)。客戶端向服務(wù)端發(fā)送數(shù)據(jù),都會(huì)觸發(fā)該方法,該方法被調(diào)用說明有數(shù)據(jù)可讀。而且我們自定義業(yè)務(wù) handler 時(shí)都是重寫該方法。 -
channelReadComplete():當(dāng) Channel 數(shù)據(jù)讀完時(shí)觸發(fā)。服務(wù)端每次讀完數(shù)據(jù)后都會(huì)觸發(fā)該方法,表明數(shù)據(jù)已讀取完畢。 -
channelInactive():當(dāng) Channel 斷開連接時(shí)觸發(fā)。該方法被觸發(fā),說明 Channel 已經(jīng)不再是活躍狀態(tài)了,連接已經(jīng)關(guān)閉了。 -
channelUnregistered():當(dāng) Channel 取消注冊時(shí)觸發(fā):連接關(guān)閉后,我們就要取消該 Channel 與 EventLoop 的綁定關(guān)系了。 -
handlerRemoved():當(dāng) ChannelHandler 被從 ChannelPipeline 中移除時(shí)觸發(fā)。將與該 Channel 綁定的 ChannelPipeline 中的 ChannelHandler 業(yè)務(wù)處理鏈全部移除。
ChannelHandler 業(yè)務(wù)開發(fā)
上面只是一個(gè)很簡單的 ChannelHandler 實(shí)例,但是我們在實(shí)際項(xiàng)目開發(fā)中,要處理的業(yè)務(wù)復(fù)制多了,它肯定是由多個(gè)業(yè)務(wù)組合而成,那我們又該如何去做呢?
一個(gè) ChannelHandler
偽代碼如下:
public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Map<String,String> map=(Map<String,String>)msg;
String code=map.get("code");
if(code.equals("xxx1")){
//do something 1
}else if(code.equals("xxx2")){
//do something 2
}else{
// do something 3
}
}
}
這種實(shí)現(xiàn)方式簡單,非常容易實(shí)現(xiàn),但是如果業(yè)務(wù)較多的情況下,該 Handler 的處理邏輯會(huì)非常臃腫,而且很不好維護(hù),而且這么多 if...else ,看起來就煩,當(dāng)然我們可以采用相對應(yīng)的方式來干掉 if...else ,例如利用策略模式:
public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
HashMap<String,HandlerService) handlerServiceMap = new HashMap();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Map<String,String> map=(Map<String,String>)msg;
String code=map.get("code");
handlerServiceMap.get(code).doSomething(map);
}
}
這種方式也行,但他把所有的業(yè)務(wù)都耦合在一起了,終究不是那么優(yōu)雅。
多個(gè) ChannelHandler
我們可以定義多個(gè) ChannelHandler,根據(jù) code 來判斷,如果 code 是自己的則處理,否則流轉(zhuǎn)到下一個(gè)節(jié)點(diǎn):
public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Map<String,String> map=(Map<String,String>)msg;
String code=map.get("code");
if(code.equals("xxx")){
//自己的業(yè)務(wù),處理
}else{
//不是自己的,流轉(zhuǎn)到下一個(gè)節(jié)點(diǎn)
super.channelRead(ctx, msg);
}
}
}
這種方式將所有業(yè)務(wù)解耦了,每個(gè) Handler 各司其職,所有業(yè)務(wù)不再是都耦合在一起了,后期維護(hù)更加輕松。這種方式和上面方式的一樣,依賴 code,他們都需要服務(wù)端和客戶端都維護(hù)一套 code,而這個(gè) code 如果還不能輕易發(fā)生變更。
自定義類型
根據(jù)客戶端提交的參數(shù)類型,自動(dòng)流轉(zhuǎn)到相對應(yīng)的 ChannelHandler。
public class ChannelHandlerTest extends SimpleChannelInboundHandler<User> {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, User user) throws Exception {
// do something
}
}
這種方式是最優(yōu)雅的,也是我們使用最多的方式。他的優(yōu)點(diǎn)明顯,1. 業(yè)務(wù)解耦,2. 不需要維護(hù)碼表。
總結(jié)
-
ChannelHandler 是 Netty 中真正做事的組件,EventLoop 將監(jiān)聽到的 I/O 事件轉(zhuǎn)發(fā)后,就由 ChannelHandler 來處理,它也是我們編寫 Netty 代碼最多的地方。 -
ChannelHandler 作為頂層接口,它一般不會(huì)負(fù)責(zé)具體業(yè)務(wù) I/O 事件,具體的業(yè)務(wù) I/O 事件由它兩個(gè)子接口負(fù)責(zé): -
ChannelInboundHandler:負(fù)責(zé)攔截響應(yīng)入站 I/O 事件 -
ChannelOutboundHandler:負(fù)責(zé)攔截響應(yīng)出站 I/O 事件 -
我們自定義的業(yè)務(wù) Handler ,一般不會(huì)直接實(shí)現(xiàn) ChannelInboundHandler 和 ChannelOutboundHandler,而是繼承他們兩個(gè)的默認(rèn)實(shí)現(xiàn)類:ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,這樣我們就可以只需要關(guān)注我們想關(guān)注的 I/O 事件即可。 -
在這節(jié)內(nèi)容中,最重要的是理解 ChannelHandler 的生命周期方法,在文章總已多次總結(jié)了,這里不再闡述了。
雖然 ChannelHandler 是一個(gè)好的打工人,但是它沒辦法一個(gè)人干所有的活啊,他需要有其他的 ChannelHandler 來配合它,這個(gè)時(shí)候怎么辦?大明哥下篇文章揭曉!
示例源碼:http://suo.nz/1v8w02
