<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 入門 — ChannelHandler, Netty 的數(shù)據(jù)加工廠

          共 21522字,需瀏覽 44分鐘

           ·

          2023-10-26 14:27

          通過上篇文章(Netty入門 — Channel,把握 Netty 通信的命門),我們知道 Channel 是傳輸數(shù)據(jù)的通道,但是有了數(shù)據(jù),也有數(shù)據(jù)通道,沒有數(shù)據(jù)加工也是沒有意義的,所以今天學(xué)習(xí) Netty 的第四個(gè)組件:ChannelHandler ,它是 Netty 的數(shù)據(jù)加工廠。

          ChannelHandler

          在上篇文章(Netty入門 — Channel,把握 Netty 通信的命門)中,大明哥提到:EventLoop 接收到 Channel 的 I/O  事件后會(huì)將該事件轉(zhuǎn)交給 Handler 來處理,這個(gè) Handler 就是 ChannelHandler。ChannelHandler 它是對 Netty 輸入輸出數(shù)據(jù)進(jìn)行加工處理的載體,它包含了我們應(yīng)用程序業(yè)務(wù)處理的邏輯

          在整個(gè) Netty 生產(chǎn)線上,它就是那個(gè)埋頭苦干,一心干活的工人。

          Channel 的狀態(tài)處理

          在介紹 Channel 的時(shí)候(Netty入門 — Channel,把握 Netty 通信的命門)我們知道,Channel 是有狀態(tài)的,而且 Channel 也提供了判斷 Channel 當(dāng)前狀態(tài)的 API,如下:

          • isOpen():檢查 Channel 是否為 open 狀態(tài)。
          • isRegistered():檢查 Channel 是否為 registered 狀態(tài)。
          • isActive():檢查 Channel 是否為 active 狀態(tài)。

          上面三個(gè) API 對應(yīng)了 Channel 四個(gè)狀態(tài):

          狀態(tài) 描述
          ChannelUnregistered Channel 已經(jīng)被創(chuàng)建,但還未注冊到 EventLoop。此時(shí) isOpen() 返回 true,但 isRegistered() 返回 false。
          ChannelRegistered Channel 已經(jīng)被注冊到 EventLoop。此時(shí) isRegistered() 返回 true,但 isActive() 返回 false。
          ChannelActive Channel 已經(jīng)處理活動(dòng)狀態(tài)并可以接收與發(fā)送數(shù)據(jù)。此時(shí) isActive()  返回 true。
          ChannelInactive Channel 沒有連接到遠(yuǎn)程節(jié)點(diǎn)

          狀態(tài)變更如下:

          當(dāng) Channel 的狀態(tài)發(fā)生改變時(shí),會(huì)生成相對應(yīng)的事件,這些事件會(huì)被轉(zhuǎn)發(fā)給 ChannelHandler,而 ChannelHandler 中會(huì)有相對應(yīng)的方法來對其進(jìn)行響應(yīng)。在 ChannelHandler 中定義一些與這生命周期相關(guān)的 API,如 channelRegistered()channelUnregistered()channelActive()channelInactive()等等,后面大明哥會(huì)詳細(xì)介紹這些 API。

          ChannelHandler 的家族

          ChannelHandler 的大家族如下圖:

          ChannelHandler 是整個(gè)家族中的頂層接口,它有兩大子接口,ChannelInBoundHandler 和 ChannelOutBoundHandler,其中 ChannelInBoundHandler 用于處理入站數(shù)據(jù)(讀請求),ChannelOutBoundHandler 用于處理出站數(shù)據(jù)(寫請求),他們兩個(gè)接口都有一個(gè)相對應(yīng)的默認(rèn)實(shí)現(xiàn),即 XxxxAdapter。

          ChannelHandler

          ChannelHandler 作為頂層接口,它并不具備太多功能,它僅僅只提供了三個(gè) API:

          API 描述
          handlerAdded() 當(dāng)ChannelHandler 添加到 ChannelPipeline 中時(shí)被調(diào)用
          handlerRemoved() 當(dāng) ChannelHandler 被從 ChannelPipeline 移除時(shí)調(diào)用
          exceptionCaught() 當(dāng) ChannelHandler 在處理過程中出現(xiàn)異常時(shí)調(diào)用

          從 ChannelHandler 提供的 API 中我們可以看出,它并不直接參與 Channel 的數(shù)據(jù)加工過程,而是用來響應(yīng) ChannelPipeline 鏈和異常處 理的,對于 Channel 的數(shù)據(jù)加工則由它的子接口處理:

          • ChannelInboundHandler:攔截&處理各種入站的 I/O 事件
          • ChannelOutboundHandler:攔截&處理各種出站的 I/O 事件

          這里解釋下出站和入站的概念:

          • 接收對方傳輸?shù)臄?shù)據(jù)并處理,即為入站
          • 向?qū)Ψ綄憯?shù)據(jù)時(shí),即為出站
          • 如:客戶端  —> 服務(wù)端:服務(wù)端讀取客戶端消息為入站,服務(wù)端處理消息完后給客戶端返回消息為出站

          ChannelInboundHandler

          ChannelInboundHandler 用來處理和攔截各種入站的 I/O 事件,它可以處理的事件如下:



          響應(yīng)方法 觸發(fā)事件
          channelRegistered Channel 被注冊到EventLoop 時(shí)
          channelUnregistered Channel 從 EventLoop 中取消時(shí)
          channelActive Channel 處理活躍狀態(tài),可以讀寫時(shí)
          channelInactive Channel 不再是活動(dòng)狀態(tài)且不再連接它的遠(yuǎn)程節(jié)點(diǎn)時(shí)
          channelReadComplete Channel 從上一個(gè)讀操作完成時(shí)
          channelRead Channel 讀取數(shù)據(jù)時(shí)
          channelWritabilityChanged Channel 的可寫狀態(tài)發(fā)生改變時(shí)
          userEventTriggered ChannelInboundHandler.fireUserEventTriggered()方法被調(diào)用時(shí)

          一般情況我們不直接使用 ChannelInboundHandler,而是使用它的實(shí)現(xiàn)類 ChannelInboundHandlerAdapter。我們寫業(yè)務(wù)處理時(shí)直接繼承 ChannelInboundHandlerAdapter ,然后重寫你感興趣的 I/O 事件響應(yīng)方法即可,比如你想處理 Channel 讀數(shù)據(jù),那重寫 channelRead() 即可,代碼如下:

          public class ChannelHandlerTest_1 extends ChannelInboundHandlerAdapter {
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  // do something
              }
          }

          但是在使用 ChannelInboundHandlerAdapter 的時(shí)候,需要注意的是我們需要顯示地釋放與池化 ByteBuf 實(shí)例相關(guān)的內(nèi)存,Netty 為此專門提供了一個(gè)方法 ReferenceCountUtil.release(),即我們需要在 ChannelInboundHandler 的鏈的末尾需要使用該方法來釋放內(nèi)存,如下:

          public class ByteBufReleaseHandler extends ChannelInboundHandlerAdapter{
            @Override
            public void channelRead(ChannelHandlerContext ctx,Object msg){
              //釋放msg
              ReferenceCountUtil.release(msg);
            }
          }

          但是有些小伙伴有時(shí)候會(huì)忘記這點(diǎn),會(huì)帶來不必要的麻煩,那有沒有更好的方法呢?Netty 提供了一個(gè)類來幫助我們簡化這個(gè)過程: SimpleChannelInboundHandler,對于我們業(yè)務(wù)處理的類,采用繼承 SimpleChannelInboundHandler 而不是 ChannelInboundHandlerAdapter 就可以解決了。

          public class ChannelHandlerTest_1 extends SimpleChannelInboundHandler<Object{
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  // do something
              }
          }

          使用 SimpleChannelInboundHandler 我們就不需要顯示釋放資源了,是不是非常人性化。

          ChannelOutboundHandler

          ChannelOutboundHandler 是攔截和處理出站的各種 I/O 事件的。處理的事件如下:

          響應(yīng)方法 觸發(fā)事件
          bind 請求將 Channel 綁定到本地地址時(shí)
          connect 請求將 Channel 連接到遠(yuǎn)程節(jié)點(diǎn)時(shí)
          disconnect 請求將 Channel 從遠(yuǎn)程節(jié)點(diǎn)斷開時(shí)
          close 請求關(guān)閉 Channel 時(shí)
          dderegister 請求將 Channel 從它的 EventLoop 注銷時(shí)
          read 請求從 Channel 中讀取數(shù)據(jù)時(shí)
          flush 請求通過 Channel 將入隊(duì)數(shù)據(jù)刷入遠(yuǎn)程節(jié)點(diǎn)時(shí)
          write 請求通過 Channel 將數(shù)據(jù)寫入遠(yuǎn)程節(jié)點(diǎn)時(shí)

          與 ChannelInboundHandler 一樣,我們也不直接使用 ChannelOutboundHandler 接口,而是使用它的默認(rèn)實(shí)現(xiàn)類 ChannelOutboundHandlerAdapter,重寫我們想處理的 I/O 事件的響應(yīng)方法就可以了。

          ChannelHandler 的示例

          ChannelHandler 生命周期

          大明哥通過一個(gè)示例來告訴你 ChannelHandler 在整個(gè)執(zhí)行過程中的生命周期是怎么樣的,響應(yīng)方法調(diào)用的順序是如何的。我們只有了解了整個(gè)生命周期的運(yùn)行,才能在合適的生命周期響應(yīng)方法里面擴(kuò)展我們自己的業(yè)務(wù)功能。

          • 服務(wù)端代碼
          public class ChannelHandlerTest_1_server extends ChannelInboundHandlerAdapter {
              public static void main(String[] args) {
                  new ServerBootstrap()
                          .group(new NioEventLoopGroup())
                          .channel(NioServerSocketChannel.class)
                          .childHandler(new ChannelInitializer<NioSocketChannel>() 
          {
                              @Override
                              protected void initChannel(NioSocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new ChannelHandlerTest());
                              }
                          })
                          .bind(8081);
              }

              private static class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
                  @Override
                  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--handlerAdded:handler 被添加到 ChannelPipeline");
                      super.handlerAdded(ctx);
                  }

                  @Override
                  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--channelRegistered:Channel 注冊到 EventLoop");
                      super.channelRegistered(ctx);
                  }

                  @Override
                  public void channelActive(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--channelActive:Channel 準(zhǔn)備就緒");
                      super.channelActive(ctx);
                  }

                  @Override
                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--channelRead:Channel 中有可讀數(shù)據(jù)");

                      super.channelRead(ctx, msg);
                  }

                  @Override
                  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--channelReadComplete:Channel 讀取數(shù)據(jù)完成");
                      super.channelReadComplete(ctx);
                  }

                  @Override
                  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--channelInactive:Channel 被關(guān)閉,不在活躍");
                      super.channelInactive(ctx);
                  }

                  @Override
                  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--channelUnregistered:Channe 從 EventLoop 中被取消");
                      super.channelUnregistered(ctx);
                  }

                  @Override
                  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                      System.out.println(LocalTime.now().toString() + "--handlerRemoved:handler 從 ChannelPipeline 中移除");
                      super.handlerRemoved(ctx);
                  }
              }
          }

          服務(wù)端重寫整個(gè)生命周期中的各個(gè)方法。

          • 客戶端
          public class ChannelHandlerTest_1_client extends ChannelInboundHandlerAdapter {
              public static void main(String[] args) throws Exception {
                  Channel channel = new Bootstrap()
                          .group(new NioEventLoopGroup())
                          .channel(NioSocketChannel.class)
                          .handler(new ChannelInitializer<NioSocketChannel>() 
          {
                              @Override
                              protected void initChannel(NioSocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new StringEncoder());
                              }
                          })
                          .connect("127.0.0.1",8081)
                          .sync()
                          .channel();

                  for (int i = 1 ; i <= 2 ; i++) {
                      channel.writeAndFlush("hello,i am ChannelHander client -" + i);
                      TimeUnit.SECONDS.sleep(2);
                  }

                  channel.close();
              }
          }

          客戶端連接服務(wù)端后,每個(gè) 2 秒向服務(wù)端發(fā)送一次消息,共發(fā)兩次。發(fā)送完消息后,再過 2 秒關(guān)閉連接。

          • 運(yùn)行結(jié)果
          11:45:09.456--handlerAdded:handler 被添加到 ChannelPipeline
          11:45:09.457--channelRegistered:Channel 注冊到 EventLoop
          11:45:09.457--channelActive:Channel 準(zhǔn)備就緒

          11:45:09.474--channelRead:Channel 中有可讀數(shù)據(jù)
          11:45:09.475--channelReadComplete:Channel 讀取數(shù)據(jù)完成

          11:45:11.397--channelRead:Channel 中有可讀數(shù)據(jù)
          11:45:11.397--channelReadComplete:Channel 讀取數(shù)據(jù)完成

          11:45:13.405--channelReadComplete:Channel 讀取數(shù)據(jù)完成
          11:45:13.407--channelInactive:Channel 被關(guān)閉,不在活躍
          11:45:13.407--channelUnregistered:Channe 從 EventLoop 中被取消
          11:45:13.407--handlerRemoved:handler 從 ChannelPipeline 中移除

          結(jié)果分析如下:

          • 服務(wù)端檢測到客戶端發(fā)起連接后,會(huì)將要處理的 Handler 添加到 ChannelPipeline 中,然后將 Channel 注冊到 EventLoop,注冊完成后,Channel 準(zhǔn)備就緒處于活躍狀態(tài),可以接收消息了
          • 客戶端向服務(wù)端發(fā)送消息,服務(wù)端讀取消息
          • 當(dāng)服務(wù)端檢測到客戶端已關(guān)閉連接后,該 Channel 就被關(guān)閉了,不再活躍,然后將該 Channel 從 EventLoop 取消,并將 Handler 從 ChannelPipeline 中移除。

          在整個(gè)生命周期中,響應(yīng)方法執(zhí)行順序如下:

          1. 建立連接handlerAdded() -> channelRegistered() -> channelActive ()
          2. 數(shù)據(jù)請求channelRead() -> channelReadComplete()
          3. 關(guān)閉連接channelReadComplete()  -> channelInactive() -> channelUnregistered() -> handlerRemoved()

          這里有一點(diǎn)需要注意,為什么關(guān)閉連接會(huì)響應(yīng) channelReadComplete() 呢?這里埋個(gè)點(diǎn),后續(xù)大明哥在來說明,有興趣的小伙伴可以先研究研究。

          這里大明哥對 ChannelHandler 生命周期的方法做一個(gè)總結(jié):

          1. handlerAdded():ChannelHandler 被加入到 Pipeline 時(shí)觸發(fā)。當(dāng)服務(wù)端檢測到新鏈接后,會(huì)將 ChannelHandler 構(gòu)建成一個(gè)雙向鏈表(下篇文章介紹),該方法被觸發(fā)表示在當(dāng)前 Channel 中已經(jīng)添加了一個(gè) ChannelHandler 業(yè)務(wù)處理鏈了》。
          2. channelRegistered():當(dāng) Channel 注冊到 EventLoop 中時(shí)被觸發(fā)。該方法被觸發(fā)了,表明當(dāng)前 Channel 已經(jīng)綁定到了某一個(gè) EventLoop 中了。
          3. channelActive():Channel 連接就緒時(shí)觸發(fā)。該方法被觸發(fā),說明當(dāng)前 Channel 已經(jīng)處于活躍狀態(tài)了,可以進(jìn)行數(shù)據(jù)讀寫了。
          4. channelRead():當(dāng) Channel 有數(shù)據(jù)可讀時(shí)觸發(fā)。客戶端向服務(wù)端發(fā)送數(shù)據(jù),都會(huì)觸發(fā)該方法,該方法被調(diào)用說明有數(shù)據(jù)可讀。而且我們自定義業(yè)務(wù) handler 時(shí)都是重寫該方法。
          5. channelReadComplete():當(dāng) Channel 數(shù)據(jù)讀完時(shí)觸發(fā)。服務(wù)端每次讀完數(shù)據(jù)后都會(huì)觸發(fā)該方法,表明數(shù)據(jù)已讀取完畢。
          6. channelInactive():當(dāng) Channel 斷開連接時(shí)觸發(fā)。該方法被觸發(fā),說明 Channel 已經(jīng)不再是活躍狀態(tài)了,連接已經(jīng)關(guān)閉了。
          7. channelUnregistered():當(dāng) Channel 取消注冊時(shí)觸發(fā):連接關(guān)閉后,我們就要取消該 Channel 與 EventLoop 的綁定關(guān)系了。
          8. handlerRemoved():當(dāng) ChannelHandler 被從 ChannelPipeline 中移除時(shí)觸發(fā)。將與該 Channel 綁定的 ChannelPipeline 中的 ChannelHandler 業(yè)務(wù)處理鏈全部移除。

          ChannelHandler 業(yè)務(wù)開發(fā)

          上面只是一個(gè)很簡單的 ChannelHandler 實(shí)例,但是我們在實(shí)際項(xiàng)目開發(fā)中,要處理的業(yè)務(wù)復(fù)制多了,它肯定是由多個(gè)業(yè)務(wù)組合而成,那我們又該如何去做呢?

          一個(gè) ChannelHandler

          偽代碼如下:

          public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  Map<String,String> map=(Map<String,String>)msg;
                  String code=map.get("code");
                  if(code.equals("xxx1")){
                      //do something 1
                  }else if(code.equals("xxx2")){
                      //do something 2
                  }else{
                      // do something 3
                  }
              }
          }

          這種實(shí)現(xiàn)方式簡單,非常容易實(shí)現(xiàn),但是如果業(yè)務(wù)較多的情況下,該 Handler 的處理邏輯會(huì)非常臃腫,而且很不好維護(hù),而且這么多 if...else ,看起來就煩,當(dāng)然我們可以采用相對應(yīng)的方式來干掉 if...else ,例如利用策略模式:

          public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
              HashMap<String,HandlerService) handlerServiceMap = new HashMap();
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  Map<String,String> map=(Map<String,String>)msg;
                  String code=map.get("code");
                  handlerServiceMap.get(code).doSomething(map);
              }
          }

          這種方式也行,但他把所有的業(yè)務(wù)都耦合在一起了,終究不是那么優(yōu)雅。

          多個(gè) ChannelHandler

          我們可以定義多個(gè) ChannelHandler,根據(jù) code 來判斷,如果 code 是自己的則處理,否則流轉(zhuǎn)到下一個(gè)節(jié)點(diǎn):

          public class ChannelHandlerTest extends ChannelInboundHandlerAdapter {
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  Map<String,String> map=(Map<String,String>)msg;
                  String code=map.get("code");
                  if(code.equals("xxx")){
                      //自己的業(yè)務(wù),處理
                  }else{
                      //不是自己的,流轉(zhuǎn)到下一個(gè)節(jié)點(diǎn)
                      super.channelRead(ctx, msg);
                  }
              }
          }

          這種方式將所有業(yè)務(wù)解耦了,每個(gè) Handler 各司其職,所有業(yè)務(wù)不再是都耦合在一起了,后期維護(hù)更加輕松。這種方式和上面方式的一樣,依賴 code,他們都需要服務(wù)端和客戶端都維護(hù)一套 code,而這個(gè) code 如果還不能輕易發(fā)生變更。

          自定義類型

          根據(jù)客戶端提交的參數(shù)類型,自動(dòng)流轉(zhuǎn)到相對應(yīng)的 ChannelHandler。

          public class ChannelHandlerTest extends SimpleChannelInboundHandler<User{
              protected void channelRead0(ChannelHandlerContext channelHandlerContext, User user) throws Exception {
                  // do something
              }
          }

          這種方式是最優(yōu)雅的,也是我們使用最多的方式。他的優(yōu)點(diǎn)明顯,1. 業(yè)務(wù)解耦,2. 不需要維護(hù)碼表。

          總結(jié)

          1. ChannelHandler 是 Netty 中真正做事的組件,EventLoop 將監(jiān)聽到的 I/O 事件轉(zhuǎn)發(fā)后,就由 ChannelHandler 來處理,它也是我們編寫 Netty 代碼最多的地方。
          2. ChannelHandler 作為頂層接口,它一般不會(huì)負(fù)責(zé)具體業(yè)務(wù) I/O 事件,具體的業(yè)務(wù) I/O 事件由它兩個(gè)子接口負(fù)責(zé):
            1. ChannelInboundHandler:負(fù)責(zé)攔截響應(yīng)入站 I/O 事件
            2. ChannelOutboundHandler:負(fù)責(zé)攔截響應(yīng)出站 I/O 事件
          3. 我們自定義的業(yè)務(wù) Handler ,一般不會(huì)直接實(shí)現(xiàn) ChannelInboundHandler 和 ChannelOutboundHandler,而是繼承他們兩個(gè)的默認(rèn)實(shí)現(xiàn)類:ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,這樣我們就可以只需要關(guān)注我們想關(guān)注的 I/O 事件即可。
          4. 在這節(jié)內(nèi)容中,最重要的是理解 ChannelHandler 的生命周期方法,在文章總已多次總結(jié)了,這里不再闡述了。

          雖然 ChannelHandler 是一個(gè)好的打工人,但是它沒辦法一個(gè)人干所有的活啊,他需要有其他的 ChannelHandler 來配合它,這個(gè)時(shí)候怎么辦?大明哥下篇文章揭曉!

          示例源碼:http://suo.nz/1v8w02


          瀏覽 602
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  夜夜干天天撸时时操 | 国产青青操逼视频 | 俺也搞俺也色俺也干 | 又黑又长的大黑鸡巴免费高清 | 天天插,天天狠,天天透 |