Reactor 高性能設(shè)計模式
Rreactor 模型是 I/O 多路復用的升級版,底層依賴于 Java NIO。不熟悉 Java NIO 技術(shù)的可以看下:《Java NIO:從 Buffer、Channel、Selector 到 Zero-copy、I/O 多路復用,一篇搞定!》。
Reactor是高性能網(wǎng)絡(luò)編程中非常經(jīng)典且重要的一個設(shè)計模式。在很多軟件設(shè)計實現(xiàn)中應(yīng)用很廣泛,像Netty 的設(shè)計中很重要的組成部分就是對 Reactor 模型的一種實現(xiàn);還有 Redis 的實現(xiàn)中也用到了 Reactor 模型,這也是 Redis 之所以底層單線程但是速度卻非??斓脑蛑?。
這里提供兩個非常官方且經(jīng)典的參考資料:
《Scalable IO in Java》由 java.util.concurrent 包的作者 Doug Lea 編寫的一個關(guān)于 Reactor 模型的介紹;
《reactor-siemens》 也是由國外作者編寫的一篇研究 ?Reactor 模型的論文;
需要的小伙伴可以關(guān)注公眾號,公眾號內(nèi)回復:Reactor 獲取下載鏈接!
Reactor 模式介紹
什么是 Reactor 模式
Reactor 模式一般翻譯成反應(yīng)器模式,也有人稱為分發(fā)者模式。是基于事件驅(qū)動的設(shè)計模式,擁有一個或多個并發(fā)輸入源,有一個服務(wù)處理器和多個請求處理器,服務(wù)處理器會同步地將輸入的請求事件以多路復用的方式分發(fā)給相應(yīng)的請求處理器。簡單來說就是 由一個線程來接收所有的請求,然后派發(fā)這些請求到相關(guān)的工作線程中。
為什么使用 Reactor 模式
在 java 中,沒有 NIO 出現(xiàn)之前都是使用 Socket 編程。Socket 接收請求是阻塞的,需要處理完一個請求才能處理下一個請求,所以在面對高并發(fā)的服務(wù)請求時,性能就會很差。
那有人就會說使用多線程(如下圖所示)。接收到一個請求,就創(chuàng)建一個線程處理,這樣就不會阻塞了。實際上這樣的確是可以在提升性能上起到一定的作用,但是當請求很多的時候,就會創(chuàng)建大量的線程,維護線程需要資源的消耗,線程之間的切換也需要消耗性能。而且系統(tǒng)創(chuàng)建線程的數(shù)量也是有限的,所以當高并發(fā)時,會直接把系統(tǒng)拖垮。

