<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Reactor 高性能設(shè)計模式

          共 6623字,需瀏覽 14分鐘

           ·

          2022-08-14 15:46

          Reactor 模式

          Rreactor 模型是 I/O 多路復用的升級版,底層依賴于 Java NIO。不熟悉 Java NIO 技術(shù)的可以看下:《Java NIO:從 Buffer、Channel、Selector 到 Zero-copy、I/O 多路復用,一篇搞定!》。

          Reactor是高性能網(wǎng)絡(luò)編程中非常經(jīng)典且重要的一個設(shè)計模式。在很多軟件設(shè)計實現(xiàn)中應(yīng)用很廣泛,像Netty 的設(shè)計中很重要的組成部分就是對 Reactor 模型的一種實現(xiàn);還有 Redis 的實現(xiàn)中也用到了 Reactor 模型,這也是 Redis 之所以底層單線程但是速度卻非??斓脑蛑?。

          這里提供兩個非常官方且經(jīng)典的參考資料:

          Scalable IO in Java》由 java.util.concurrent 包的作者 Doug Lea 編寫的一個關(guān)于 Reactor 模型的介紹;

          reactor-siemens》 也是由國外作者編寫的一篇研究 ?Reactor 模型的論文;

          需要的小伙伴可以關(guān)注公眾號,公眾號內(nèi)回復:Reactor 獲取下載鏈接!

          Reactor 模式介紹

          什么是 Reactor 模式

          Reactor 模式一般翻譯成反應(yīng)器模式,也有人稱為分發(fā)者模式。是基于事件驅(qū)動的設(shè)計模式,擁有一個或多個并發(fā)輸入源,有一個服務(wù)處理器和多個請求處理器,服務(wù)處理器會同步地將輸入的請求事件以多路復用的方式分發(fā)給相應(yīng)的請求處理器。簡單來說就是 由一個線程來接收所有的請求,然后派發(fā)這些請求到相關(guān)的工作線程中。

          為什么使用 Reactor 模式

          java 中,沒有 NIO 出現(xiàn)之前都是使用 Socket 編程。Socket 接收請求是阻塞的,需要處理完一個請求才能處理下一個請求,所以在面對高并發(fā)的服務(wù)請求時,性能就會很差。

          那有人就會說使用多線程(如下圖所示)。接收到一個請求,就創(chuàng)建一個線程處理,這樣就不會阻塞了。實際上這樣的確是可以在提升性能上起到一定的作用,但是當請求很多的時候,就會創(chuàng)建大量的線程,維護線程需要資源的消耗,線程之間的切換也需要消耗性能。而且系統(tǒng)創(chuàng)建線程的數(shù)量也是有限的,所以當高并發(fā)時,會直接把系統(tǒng)拖垮。

          6ecb081aa3b7e22408262674075d227a.webp

          因此,基于 Java,Doug Lea 提出了三種形式的 Reactor 模式:單 Reactor 單線程、單 Reactor 多線程和多 Reactor 多線程。

          Reactor 模式中有三個重要的角色:

          • Reactor:負責響應(yīng)事件,將事件分發(fā)到綁定了對應(yīng)事件的 Handler,如果是連接事件,則分發(fā)到 Acceptor;
          • Handler:事件處理器。負責執(zhí)行對應(yīng)事件對應(yīng)的業(yè)務(wù)邏輯;
          • Acceptor:綁定了 connect 事件,當客戶端發(fā)起 connect 請求時,Reactor 會將 accept 事件分發(fā)給 Acceptor 處理;

          單 Reactor 單線程版本

          2f70d1e5484265c9e59e954f30acf1d0.webp單Reactor單線程

          只有一個 Selector 循環(huán)接受請求,客戶端注冊進來由 Reactor 接收注冊事件,然后再由 Reactor 分發(fā)出去,由對應(yīng)的 Handler 進行業(yè)務(wù)邏輯處理。

          偽代碼實例

          class?Reactor?implements?Runnable?{
          ????final?Selector?selector;
          ????final?ServerSocketChannel?serverSocket;
          ????
          ????Reactor(int?port)?throws?IOException?{
          ????????selector?=?Selector.open();
          ????????serverSocket?=?ServerSocketChannel.open();
          ????????serverSocket.socket().bind(new?InetSocketAddress(port));
          ????????serverSocket.configureBlocking(false);
          ????????SelectionKey?sk?=?serverSocket.register(selector,SelectionKey.OP_ACCEPT);
          ????????sk.attach(new?Acceptor());
          ????}
          ????
          ????
          ????public?void?run()?{
          ????????try?{
          ????????????while(!Thread.interrupted())?{
          ????????????????selector.select();
          ????????????????Set?selected?=?selector.selectedKeys();
          ????????????????Iterator?it?=?selected.iterator();
          ????????????????while(it.hasNext())?{
          ????????????????????dispatch((SelectionKey)(it.next()))
          ????????????????}
          ????????????????selected.clear();
          ????????????}
          ????????}catch(IOException?e){
          ????????????
          ????????}
          ????}
          ????
          ????void?dispatch(SelectionKey?k)?{
          ????????Runnable?r?=?(Runnable)(k.attachment());
          ????????if?(r?!=?null){
          ????????????r.run();
          ????????}
          ????}
          ????
          ????class?Acceptor?implements?Runnable?{
          ????????public?void?run()?{
          ????????????try{
          ????????????????SocketChannel?c?=??serverSocket.accept();
          ????????????????if?(c?!=?null){
          ????????????????????new?Handler(selector,?c);
          ????????????????}
          ????????????}catch(IOException?e)?{
          ????????????????
          ????????????}
          ????????}
          ????}

          }



          final?class?Handler?implements?Runnable?{
          ????final?SocketChannel?socket;
          ????final?SelectionKey?sk;
          ????ByteBuffer?input?=?ByteBuffer.allocate(MAXIN);
          ????ByteBuffer?output?=?ByteBuffer.allocate(MAXOUT);
          ????static?final?int?READING?=?0,?SENDING?=?1;
          ????int?state?=?READING;
          ????
          ????Handler(Selector?sel,?SocketChannel?c)?throws?IOException?{
          ????????socket?=?c;
          ????????c.configureBlocking(false);
          ????????//?optionally?try?first?read?now
          ????????sk?=?socket.register(sel,0);
          ????????sk.attach(this);
          ????????sk.interestOps(SelectionKey.OP_READ);
          ????????/**
          ?????????*?selector.wakeup();?喚醒阻塞在select方法上的線程,使其立即返回
          ?????????*/

          ????????sel.wakeup();
          ????}
          ????
          ????boolean?inputIsComplete(){/*……*/}
          ????boolean?outputIsComplete(){/*……*/}
          ????void?process(){/*……*/}
          ????????
          ????public?void?run()?{
          ????????try{
          ????????????if?(state?==?READING){
          ????????????????read();
          ????????????}else?if(state?==?SENDING){
          ????????????????send();
          ????????????}
          ????????}catch(IOException?e){
          ????????????
          ????????}
          ????}
          ????
          ????void?read()?throws?IOException{
          ????????socket.read(input);
          ????????if(inputIsComplete()){
          ????????????process();
          ????????????state?=?SENDING;
          ????????????//?Normally?also?do?first?write?now
          ????????????sk.interestOps(SelectionKey.OP_WRITE);
          ????????}
          ????}?
          ????
          ????void?send()?throws?IOException{
          ????????socket.write(output);
          ????????if(outputIsComplete()){
          ????????????sk.cacel();
          ????????}
          ????}
          }

          這里需要注意的兩點是

          • Selector.wakeup() 方法的作用是:喚醒阻塞在select方法上的線程,使其立即返回。
          • Reactor.dispatch() 方法中,調(diào)用的是任務(wù)的 run 方法,同步執(zhí)行。

          單線程的問題實際上是很明顯的。只要其中一個 Handler 方法阻塞了,那就會導致所有的 clientHandler 都被阻塞了,也會導致注冊事件也無法處理,無法接收新的請求。所以這種模式用的比較少,因為不能充分利用到多核的資源。因此,這種模式僅僅只能處理 Handler 比較快速完成的場景。

          單 Reactor 多線程版本

          11a6d51ced5f2a58530e20c7b6ef4a7a.webp單Reactor多線程

          在多線程 Reactor 中,注冊接收事件都是由 Reactor 來做,其它的計算,編解碼由一個線程池來做。從圖中可以看出工作線程是多線程的,監(jiān)聽注冊事件的 Reactor 還是單線程。

          偽代碼示例

          public?class?Handler?implements?Runnable{

          ????final?SocketChannel?socket;
          ????final?SelectionKey?sk;
          ????ByteBuffer?input?=?ByteBuffer.allocate(Integer.MAX_VALUE);
          ????ByteBuffer?output?=?ByteBuffer.allocate(Integer.MAX_VALUE);
          ????static?final?int?READING?=?0,?SENDING?=?1;
          ????int?state?=?READING;

          ????static?ExecutorService?pool?=?Executors.newCachedThreadPool();

          ????Handler(Selector?sel,?SocketChannel?c)?throws?IOException?{
          ????????socket?=?c;
          ????????c.configureBlocking(false);
          ????????//?optionally?try?first?read?now
          ????????sk?=?socket.register(sel,0);
          ????????sk.attach(this);
          ????????sk.interestOps(SelectionKey.OP_READ);
          ????????sel.wakeup();
          ????}

          ????boolean?inputIsComplete(){return?true;}
          ????boolean?outputIsComplete(){return?true;}
          ????void?process(){}

          ????public?void?run()?{
          ????????try{
          ????????????if?(state?==?READING){
          ????????????????read();
          ????????????}else?if(state?==?SENDING){
          ????????????????send();
          ????????????}
          ????????}catch(IOException?e){

          ????????}
          ????}

          ????void?send()?throws?IOException?{
          ????????socket.write(output);
          ????????if(outputIsComplete()){
          ????????????sk.cancel();
          ????????}
          ????}

          ????synchronized?void?read()??throws?IOException{
          ????????socket.read(input);
          ????????if(inputIsComplete()){
          ????????????pool.execute(new?Processer());
          ????????}
          ????}

          ????synchronized?void?processAndHandOff()?{
          ????????process();
          ????????sk.attach(this);
          ????????sk.interestOps(SelectionKey.OP_WRITE);
          ????}

          ????class?Processer?implements?Runnable?{
          ????????@Override
          ????????public?void?run()?{
          ????????????processAndHandOff();
          ????????}
          ????}
          }

          對于 Reactor 部分,代碼不需要調(diào)整,因為也是單 Reactor ,Handler 部分增加了線程池的支持。

          對比單 Reactor 單線程模型,多線程 Reactor 模式在 Handler 讀寫處理時,交給工作線程池處理,可以充分利用多核cpu的處理能力,因為 Reactor 分發(fā)和 Handler 處理是分開的,不會導致 Reactor 無法執(zhí)行。從而提升應(yīng)用的性能。缺點是 Reactor 只在主線程中運行,承擔所有事件的監(jiān)聽和響應(yīng),如果短時間的高并發(fā)場景下,依然會造成性能瓶頸。

          多 Reactor 多線程版本

          3980bd7ae29d019d64a602fc864873ea.webp多Reactor多線程

          也稱為主從 Reactor 模式,在這種模式下,一般會有兩個 ReactormainReactorsubReactormainReactor 負責監(jiān)聽客戶端請求,專門處理新連接的建立,再將建立好的連接注冊到 subReactor。subReactor 將分配的連接加入到隊列進行監(jiān)聽,當有新的事件發(fā)生時,會調(diào)用連接相對應(yīng)的 Handler 進行業(yè)務(wù)處理。

          這樣的模型使得每個模塊更加專一,耦合度更低,能支持更高的并發(fā)量。許多框架也使用這種模式。

          Reactor 模式的優(yōu)點

          1. 響應(yīng)快,不必為單個同步時間所阻塞,雖然 Reactor 本身依然是同步的。
          2. 可以最大程度地避免復雜的多線程及同步問題,并且避免多線程/進程的切換開銷。
          3. 擴展性好,可以方便地通過增加 Reactor 實例個數(shù)來充分利用 CPU 資源。
          4. 復用性好,Reactor 模式本身與具體事件處理邏輯無關(guān),具有很高的復用性。

          ef7519160d67cceac0206496128ceda8.webp

          記得轉(zhuǎn)發(fā)、在看、關(guān)注哦!d8bbe266c6d4cbdc8763515b73e973dd.webp
          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲欧洲AV在线 | 国产精品福利小视频 | 亚洲在线色情 | 婷婷影音| 日屄视频免费 |