因此,基于 Java,Doug Lea 提出了三種形式的 Reactor 模式:單 Reactor 單線程、單 Reactor 多線程和多 Reactor 多線程。
在 Reactor 模式中有三個重要的角色:
Reactor:負責響應(yīng)事件,將事件分發(fā)到綁定了對應(yīng)事件的Handler,如果是連接事件,則分發(fā)到Acceptor;Handler:事件處理器。負責執(zhí)行對應(yīng)事件對應(yīng)的業(yè)務(wù)邏輯;Acceptor:綁定了connect事件,當客戶端發(fā)起connect請求時,Reactor會將accept事件分發(fā)給Acceptor處理;
單 Reactor 單線程版本
單Reactor單線程只有一個 Selector 循環(huán)接受請求,客戶端注冊進來由 Reactor 接收注冊事件,然后再由 Reactor 分發(fā)出去,由對應(yīng)的 Handler 進行業(yè)務(wù)邏輯處理。
偽代碼實例
class?Reactor?implements?Runnable?{
????final?Selector?selector;
????final?ServerSocketChannel?serverSocket;
????
????Reactor(int?port)?throws?IOException?{
????????selector?=?Selector.open();
????????serverSocket?=?ServerSocketChannel.open();
????????serverSocket.socket().bind(new?InetSocketAddress(port));
????????serverSocket.configureBlocking(false);
????????SelectionKey?sk?=?serverSocket.register(selector,SelectionKey.OP_ACCEPT);
????????sk.attach(new?Acceptor());
????}
????
????
????public?void?run()?{
????????try?{
????????????while(!Thread.interrupted())?{
????????????????selector.select();
????????????????Set?selected?=?selector.selectedKeys();
????????????????Iterator?it?=?selected.iterator();
????????????????while(it.hasNext())?{
????????????????????dispatch((SelectionKey)(it.next()))
????????????????}
????????????????selected.clear();
????????????}
????????}catch(IOException?e){
????????????
????????}
????}
????
????void?dispatch(SelectionKey?k)?{
????????Runnable?r?=?(Runnable)(k.attachment());
????????if?(r?!=?null){
????????????r.run();
????????}
????}
????
????class?Acceptor?implements?Runnable?{
????????public?void?run()?{
????????????try{
????????????????SocketChannel?c?=??serverSocket.accept();
????????????????if?(c?!=?null){
????????????????????new?Handler(selector,?c);
????????????????}
????????????}catch(IOException?e)?{
????????????????
????????????}
????????}
????}
}
final?class?Handler?implements?Runnable?{
????final?SocketChannel?socket;
????final?SelectionKey?sk;
????ByteBuffer?input?=?ByteBuffer.allocate(MAXIN);
????ByteBuffer?output?=?ByteBuffer.allocate(MAXOUT);
????static?final?int?READING?=?0,?SENDING?=?1;
????int?state?=?READING;
????
????Handler(Selector?sel,?SocketChannel?c)?throws?IOException?{
????????socket?=?c;
????????c.configureBlocking(false);
????????//?optionally?try?first?read?now
????????sk?=?socket.register(sel,0);
????????sk.attach(this);
????????sk.interestOps(SelectionKey.OP_READ);
????????/**
?????????*?selector.wakeup();?喚醒阻塞在select方法上的線程,使其立即返回
?????????*/
????????sel.wakeup();
????}
????
????boolean?inputIsComplete(){/*……*/}
????boolean?outputIsComplete(){/*……*/}
????void?process(){/*……*/}
????????
????public?void?run()?{
????????try{
????????????if?(state?==?READING){
????????????????read();
????????????}else?if(state?==?SENDING){
????????????????send();
????????????}
????????}catch(IOException?e){
????????????
????????}
????}
????
????void?read()?throws?IOException{
????????socket.read(input);
????????if(inputIsComplete()){
????????????process();
????????????state?=?SENDING;
????????????//?Normally?also?do?first?write?now
????????????sk.interestOps(SelectionKey.OP_WRITE);
????????}
????}?
????
????void?send()?throws?IOException{
????????socket.write(output);
????????if(outputIsComplete()){
????????????sk.cacel();
????????}
????}
}
這里需要注意的兩點是
Selector.wakeup()方法的作用是:喚醒阻塞在select方法上的線程,使其立即返回。- 在
Reactor.dispatch()方法中,調(diào)用的是任務(wù)的run方法,同步執(zhí)行。
單線程的問題實際上是很明顯的。只要其中一個 Handler 方法阻塞了,那就會導致所有的 client 的 Handler 都被阻塞了,也會導致注冊事件也無法處理,無法接收新的請求。所以這種模式用的比較少,因為不能充分利用到多核的資源。因此,這種模式僅僅只能處理 Handler 比較快速完成的場景。
單 Reactor 多線程版本
單Reactor多線程在多線程 Reactor 中,注冊接收事件都是由 Reactor 來做,其它的計算,編解碼由一個線程池來做。從圖中可以看出工作線程是多線程的,監(jiān)聽注冊事件的 Reactor 還是單線程。
偽代碼示例
public?class?Handler?implements?Runnable{
????final?SocketChannel?socket;
????final?SelectionKey?sk;
????ByteBuffer?input?=?ByteBuffer.allocate(Integer.MAX_VALUE);
????ByteBuffer?output?=?ByteBuffer.allocate(Integer.MAX_VALUE);
????static?final?int?READING?=?0,?SENDING?=?1;
????int?state?=?READING;
????static?ExecutorService?pool?=?Executors.newCachedThreadPool();
????Handler(Selector?sel,?SocketChannel?c)?throws?IOException?{
????????socket?=?c;
????????c.configureBlocking(false);
????????//?optionally?try?first?read?now
????????sk?=?socket.register(sel,0);
????????sk.attach(this);
????????sk.interestOps(SelectionKey.OP_READ);
????????sel.wakeup();
????}
????boolean?inputIsComplete(){return?true;}
????boolean?outputIsComplete(){return?true;}
????void?process(){}
????public?void?run()?{
????????try{
????????????if?(state?==?READING){
????????????????read();
????????????}else?if(state?==?SENDING){
????????????????send();
????????????}
????????}catch(IOException?e){
????????}
????}
????void?send()?throws?IOException?{
????????socket.write(output);
????????if(outputIsComplete()){
????????????sk.cancel();
????????}
????}
????synchronized?void?read()??throws?IOException{
????????socket.read(input);
????????if(inputIsComplete()){
????????????pool.execute(new?Processer());
????????}
????}
????synchronized?void?processAndHandOff()?{
????????process();
????????sk.attach(this);
????????sk.interestOps(SelectionKey.OP_WRITE);
????}
????class?Processer?implements?Runnable?{
????????@Override
????????public?void?run()?{
????????????processAndHandOff();
????????}
????}
}
對于 Reactor 部分,代碼不需要調(diào)整,因為也是單 Reactor ,Handler 部分增加了線程池的支持。
對比單 Reactor 單線程模型,多線程 Reactor 模式在 Handler 讀寫處理時,交給工作線程池處理,可以充分利用多核cpu的處理能力,因為 Reactor 分發(fā)和 Handler 處理是分開的,不會導致 Reactor 無法執(zhí)行。從而提升應(yīng)用的性能。缺點是 Reactor 只在主線程中運行,承擔所有事件的監(jiān)聽和響應(yīng),如果短時間的高并發(fā)場景下,依然會造成性能瓶頸。
多 Reactor 多線程版本
多Reactor多線程也稱為主從 Reactor 模式,在這種模式下,一般會有兩個 Reactor:mainReactor 和 subReactor。mainReactor 負責監(jiān)聽客戶端請求,專門處理新連接的建立,再將建立好的連接注冊到 subReactor。subReactor 將分配的連接加入到隊列進行監(jiān)聽,當有新的事件發(fā)生時,會調(diào)用連接相對應(yīng)的 Handler 進行業(yè)務(wù)處理。
這樣的模型使得每個模塊更加專一,耦合度更低,能支持更高的并發(fā)量。許多框架也使用這種模式。
Reactor 模式的優(yōu)點
- 響應(yīng)快,不必為單個同步時間所阻塞,雖然
Reactor本身依然是同步的。 - 可以最大程度地避免復雜的多線程及同步問題,并且避免多線程/進程的切換開銷。
- 擴展性好,可以方便地通過增加
Reactor實例個數(shù)來充分利用CPU資源。 - 復用性好,
Reactor模式本身與具體事件處理邏輯無關(guān),具有很高的復用性。


