<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java開源框架中的設計模式以及應用場景

          共 30696字,需瀏覽 62分鐘

           ·

          2020-09-18 06:29


          前言


          設計模式是軟件設計中常見問題的典型解決方案,你可以通過對其進行定制來解決代碼中的特定設計問題。

          關于設計模式,網上有很多講解。但大部分都是Demo示例,看完有可能還是不知道怎么用。

          本文筆者將從設計模式入手,看一看在優(yōu)秀的Java框架/中間件產品中,不同的設計模式應用場景在哪里。

          一,單例模式


          單例模式是Java中最簡單的設計模式之一,它提供了一種創(chuàng)建對象的最佳方式。這種模式涉及到一個單一的類,該類負責創(chuàng)建自己的對象,同時確保只有單個對象被創(chuàng)建。這個類提供了一種訪問其唯一的對象的方式,可以直接訪問,不需要實例化該類的對象。

          單例模式雖然很簡單,但它的花樣一點都不少,我們一一來看。

          1、餓漢式

          餓漢式,顧名思義,就是我很餓,迫不及待。不管有沒有人用,我先創(chuàng)建了再說。

          比如在Dubbo中的這段代碼,創(chuàng)建一個配置管理器。

          public?class?ConfigManager?{
          ????private?static?final?ConfigManager?configManager?=?new?ConfigManager();?
          ????private?ConfigManager()?{}
          ????public?static?ConfigManager?getInstance()?{
          ????????return?configManager;
          ????}
          }

          又或者在RocketMQ中,創(chuàng)建一個MQ客戶端實例的時候。

          public?class?MQClientManager?{

          ????private?static?MQClientManager?instance?=?new?MQClientManager();
          ????private?MQClientManager()?{}
          ????public?static?MQClientManager?getInstance()?{
          ????????return?instance;
          ????}
          }


          2、懶漢式

          懶漢式是對應餓漢式而言的。它旨在第一次調用才初始化,避免內存浪費。但為了線程安全和性能,一般都會使用雙重檢查鎖的方式來創(chuàng)建。

          我們來看Seata框架中,通過這種方式來創(chuàng)建一個配置類。

          public?class?ConfigurationFactory{

          ????private?static?volatile?Configuration?CONFIG_INSTANCE?=?null;
          ????public?static?Configuration?getInstance()?{
          ????????if?(CONFIG_INSTANCE?==?null)?{
          ????????????synchronized?(Configuration.class)?{
          ????????????????if?(CONFIG_INSTANCE?==?null)?{
          ????????????????????CONFIG_INSTANCE?=?buildConfiguration();
          ????????????????}
          ????????????}
          ????????}
          ????????return?CONFIG_INSTANCE;
          ????}
          }


          3、靜態(tài)內部類

          可以看到,通過雙重檢查鎖的方式來創(chuàng)建單例對象,還是比較復雜的。又是加鎖,又是判斷兩次,還需要加volatile修飾的。

          使用靜態(tài)內部類的方式,可以達到雙重檢查鎖相同的功效,但實現上簡單了。

          在Seata框架中,創(chuàng)建RM事件處理程序器的時候,就使用了靜態(tài)內部類的方式來創(chuàng)建單例對象。

          public?class?DefaultRMHandler?extends?AbstractRMHandler{

          ????protected?DefaultRMHandler()?{
          ????????initRMHandlers();
          ????}
          ????private?static?class?SingletonHolder?{
          ????????private?static?AbstractRMHandler?INSTANCE?=?new?DefaultRMHandler();
          ????}
          ????public?static?AbstractRMHandler?get()?{
          ????????return?DefaultRMHandler.SingletonHolder.INSTANCE;
          ????}
          }

          還有可以通過枚舉的方式來創(chuàng)建單例對象,但這種方式并沒有被廣泛采用,至少筆者在常見的開源框架中沒見過,所以就不再列舉。

          有人說,餓漢式的單例模式不好,不能做到延遲加載,浪費內存。但筆者認為似乎過于吹毛求疵,事實上很多開源框架中,用的最多的就是這種方式。

          如果明確希望實現懶加載效果時,可以考慮用靜態(tài)內部類的方式;如果還有其他特殊的需求,比如創(chuàng)建對象的過程比較繁瑣,可以用雙重檢查鎖的方式。


          二,工廠模式



          工廠模式是Java中最常用的設計模式之一。這種類型的設計模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對象的最佳方式。

          簡單來說,在工廠模式中,就是代替new實例化具體類的一種模式。

          1、簡單工廠

          簡單工廠,的確比較簡單,它的作用就是把對象的創(chuàng)建放到一個工廠類中,通過參數來創(chuàng)建不同的對象。

          在分布式事務框架Seata中,如果發(fā)生異常,則需要進行二階段回滾。

          它的過程是,通過事務id找到undoLog記錄,然后解析里面的數據生成SQL,將一階段執(zhí)行的SQL給撤銷掉。

          問題是SQL的種類包含了比如INSERT、UPDATE、DELETE,所以它們反解析的過程也不一樣,就需要不同的執(zhí)行器去解析。

          在Seata中,有一個抽象的撤銷執(zhí)行器,可以生成一條SQL。

          public?abstract?class?AbstractUndoExecutor{
          ????//生成撤銷SQL
          ????protected?abstract?String?buildUndoSQL();
          }

          然后有一個獲取撤銷執(zhí)行器的工廠,根據SQL的類型,創(chuàng)建不同類型的執(zhí)行器并返回。

          public?class?UndoExecutorFactory?{

          ????public?static?AbstractUndoExecutor?getUndoExecutor(String?dbType,?SQLUndoLog?sqlUndoLog)?{
          ????????switch?(sqlUndoLog.getSqlType())?{
          ????????????case?INSERT:
          ????????????????return?new?MySQLUndoInsertExecutor(sqlUndoLog);
          ????????????case?UPDATE:
          ????????????????return?new?MySQLUndoUpdateExecutor(sqlUndoLog);
          ????????????case?DELETE:
          ????????????????return?new?MySQLUndoDeleteExecutor(sqlUndoLog);
          ????????????default:
          ????????????????throw?new?ShouldNeverHappenException();
          ????????}
          ????}
          }

          使用的時候,直接通過工廠類獲取執(zhí)行器。

          AbstractUndoExecutor?undoExecutor?=?UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(),sqlUndoLog);
          undoExecutor.executeOn(conn);

          簡單工廠模式的優(yōu)點,想必各位都能領會,我們不再贅述。但它還有個小小的缺點:

          一旦有了新的實現類,就需要修改工廠實現,有可能造成工廠邏輯過于復雜,不利于系統(tǒng)的擴展和維護。

          2、工廠方法

          工廠方法模式解決了上面那個問題。它可以創(chuàng)建一個工廠接口和多個工廠實現類,這樣如果增加新的功能,只需要添加新的工廠類就可以,不需要修改之前的代碼。

          另外,工廠方法模式還可以和模板方法模式結合一起,將他們共同的基礎邏輯抽取到父類中,其它的交給子類去實現。

          在Dubbo中,有一個關于緩存的設計完美的體現了工廠方法模式+模板方法模式。

          首先,有一個緩存的接口,它提供了設置緩存和獲取緩存兩個方法。

          public?interface?Cache?{
          ????void?put(Object?key,?Object?value);
          ????Object?get(Object?key);
          }

          然后呢,還有一個緩存工廠,它返回一個緩存的實現。

          public?interface?CacheFactory?{
          ????Cache?getCache(URL?url,?Invocation?invocation);
          }

          由于結合了模板方法模式,所以Dubbo又搞了個抽象的緩存工廠類,它實現了緩存工廠的接口。

          public?abstract?class?AbstractCacheFactory?implements?CacheFactory?{
          ????
          ????//具體的緩存實現類
          ????private?final?ConcurrentMap?caches?=?new?ConcurrentHashMap();
          ????
          ????@Override
          ????public?Cache?getCache(URL?url,?Invocation?invocation)?{
          ????????url?=?url.addParameter(Constants.METHOD_KEY,?invocation.getMethodName());
          ????????String?key?=?url.toFullString();
          ????????Cache?cache?=?caches.get(key);
          ????????if?(cache?==?null)?{
          ????????????//創(chuàng)建緩存實現類,交給子類實現
          ????????????caches.put(key,?createCache(url));
          ????????????cache?=?caches.get(key);
          ????????}
          ????????return?cache;
          ????}
          ????//抽象方法,交給子類實現
          ????protected?abstract?Cache?createCache(URL?url);
          }

          在這里,公共的邏輯就是通過getCahce()創(chuàng)建緩存實現類,那具體創(chuàng)建什么樣的緩存實現類,就由子類去決定。

          所以,每個子類都是一個個具體的緩存工廠類,比如包括:

          ExpiringCacheFactory、JCacheFactory、LruCacheFactory、ThreadLocalCacheFactory。
          這些工廠類,只有一個方法,就是創(chuàng)建具體的緩存實現類。
          public?class?ThreadLocalCacheFactory?extends?AbstractCacheFactory?{
          ????@Override
          ????protected?Cache?createCache(URL?url)?{
          ????????return?new?ThreadLocalCache(url);
          ????}
          }

          這里的ThreadLocalCache就是具體的緩存實現類,比如它是通過ThreadLocal來實現緩存功能。

          public?class?ThreadLocalCache?implements?Cache?{

          ????private?final?ThreadLocal>?store;

          ????public?ThreadLocalCache(URL?url)?{
          ????????this.store?=?new?ThreadLocal>()?{
          ????????????@Override
          ????????????protected?Map?initialValue()?{
          ????????????????return?new?HashMap();
          ????????????}
          ????????};
          ????}
          ????@Override
          ????public?void?put(Object?key,?Object?value)?{
          ????????store.get().put(key,?value);
          ????}
          ????@Override
          ????public?Object?get(Object?key)?{
          ????????return?store.get().get(key);
          ????}
          }

          那在客戶端使用的時候,還是通過工廠來獲取緩存對象。

          public?static?void?main(String[]?args)?{
          ????URL?url?=?URL.valueOf("http://localhost:8080/cache=jacache&.cache.write.expire=1");
          ????Invocation?invocation?=?new?RpcInvocation();
          ????CacheFactory?cacheFactory?=?new?ThreadLocalCacheFactory();
          ????Cache?cache?=?cacheFactory.getCache(url,?invocation);
          ????cache.put("java","java");
          ????System.out.println(cache.get("java"));
          }

          這樣做的好處有兩點。

          第一,如果增加新的緩存實現,只要添加一個新的緩存工廠類就可以,別的都無需改動。

          第二,通過模板方法模式,封裝不變部分,擴展可變部分。提取公共代碼,便于維護。

          另外,在Dubbo中,注冊中心的獲取也是通過工廠方法來實現的。

          3、抽象工廠

          抽象工廠模式,它能創(chuàng)建一系列相關的對象,而無需指定其具體類。

          工廠方法模式和抽象工廠模式,它們之間最大的區(qū)別在于:

          • 工廠方法模式只有一個抽象產品類,具體工廠類只能創(chuàng)建一個具體產品類的實例;

          • 抽象工廠模式有多個抽象產品類,具體工廠類可以創(chuàng)建多個具體產品類的實例。


          我們拿上面緩存的例子來繼續(xù)往下說。

          如果我們現在有一個數據訪問程序,需要同時操作緩存和數據庫,那就需要多個抽象產品和多個具體產品實現。

          緩存相關的產品類都已經有了,我們接著來創(chuàng)建數據庫相關的產品實現。

          首先,有一個數據庫接口,它是抽象產品類。

          public?interface?DataBase?{
          ????void?insert(Object?tableName,?Object?record);
          ????Object?select(Object?tableName);
          }

          然后,我們創(chuàng)建兩個具體產品類MysqlDataBase和OracleDataBase。

          public?class?MysqlDataBase?implements?DataBase{
          ????Map?mysqlDb?=?new?HashMap<>();
          ????@Override
          ????public?void?insert(Object?tableName,?Object?record)?{
          ????????mysqlDb.put(tableName,record);
          ????}
          ????@Override
          ????public?Object?select(Object?tableName)?{
          ????????return?mysqlDb.get(tableName);
          ????}
          }

          public?class?OracleDataBase?implements?DataBase?{
          ????Map?oracleDb?=?new?HashMap<>();
          ????@Override
          ????public?void?insert(Object?tableName,?Object?record)?{
          ????????oracleDb.put(tableName,record);
          ????}
          ????@Override
          ????public?Object?select(Object?tableName)?{
          ????????return?oracleDb.get(tableName);
          ????}
          }

          其次,創(chuàng)建抽象的工廠類,它可以返回一個緩存對象和數據庫對象。

          public?interface?DataAccessFactory?{
          ????Cache?getCache(URL?url);
          ????DataBase?getDb();
          }

          最后是具體的工廠類,可以根據實際的需求,任意組合每一個具體的產品。

          比如我們需要一個基于ThreadLocal的緩存實現和基于MySQL的數據庫對象。

          public?class?DataAccessFactory1?implements?DataAccessFactory?{
          ????@Override
          ????public?Cache?getCache(URL?url)?{
          ????????return?new?ThreadLocalCache(url);
          ????}
          ????@Override
          ????public?DataBase?getDb()?{
          ????????return?new?MysqlDataBase();
          ????}
          }

          如果需要一個基于Lru的緩存實現和基于Oracle的數據庫對象。

          public?class?DataAccessFactory2?implements?DataAccessFactory?{
          ????@Override
          ????public?Cache?getCache(URL?url)?{
          ????????return?new?LruCache(url);
          ????}
          ????@Override
          ????public?DataBase?getDb()?{
          ????????return?new?OracleDataBase();
          ????}
          }

          可以看到,抽象工廠模式隔離了具體類的生成,使得客戶并不需要知道什么被創(chuàng)建。由于這種隔離,更換一個具體工廠就變得相對容易,所有的具體工廠都實現了抽象工廠中定義的那些公共接口,因此只需改變具體工廠的實例,就可以在某種程度上改變整個軟件系統(tǒng)的行為。

          三,模板方式模式



          在模板模式中,一個抽象類公開定義了執(zhí)行它的方法的方式/模板。它的子類可以按需要重寫方法實現,但調用將以抽象類中定義的方式進行。

          簡單來說,有多個子類共有的方法,且邏輯相同,可以考慮作為模板方法。

          在上面Dubbo緩存的例子中,我們已經看到了模板方法模式的應用。但那個是和工廠方法模式結合在一塊的,我們再單獨找找模板方法模式的應用。

          我們知道,當我們的Dubbo應用出現多個服務提供者時,服務消費者需要通過負載均衡算法,選擇其中一個服務來進行調用。

          首先,有一個LoadBalance接口,返回一個Invoker。

          public?interface?LoadBalance?{
          ?????Invoker?select(List>?invokers,?URL?url,?Invocation?invocation)?throws?RpcException;
          }

          然后定義一個抽象類,AbstractLoadBalance,實現LoadBalance接口。

          public?abstract?class?AbstractLoadBalance?implements?LoadBalance?{

          ????@Override
          ????public??Invoker?select(List>?invokers,?URL?url,?Invocation?invocation)?{
          ????????if?(invokers?==?null?||?invokers.isEmpty())?{
          ????????????return?null;
          ????????}
          ????????if?(invokers.size()?==?1)?{
          ????????????return?invokers.get(0);
          ????????}
          ????????return?doSelect(invokers,?url,?invocation);
          ????}
          ????//抽象方法,由子類選擇一個Invoker
          ????protected?abstract??Invoker?doSelect(List>?invokers,?URL?url,?Invocation?invocation);
          }

          這里的公共邏輯就是兩個判斷,判斷invokers集合是否為空或者是否只有一個實例。如果是的話,就無需調用子類,直接返回就好了。

          具體的負載均衡實現有四個:

          • 基于權重隨機算法的 RandomLoadBalance

          • 基于最少活躍調用數算法的 LeastActiveLoadBalance

          • 基于 hash 一致性的 ConsistentHashLoadBalance

          • 基于加權輪詢算法的 RoundRobinLoadBalance


          public?class?RandomLoadBalance?extends?AbstractLoadBalance?{
          ????@Override
          ????protected??Invoker?doSelect(List>?invokers,?URL?url,?Invocation?invocation)?{
          ????????//省略邏輯....
          ????????return?invokers.get(ThreadLocalRandom.current().nextInt(length));
          ????}
          }

          它們根據不同的算法實現,來返回一個具體的Invoker對象。

          四,構造器模式


          構造器模式使用多個簡單的對象一步一步構建成一個復雜的對象。這種類型的設計模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對象的最佳方式。

          這種模式,常見于在構建一個復雜的對象,里面可能包含一些業(yè)務邏輯,比如檢查,屬性轉換等。如果都在客戶端手動去設置,那么會產生大量的冗余代碼。那么這時候,我們就可以考慮使用構造器模式。

          比如,在MyBatis中,MappedStatement的創(chuàng)建過程就使用了構造器模式。

          我們知道,XML文件中的每一個SQL標簽就要生成一個MappedStatement對象,它里面包含很多個屬性,我們要構造的對象也是它。

          public?final?class?MappedStatement?{
          ????private?String?resource;
          ????private?Configuration?configuration;
          ????private?String?id;
          ????private?SqlSource?sqlSource;
          ????private?ParameterMap?parameterMap;
          ????private?List?resultMaps;
          ????//.....省略大部分屬性
          }

          然后有一個內部類Builder,它負責完成MappedStatement對象的構造。

          首先,這個Builder類,通過默認的構造函數,先完成對MappedStatement對象,部分的構造。

          public?static?class?Builder?{

          ????private?MappedStatement?mappedStatement?=?new?MappedStatement();
          ????
          ????public?Builder(Configuration?configuration,?String?id,?SqlSource?sqlSource,?SqlCommandType?sqlCommandType)?{
          ????????mappedStatement.configuration?=?configuration;
          ????????mappedStatement.id?=?id;
          ????????mappedStatement.sqlSource?=?sqlSource;
          ????????mappedStatement.statementType?=?StatementType.PREPARED;
          ????????mappedStatement.resultSetType?=?ResultSetType.DEFAULT;
          ????????//.....省略大部分過程
          ????}
          }

          然后,通過一系列方法,可以設置特定的屬性,并返回這個Builder類,這里的方法適合處理一些業(yè)務邏輯。

          public?static?class?Builder?{

          ????public?Builder?parameterMap(ParameterMap?parameterMap)?{
          ??????mappedStatement.parameterMap?=?parameterMap;
          ??????return?this;
          ????}
          ????
          ????public?Builder?resultMaps(List?resultMaps)?{
          ??????mappedStatement.resultMaps?=?resultMaps;
          ??????for?(ResultMap?resultMap?:?resultMaps)?{
          ????????mappedStatement.hasNestedResultMaps?=?mappedStatement.hasNestedResultMaps?||?resultMap.hasNestedResultMaps();
          ??????}
          ??????return?this;
          ????}
          ????
          ????public?Builder?statementType(StatementType?statementType)?{
          ??????mappedStatement.statementType?=?statementType;
          ??????return?this;
          ????}

          ????public?Builder?resultSetType(ResultSetType?resultSetType)?{
          ??????mappedStatement.resultSetType?=?resultSetType?==?null???ResultSetType.DEFAULT?:?resultSetType;
          ??????return?this;
          ????}
          }

          最后呢,就是提供一個build方法,返回構建完成的對象就好了。

          public?MappedStatement?build()?{
          ????assert?mappedStatement.configuration?!=?null;
          ????assert?mappedStatement.id?!=?null;
          ????assert?mappedStatement.sqlSource?!=?null;
          ????assert?mappedStatement.lang?!=?null;
          ????mappedStatement.resultMaps?=?Collections.unmodifiableList(mappedStatement.resultMaps);
          ????return?mappedStatement;
          }

          在客戶端使用的時候,先創(chuàng)建一個Builder,然后鏈式的調用一堆方法,最后再調用一次build()方法,我們需要的對象就有了,這就是構造器模式的應用。

          MappedStatement.Builder?statementBuilder?=?new?MappedStatement.Builder(configuration,?id,?sqlSource,?sqlCommandType)
          ????.resource(resource)
          ????.fetchSize(fetchSize)
          ????.timeout(timeout)
          ????.statementType(statementType)
          ????.keyGenerator(keyGenerator)
          ????.keyProperty(keyProperty)
          ????.keyColumn(keyColumn)
          ????.databaseId(databaseId)
          ????.lang(lang)
          ????.resultOrdered(resultOrdered)
          ????.resultSets(resultSets)
          ????.resultMaps(getStatementResultMaps(resultMap,?resultType,?id))
          ????.resultSetType(resultSetType)
          ????.flushCacheRequired(valueOrDefault(flushCache,?!isSelect))
          ????.useCache(valueOrDefault(useCache,?isSelect))
          ????.cache(currentCache);

          ParameterMap?statementParameterMap?=?getStatementParameterMap(parameterMap,?parameterType,?id);
          MappedStatement?statement?=?statementBuilder.build();
          configuration.addMappedStatement(statement);
          return?statement;


          五,適配器模式


          適配器模式是作為兩個不兼容的接口之間的橋梁。這種類型的設計模式屬于結構型模式,它結合了兩個獨立接口的功能。

          適配器模式一般用于屏蔽業(yè)務邏輯與第三方服務的交互,或者是新老接口之間的差異。

          我們知道,在Dubbo中,所有的數據都是通過Netty來負責傳輸的,然后這就涉及了消息編解碼的問題。

          所以,首先它有一個編解碼器的接口,負責編碼和解碼。

          @SPI
          public?interface?Codec2?{

          ????@Adaptive({Constants.CODEC_KEY})
          ????void?encode(Channel?channel,?ChannelBuffer?buffer,?Object?message)?throws?IOException;
          ????
          ????@Adaptive({Constants.CODEC_KEY})
          ????Object?decode(Channel?channel,?ChannelBuffer?buffer)?throws?IOException;
          ????
          ????enum?DecodeResult?{
          ????????NEED_MORE_INPUT,?SKIP_SOME_INPUT
          ????}
          }

          然后,有幾個實現類,比如DubboCountCodec、DubboCodec、ExchangeCodec等。

          但是,當我們打開這些類的時候,就會發(fā)現,他們都是Dubbo中普通的類,只是實現了Codec2接口,其實不能直接作用于Netty編解碼。

          這是因為,Netty編解碼需要實現ChannelHandler接口,這樣才會被聲明成Netty的處理組件。比如像MessageToByteEncoder、ByteToMessageDecoder那樣。

          鑒于此,Dubbo搞了一個適配器,專門來適配編解碼器接口。

          final?public?class?NettyCodecAdapter?{

          ????private?final?ChannelHandler?encoder?=?new?InternalEncoder();
          ????private?final?ChannelHandler?decoder?=?new?InternalDecoder();
          ????private?final?Codec2?codec;
          ????private?final?URL?url;
          ????private?final?org.apache.dubbo.remoting.ChannelHandler?handler;
          ????
          ????public?NettyCodecAdapter(Codec2?codec,?URL?url,?org.apache.dubbo.remoting.ChannelHandler?handler)?{
          ????????this.codec?=?codec;
          ????????this.url?=?url;
          ????????this.handler?=?handler;
          ????}
          ????public?ChannelHandler?getEncoder()?{
          ????????return?encoder;
          ????}
          ????public?ChannelHandler?getDecoder()?{
          ????????return?decoder;
          ????}
          ????
          ????private?class?InternalEncoder?extends?MessageToByteEncoder?{
          ????????@Override
          ????????protected?void?encode(ChannelHandlerContext?ctx,?Object?msg,?ByteBuf?out)?throws?Exception?{
          ????????????org.apache.dubbo.remoting.buffer.ChannelBuffer?buffer?=?new?NettyBackedChannelBuffer(out);
          ????????????Channel?ch?=?ctx.channel();
          ????????????NettyChannel?channel?=?NettyChannel.getOrAddChannel(ch,?url,?handler);
          ????????????codec.encode(channel,?buffer,?msg);
          ????????}
          ????}
          ????private?class?InternalDecoder?extends?ByteToMessageDecoder?{
          ????????@Override
          ????????protected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?input,?List?out)?throws?Exception?{
          ????????????ChannelBuffer?message?=?new?NettyBackedChannelBuffer(input);
          ????????????NettyChannel?channel?=?NettyChannel.getOrAddChannel(ctx.channel(),?url,?handler);
          ????????????//解碼對象
          ????????????codec.decode(channel,?message);
          ????????????//省略部分代碼...
          ????????}
          ????}
          }

          上面的代碼中,我們看到,NettyCodecAdapter類適配的是Codec2接口,通過構造函數傳遞實現類,然后定義了內部的編碼器實現和解碼器實現,同時它們都是ChannelHandler。

          這樣的話,在內部類里面的編碼和解碼邏輯,真正調用的還是Codec2接口。

          最后我們再來看看,該適配器的調用方式。

          //通過SPI方式獲取編解碼器的實現類,比如這里是DubboCountCodec
          Codec2?codec?=?ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
          URL?url?=?new?URL("dubbo",?"localhost",?22226);
          //創(chuàng)建適配器
          NettyCodecAdapter?adapter?=?new?NettyCodecAdapter(codec,?url,?NettyClient.this);
          //向ChannelPipeline中添加編解碼處理器
          ch.pipeline()
          ????.addLast("decoder",?adapter.getDecoder())
          ????.addLast("encoder",?adapter.getEncoder())

          以上,就是Dubbo中關于編解碼器對于適配器模式的應用。


          六,責任鏈模式


          責任鏈模式為請求創(chuàng)建了一個接收者對象的鏈。允許你將請求沿著處理者鏈進行發(fā)送。收到請求后,每個處理者均可對請求進行處理,或將其傳遞給鏈上的下個處理者。

          我們來看一個Netty中的例子。我們知道,在Netty中服務端處理消息,就要添加一個或多個ChannelHandler。那么,承載這些ChannelHandler的就是ChannelPipeline,它的實現過程就體現了責任鏈模式的應用。

          ServerBootstrap?serverBootstrap?=?new?ServerBootstrap();
          serverBootstrap.childHandler(new?ChannelInitializer()?{
          ????protected?void?initChannel(NioSocketChannel?channel)?{
          ????????channel.pipeline()
          ????????????.addLast(new?ChannelHandler1())
          ????????????.addLast(new?ChannelHandler2())
          ????????????.addLast(new?ChannelHandler3());
          ????}
          });

          需要知道的是,在Netty整個框架里面,一條連接對應著一個Channel,每一個新創(chuàng)建的Channel都將會被分配一個新的ChannelPipeline。

          ChannelPipeline里面保存的是ChannelHandlerContext對象,它是Channel相關的上下文對象,里面包著我們定義的處理器ChannelHandler。

          根據事件的起源,IO事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理。隨后,通過調用ChannelHandlerContext實現,它將被轉發(fā)給同一超類型的下一個ChannelHandler。

          1、ChannelHandler

          首先,我們來看責任處理器接口,Netty中的ChannelHandler,它充當了所有處理入站和出站數據的應用程序邏輯的容器。

          public?interface?ChannelHandler?{
          ????//當把?ChannelHandler?添加到?ChannelPipeline?中時被調用
          ????void?handlerAdded(ChannelHandlerContext?ctx)?throws?Exception;
          ????//當從?ChannelPipeline?中移除?ChannelHandler?時被調用
          ????void?handlerRemoved(ChannelHandlerContext?ctx)?throws?Exception;
          ????//當處理過程中在?ChannelPipeline?中有錯誤產生時被調用
          ????void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception;
          }

          然后Netty定義了下面兩個重要的ChannelHandler子接口:

          1、ChannelInboundHandler,處理入站數據以及各種狀態(tài)變化;

          public?interface?ChannelInboundHandler?extends?ChannelHandler?{
          ????//當?Channel?已經注冊到它的?EventLoop?并且能夠處理?I/O?時被調用
          ????void?channelRegistered(ChannelHandlerContext?ctx)?throws?Exception;
          ????//當?Channel?從它的?EventLoop?注銷并且無法處理任何?I/O?時被調用
          ????void?channelUnregistered(ChannelHandlerContext?ctx)?throws?Exception;?
          ????//當 Channel 處于活動狀態(tài)時被調用;Channel 已經連接/綁定并且已經就緒
          ????void?channelActive(ChannelHandlerContext?ctx)?throws?Exception;???
          ????//當?Channel?離開活動狀態(tài)并且不再連接它的遠程節(jié)點時被調用
          ????void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception;??
          ????當從?Channel?讀取數據時被調用
          ????void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?throws?Exception;???
          ????//當?Channel上的一個讀操作完成時被調用
          ????void?channelReadComplete(ChannelHandlerContext?ctx)?throws?Exception;
          ????void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception;?
          ????void?channelWritabilityChanged(ChannelHandlerContext?ctx)?throws?Exception;
          ????void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception;
          }

          2、ChannelOutboundHandler,處理出站數據并且允許攔截所有的操作;

          public?interface?ChannelOutboundHandler?extends?ChannelHandler?{
          ????
          ????//當請求將?Channel?綁定到本地地址時被調用
          ????void?bind(ChannelHandlerContext?ctx,?SocketAddress?localAddress,?ChannelPromise?promise)?throws?Exception;
          ????//當請求將?Channel?連接到遠程節(jié)點時被調用
          ????void?connect(ChannelHandlerContext?ctx,?SocketAddress?remoteAddress,SocketAddress?localAddress,?
          ????????ChannelPromise?promise)?throws?Exception;
          ????//當請求將?Channel?從遠程節(jié)點斷開時被調用
          ????void?disconnect(ChannelHandlerContext?ctx,?ChannelPromise?promise)?throws?Exception;
          ????//當請求關閉?Channel?時被調用
          ????void?close(ChannelHandlerContext?ctx,?ChannelPromise?promise)?throws?Exception;
          ????//當請求將?Channel?從它的?EventLoop?注銷時被調用
          ????void?deregister(ChannelHandlerContext?ctx,?ChannelPromise?promise)?throws?Exception;
          ????//當請求從?Channel?讀取更多的數據時被調用
          ????void?read(ChannelHandlerContext?ctx)?throws?Exception;
          ????//當請求通過?Channel?將數據寫到遠程節(jié)點時被調用
          ????void?write(ChannelHandlerContext?ctx,?Object?msg,?ChannelPromise?promise)?throws?Exception;
          ????//當請求通過?Channel?將入隊數據沖刷到遠程節(jié)點時被調用
          ????void?flush(ChannelHandlerContext?ctx)?throws?Exception;
          }


          2、ChannelPipeline

          既然叫做責任鏈模式,那就需要有一個“鏈”,在Netty中就是ChannelPipeline。

          ChannelPipeline提供了ChannelHandler鏈的容器,并定義了用于在該鏈上傳播入站和出站事件流的方法,另外它還具有添加刪除責任處理器接口的功能。

          public?interface?ChannelPipeline{
          ????ChannelPipeline?addFirst(String?name,?ChannelHandler?handler);
          ????ChannelPipeline?addLast(String?name,?ChannelHandler?handler);
          ????ChannelPipeline?addBefore(String?baseName,?String?name,?ChannelHandler?handler);
          ????ChannelPipeline?addAfter(String?baseName,?String?name,?ChannelHandler?handler);
          ????ChannelPipeline?remove(ChannelHandler?handler);
          ????ChannelHandler?replace(String?oldName,?String?newName,?ChannelHandler?newHandler);
          ????@Override
          ????ChannelPipeline?fireChannelRegistered();
          ????@Override
          ????ChannelPipeline?fireChannelActive();
          ????@Override
          ????ChannelPipeline?fireExceptionCaught(Throwable?cause);
          ????@Override
          ????ChannelPipeline?fireUserEventTriggered(Object?event);
          ????@Override
          ????ChannelPipeline?fireChannelRead(Object?msg);
          ????@Override
          ????ChannelPipeline?flush();
          ????//省略部分方法.....
          }

          然后我們看它的實現,默認有兩個節(jié)點,頭結點和尾結點。并在構造函數中,使它們首尾相連。這就是標準的鏈式結構。

          public?class?DefaultChannelPipeline?implements?ChannelPipeline?{

          ????final?AbstractChannelHandlerContext?head;
          ????final?AbstractChannelHandlerContext?tail;
          ????private?final?Channel?channel;
          ????
          ????protected?DefaultChannelPipeline(Channel?channel)?{
          ????????this.channel?=?ObjectUtil.checkNotNull(channel,?"channel");
          ????????tail?=?new?TailContext(this);
          ????????head?=?new?HeadContext(this);
          ????????head.next?=?tail;
          ????????tail.prev?=?head;
          ????}
          }

          當有新的ChannelHandler被添加時,則將其封裝為ChannelHandlerContext對象,然后插入到鏈表中。

          private?void?addLast0(AbstractChannelHandlerContext?newCtx)?{
          ????AbstractChannelHandlerContext?prev?=?tail.prev;
          ????newCtx.prev?=?prev;
          ????newCtx.next?=?tail;
          ????prev.next?=?newCtx;
          ????tail.prev?=?newCtx;
          }


          3、ChannelHandlerContext

          ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之間的關聯,每當有ChannelHandler添加到ChannelPipeline中時,都會創(chuàng)建ChannelHandlerContext。

          ChannelHandlerContext的主要功能是管理它所關聯的ChannelHandler和在同一個ChannelPipeline中的其他ChannelHandler之間的交互。

          public?interface?ChannelHandlerContext{
          ????Channel?channel();
          ????EventExecutor?executor();
          ????ChannelHandler?handler();
          ????ChannelPipeline?pipeline();
          ????@Override
          ????ChannelHandlerContext?fireChannelRegistered();
          ????@Override
          ????ChannelHandlerContext?fireChannelUnregistered();
          ????@Override
          ????ChannelHandlerContext?fireChannelActive();
          ????@Override
          ????ChannelHandlerContext?fireChannelRead(Object?msg);
          ????@Override
          ????ChannelHandlerContext?read();
          ????@Override
          ????ChannelHandlerContext?flush();
          ????//省略部分方法……
          }

          ChannelHandlerContext負責在鏈上傳播責任處理器接口的事件。

          它有兩個重要的方法,查找Inbound類型和Outbound類型的處理器。

          值得注意的是,如果一個入站事件被觸發(fā),它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端;一個出站事件將從ChannelPipeline的最右邊開始,然后向左傳播。

          abstract?class?AbstractChannelHandlerContext?implements?ChannelHandlerContext,?ResourceLeakHint?{
          ????
          ????volatile?AbstractChannelHandlerContext?next;
          ????volatile?AbstractChannelHandlerContext?prev;
          ????
          ????//查找下一個Inbound類型的處理器,左?>?右
          ????private?AbstractChannelHandlerContext?findContextInbound(int?mask)?{
          ????????AbstractChannelHandlerContext?ctx?=?this;
          ????????EventExecutor?currentExecutor?=?executor();
          ????????do?{
          ????????????ctx?=?ctx.next;
          ????????}?while?(skipContext(ctx,?currentExecutor,?mask,?MASK_ONLY_INBOUND));
          ????????return?ctx;
          ????}
          ????//查找下一個Outbound類型的處理器,右?>?左
          ????private?AbstractChannelHandlerContext?findContextOutbound(int?mask)?{
          ????????AbstractChannelHandlerContext?ctx?=?this;
          ????????EventExecutor?currentExecutor?=?executor();
          ????????do?{
          ????????????ctx?=?ctx.prev;
          ????????}?while?(skipContext(ctx,?currentExecutor,?mask,?MASK_ONLY_OUTBOUND));
          ????????return?ctx;
          ????}
          }


          4、處理流程

          當我們向服務端發(fā)送消息的時候,將會觸發(fā)read方法。

          public?abstract?class?AbstractNioByteChannel?extends?AbstractNioChannel?{
          ????public?final?void?read()?{??
          ????????//從Channel中獲取對應的ChannelPipeline
          ????????final?ChannelPipeline?pipeline?=?pipeline();
          ????????//數據載體
          ????????ByteBuf?byteBuf?=?allocHandle.allocate(allocator);
          ????????//傳遞數據
          ????????pipeline.fireChannelRead(byteBuf);
          ????}
          }

          上面的代碼中,就會調用到ChannelPipeline,它會從Head節(jié)點開始,根據上下文對象依次調用處理器。

          public?class?DefaultChannelPipeline?implements?ChannelPipeline?{
          ????public?final?ChannelPipeline?fireChannelRead(Object?msg)?{
          ????????AbstractChannelHandlerContext.invokeChannelRead(head,?msg);
          ????????return?this;
          ????}
          }

          因為第一個節(jié)點是默認的頭結點HeadContext,所以它是從ChannelHandlerContext開始的。

          abstract?class?AbstractChannelHandlerContext?implements?ChannelHandlerContext,?ResourceLeakHint?{
          ????//找到下一個ChannelHandler并執(zhí)行
          ????public?ChannelHandlerContext?fireChannelRead(final?Object?msg)?{
          ????????invokeChannelRead(findContextInbound(MASK_CHANNEL_READ),?msg);
          ????????return?this;
          ????}
          ????
          }

          然后在我們自定義的ChannelHandler中,就會被調用到。
          public?class?ChannelHandler1?extends?ChannelInboundHandlerAdapter?{
          ????public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg){
          ????????System.out.println("ChannelHandler1:"+msg);
          ????????ctx.fireChannelRead(msg);
          ????}
          }
          如果消息有多個ChannelHandler,你可以自由選擇是否繼續(xù)往下傳遞請求。

          比如,如果你認為消息已經被處理且不應該繼續(xù)往下調用,把上面的ctx.fireChannelRead(msg);注釋掉就終止了整個責任鏈。

          ?

          七,策略模式



          該模式定義了一系列算法,并將每個算法封裝起來,使它們可以相互替換,且算法的變化不會影響使用算法的客戶。

          策略模式是一個很常見,而且也很好用的設計模式,如果你的業(yè)務代碼中有大量的if...else,那么就可以考慮是否可以使用策略模式改造一下。

          RocketMQ我們大家都熟悉,是一款優(yōu)秀的分布式消息中間件。消息中間件,簡單來說,就是客戶端發(fā)送一條消息,服務端存儲起來并提供給消費者去消費。

          請求消息的類型多種多樣,處理過程肯定也不一樣,每次都判斷一下再處理就落了下乘。在RocketMQ里,它會把所有處理器注冊起來,然后根據請求消息的code,讓對應的處理器處理請求,這就是策略模式的應用。

          首先,它們需要實現同一個接口,在這里就是請求處理器接口。

          public?interface?NettyRequestProcessor?{
          ????RemotingCommand?processRequest(ChannelHandlerContext?ctx,?RemotingCommand?request)throws?Exception;
          ????boolean?rejectRequest();
          }

          這個接口只做一件事,就是處理來自客戶端的請求。不同類型的請求封裝成不同的RemotingCommand對象。

          RocketMQ大概有90多種請求類型,都在RequestCode里以code來區(qū)分。

          然后,定義一系列策略類。我們來看幾個。

          //默認的消息處理器
          public?class?DefaultRequestProcessor?implements?NettyRequestProcessor?{}
          //發(fā)送消息的處理器
          public?class?SendMessageProcessor?extends?AbstractSendMessageProcessor?implements?NettyRequestProcessor?{}
          //拉取消息的處理器
          public?class?PullMessageProcessor?implements?NettyRequestProcessor?{}
          //查詢消息的處理器
          public?class?QueryMessageProcessor?implements?NettyRequestProcessor?{}
          //消費者端管理的處理器
          public?class?ConsumerManageProcessor?implements?NettyRequestProcessor?{}

          接著,將這些策略類封裝起來。在RocketMQ中,在啟動Broker服務器的時候,注冊這些處理器。

          public?class?BrokerController?{

          ????public?void?registerProcessor()?{
          ????
          ????????SendMessageProcessor?sendProcessor?=?new?SendMessageProcessor(this);
          ????????this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,?sendProcessor,?this.sendMessageExecutor);
          ????????this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,?sendProcessor,?this.sendMessageExecutor);
          ????????this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,this.pullMessageProcessor,this.pullMessageExecutor);
          ????????
          ????????this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE,?replyMessageProcessor,?replyMessageExecutor);
          ????????this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2,?replyMessageProcessor,?replyMessageExecutor);
          ????????
          ????????NettyRequestProcessor?queryProcessor?=?new?QueryMessageProcessor(this);
          ????????this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,?queryProcessor,?this.queryMessageExecutor);
          ????????this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,?queryProcessor,?this.queryMessageExecutor);
          ????????
          ????????ClientManageProcessor?clientProcessor?=?new?ClientManageProcessor(this);
          ????????this.remotingServer.registerProcessor(RequestCode.HEART_BEAT,?clientProcessor,?this.heartbeatExecutor);
          ????????this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,?clientProcessor,?this.clientManageExecutor);
          ????????//省略部分注冊過程.....
          ????}
          }

          最后,在Netty接收到客戶端的請求之后,就會根據消息的類型,找到對應的策略類,去處理消息。

          public?abstract?class?NettyRemotingAbstract?{

          ????public?void?processRequestCommand(final?ChannelHandlerContext?ctx,?final?RemotingCommand?cmd)?{
          ????????//根據請求類型找到對應的策略類
          ????????final?Pair?matched?=?this.processorTable.get(cmd.getCode());
          ????????//如果沒有找到就使用默認的
          ????????final?Pair?pair?=?
          ????????????????????null?==?matched???this.defaultRequestProcessor?:?matched;
          ????????//執(zhí)行策略
          ????????final?RemotingCommand?response?=?pair.getObject1().processRequest(ctx,?cmd);
          ????????//省略大部分代碼......
          ????}
          }

          如果有了新的請求消息類型,RocketMQ也無需修改業(yè)務代碼,新增策略類并將其注冊進來就好了。



          八,代理模式



          代理模式,為其他對象提供一種代理以控制對這個對象的訪問。

          在一些開源框架或中間件產品中,代理模式會非常常見。我們使用的時候越簡便,框架在背后幫我們做的事就可能越復雜。這里面往往都體現著代理模式的應用,頗有移花接木的味道。

          1、Dubbo

          Dubbo作為一個RPC框架,其中有一個很重要的功能就是:

          提供高性能的基于代理的遠程調用能力,服務以接口為粒度,為開發(fā)者屏蔽遠程調用底層細節(jié)。

          這里我們關注兩個重點:

          • 面向接口代理;

          • 屏蔽調用底層細節(jié)。


          比如我們有一個庫存服務,它提供一個扣減庫存的接口。

          public?interface?StorageDubboService?{
          ????int?decreaseStorage(StorageDTO?storage);
          }

          在別的服務里,需要扣減庫存的時候,就會通過Dubbo引用這個接口,也比較簡單。

          @Reference
          StorageDubboService?storageDubboService;

          我們使用起來很簡單,可StorageDubboService只是一個普通的服務類,并不具備遠程調用的能力。

          Dubbo就是給這些服務類,創(chuàng)建了代理類。通過ReferenceBean來創(chuàng)建并返回一個代理對象。
          public?class?ReferenceBean{
          ????@Override
          ????public?Object?getObject()?{
          ????????return?get();
          ????}
          ????public?synchronized?T?get()?{
          ????????if?(ref?==?null)?{
          ????????????init();
          ????????}
          ????????return?ref;
          ????}
          }
          在我們使用的時候,實則調用的是代理對象,代理對象完成復雜的遠程調用。比如連接注冊中心、負載均衡、集群容錯、連接服務器發(fā)送消息等功能。

          2、MyBatis

          還有一個典型的應用,就是我們經常在用的MyBatis。我們在使用的時候,一般只操作Mapper接口,然后MyBatis會找到對應的SQL語句來執(zhí)行。

          public?interface?UserMapper?{???
          ????List?getUserList();
          }

          如上代碼,UserMapper也只是一個普通的接口,它是怎樣最終執(zhí)行到我們的SQL語句的呢?

          答案也是代理。當MyBatis掃描到我們定義的Mapper接口時,會將其設置為MapperFactoryBean,并創(chuàng)建返回一個代理對象。

          protected?T?newInstance(SqlSession?sqlSession)?{
          ????final?MapperProxy?mapperProxy?=?new?MapperProxy<>(sqlSession,?mapperInterface,?methodCache);
          ????return?(T)?Proxy.newProxyInstance(mapperInterface.getClassLoader(),?new?Class[]?{?mapperInterface?},?mapperProxy);
          }

          代理對象去通過請求的方法名找到MappedStatement對象,調用執(zhí)行器,解析SqlSource對象來生成SQL,執(zhí)行并解析返回結果等。

          以上案例具體的實現過程,在這里就不再深入細聊。有興趣可能翻閱筆者其他文章~



          九,裝飾者模式



          裝飾器模式,在不改變現有對象結構的情況下,動態(tài)地給該對象增加一些職責(即增加其額外功能)的模式,它屬于對象結構型模式。

          MyBatis里的緩存設計,就是裝飾器模式的典型應用。

          首先,我們知道,MyBatis執(zhí)行器是MyBatis調度的核心,它負責SQL語句的生成和執(zhí)行。

          在創(chuàng)建SqlSession的時候,會創(chuàng)建這個執(zhí)行器,默認的執(zhí)行器是SimpleExecutor。

          但是為了給執(zhí)行器增加緩存的職責,就變成了在SimpleExecutor上一層添加了CachingExecutor。

          在CachingExecutor中的實際操作還是委托給SimpleExecutor去執(zhí)行,只是在執(zhí)行前后增加了緩存的操作。

          首先,我們來看看它的裝飾過程。

          public?Executor?newExecutor(Transaction?transaction,?ExecutorType?executorType)?{
          ????//默認的執(zhí)行器
          ????executorType?=?executorType?==?null???defaultExecutorType?:?executorType;
          ????executorType?=?executorType?==?null???ExecutorType.SIMPLE?:?executorType;
          ????Executor?executor;
          ????if?(ExecutorType.BATCH?==?executorType)?{
          ????????executor?=?new?BatchExecutor(this,?transaction);
          ????}?else?if?(ExecutorType.REUSE?==?executorType)?{
          ????????executor?=?new?ReuseExecutor(this,?transaction);
          ????}?else?{
          ????????executor?=?new?SimpleExecutor(this,?transaction);
          ????}
          ????//使用緩存執(zhí)行器來裝飾
          ????if?(cacheEnabled)?{
          ????????executor?=?new?CachingExecutor(executor);
          ????}
          ????executor?=?(Executor)?interceptorChain.pluginAll(executor);
          ????return?executor;
          }

          當SqlSession執(zhí)行方法的時候,則會先調用到CachingExecutor,我們來看查詢方法。

          public?class?CachingExecutor?implements?Executor?{
          ????@Override
          ????public??List?query()throws?SQLException?{
          ????????Cache?cache?=?ms.getCache();
          ????????if?(cache?!=?null)?{
          ????????????flushCacheIfRequired(ms);
          ????????????if?(ms.isUseCache()?&&?resultHandler?==?null)?{
          ????????????????List?list?=?(List)?tcm.getObject(cache,?key);
          ????????????????if?(list?==?null)?{
          ????????????????????list?=?delegate.query(ms,?parameterObject,?rowBounds,?resultHandler,?key,?boundSql);
          ????????????????????tcm.putObject(cache,?key,?list);?//?issue?#578?and?#116
          ????????????????}
          ????????????????return?list;
          ????????????}
          ????????}
          ????????return?delegate.query(ms,?parameterObject,?rowBounds,?resultHandler,?key,?boundSql);
          ????}
          }

          這里的代碼,如果開啟了緩存,則先從緩存中獲取結果。如果沒有開啟緩存或者緩存中沒有結果,則再調用SimpleExecutor執(zhí)行器去數據庫中查詢。

          十,觀察者模式



          觀察者模式,定義對象間的一種一對多的依賴關系,當一個對象的狀態(tài)發(fā)生改變時,所有依賴于它的對象都得到通知并被自動更新。

          在Spring或者SpringBoot項目中,有時候我們需要在Spring容器啟動并加載完之后,做一些系統(tǒng)初始化的事情。這時候,我們可以配置一個觀察者ApplicationListener,來達到這一目的。這就是觀察者模式的實踐。

          @Component
          public?class?ApplicationStartup?implements?ApplicationListener?{
          ????@Override
          ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
          ????????System.out.println("干一些系統(tǒng)初始化的事情....");
          ????????ApplicationContext?context?=?event.getApplicationContext();
          ????????String[]?names?=?context.getBeanDefinitionNames();
          ????????for?(String?beanName:names){
          ????????????System.out.println("----------"+beanName+"---------");
          ????????}
          ????}
          }

          首先,我們知道,ApplicationContext是 Spring 中的核心容器。

          public?abstract?class?AbstractApplicationContext?extends?DefaultResourceLoader?implements?ConfigurableApplicationContext?{
          ????
          ????//觀察者容器
          ????private?final?Set>?applicationListeners?=?new?LinkedHashSet<>();
          ????//被觀察者
          ????private?ApplicationEventMulticaster?applicationEventMulticaster;
          }

          在ApplicationContext容器刷新的時候,會初始化一個被觀察者,并注冊到Spring容器中。

          然后,注冊各種觀察者到被觀察者中,形成一對多的依賴。

          public?abstract?class?AbstractApplicationContext{
          ????
          ????protected?void?registerListeners()?{
          ????????for?(ApplicationListener?listener?:?getApplicationListeners())?{
          ????????????getApplicationEventMulticaster().addApplicationListener(listener);
          ????????}
          ????????String[]?listenerBeanNames?=?getBeanNamesForType(ApplicationListener.class,?true,?false);
          ????????for?(String?listenerBeanName?:?listenerBeanNames)?{
          ????????????getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
          ????????}
          ????????Set?earlyEventsToProcess?=?this.earlyApplicationEvents;
          ????????this.earlyApplicationEvents?=?null;
          ????????if?(earlyEventsToProcess?!=?null)?{
          ????????????for?(ApplicationEvent?earlyEvent?:?earlyEventsToProcess)?{
          ????????????????getApplicationEventMulticaster().multicastEvent(earlyEvent);
          ????????????}
          ????????}
          ????}
          }

          這時候,我們自定義的觀察者對象也被注冊到了applicationEventMulticaster里面。

          最后,當ApplicationContext完成刷新后,則發(fā)布ContextRefreshedEvent事件。

          protected?void?finishRefresh()?{
          ????publishEvent(new?ContextRefreshedEvent(this));
          }

          通知觀察者,調用ApplicationListener.onApplicationEvent()。

          private?void?doInvokeListener(ApplicationListener?listener,?ApplicationEvent?event)?{
          ????listener.onApplicationEvent(event);
          }

          接下來我們再看看在Dubbo是如何應用這一機制的。

          Dubbo服務導出過程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后,會立即執(zhí)行服務導出邏輯。

          public?class?ServiceBean?extends?ServiceConfig?implements?InitializingBean,?DisposableBean,
          ????????ApplicationContextAware,?ApplicationListener,?BeanNameAware,
          ????????ApplicationEventPublisherAware?{

          ????@Override
          ????public?void?onApplicationEvent(ContextRefreshedEvent?event)?{
          ????????if?(!isExported()?&&?!isUnexported())?{
          ????????????if?(logger.isInfoEnabled())?{
          ????????????????logger.info("The?service?ready?on?spring?started.?service:?"?+?getInterface());
          ????????????}
          ????????????export();
          ????????}
          ????}
          }

          我們看到,Dubbo中的ServiceBean也實現了ApplicationListener接口,在Spring容器發(fā)布刷新事件之后就會執(zhí)行導出方法。我們重點關注,在Dubbo執(zhí)行完導出之后,它也發(fā)布了一個事件。

          public?class?ServiceBean{
          ????
          ????public?void?export()?{
          ????????super.export();
          ????????publishExportEvent();
          ????}
          ????private?void?publishExportEvent()?{
          ????????ServiceBeanExportedEvent?exportEvent?=?new?ServiceBeanExportedEvent(this);
          ????????applicationEventPublisher.publishEvent(exportEvent);
          ????}
          }

          ServiceBeanExportedEvent,服務導出事件,需要繼承Spring中的事件對象ApplicationEvent。

          public?class?ServiceBeanExportedEvent?extends?ApplicationEvent?{
          ????public?ServiceBeanExportedEvent(ServiceBean?serviceBean)?{
          ????????super(serviceBean);
          ????}
          ????public?ServiceBean?getServiceBean()?{
          ????????return?(ServiceBean)?super.getSource();
          ????}
          }

          然后我們自定義一個ApplicationListener,也就是觀察者,就可以監(jiān)聽到Dubbo服務接口導出事件了。

          @Component
          public?class?ServiceBeanListener?implements?ApplicationListener?{
          ????@Override
          ????public?void?onApplicationEvent(ServiceBeanExportedEvent?event)?{
          ????????ServiceBean?serviceBean?=?event.getServiceBean();
          ????????String?beanName?=?serviceBean.getBeanName();
          ????????Service?service?=?serviceBean.getService();
          ????????System.out.println(beanName+":"+service);
          ????}
          }


          十一,命令模式



          命令模式是一種行為設計模式,它可將請求轉換為一個包含與請求相關的所有信息的獨立對象。該轉換讓你能根據不同的請求將方法參數化、延遲請求執(zhí)行或將其放入隊列中,且能實現可撤銷操作。

          Hystrix是Netflix開源的一款容錯框架,具有自我保護能力??梢宰柚构收系倪B鎖反應,快速失敗和優(yōu)雅降級。

          它用一個HystrixCommand或者HystrixObservableCommand包裝所有對外部系統(tǒng)/依賴的調用,每個命令在單獨線程中/信號授權下執(zhí)行。這正是命令模式的典型應用。

          我們來看一個Hystrix應用的例子。

          首先,我們需要創(chuàng)建一個具體的命令類,通過構造函數傳遞接收者對象。

          public?class?OrderServiceHystrixCommand?extends?HystrixCommand?{
          ????
          ????//接收者,處理業(yè)務邏輯
          ????private?OrderService?orderService;

          ????public?OrderServiceHystrixCommand(OrderService?orderService)?{
          ????????super(setter());
          ????????this.orderService?=?orderService;
          ????}
          ????//設置Hystrix相關參數
          ????public?static?Setter?setter()?{
          ????????HystrixCommandGroupKey?groupKey?=?HystrixCommandGroupKey.Factory.asKey("orderGroup");
          ????????HystrixCommandKey?commandKey?=?HystrixCommandKey.Factory.asKey("orderService");
          ????????HystrixThreadPoolProperties.Setter?threadPoolProperties?=?HystrixThreadPoolProperties.Setter().withCoreSize(1)
          ????????????????.withQueueSizeRejectionThreshold(1);
          ????????HystrixCommandProperties.Setter?commandProperties?=?HystrixCommandProperties.Setter();
          ????????return?Setter.withGroupKey(groupKey)
          ????????????????.andCommandKey(commandKey)
          ????????????????.andThreadPoolPropertiesDefaults(threadPoolProperties)
          ????????????????.andCommandPropertiesDefaults(commandProperties);

          ????}
          ????@Override
          ????protected?Object?run()?throws?InterruptedException?{
          ????????Thread.sleep(500);
          ????????return?orderService.orders();
          ????}
          ????@Override
          ????protected?Object?getFallback()?{
          ????????System.out.println("-------------------------------");
          ????????return?new?ArrayList();
          ????}
          }

          然后,在客戶端調用的時候,創(chuàng)建這個命令類并執(zhí)行即可。

          @RestController
          public?class?OrderController?{

          ????@Autowired
          ????OrderService?orderService;

          ????@RequestMapping("/orders")
          ????public?Object?orders(){
          ????????OrderServiceHystrixCommand?command?=?new?OrderServiceHystrixCommand(orderService);
          ????????return?command.execute();
          ????}
          }

          看上去,命令模式和策略模式有些相像,它們都可以通過某些行為來參數化對象。但它們的思想有很大區(qū)別。

          比如說我們可以使用命令來將任何操作轉換為對象,操作的參數將成為對象的成員變量。同樣的,我們也可以對請求做任何操作,比如延遲執(zhí)行,記錄日志,保存歷史命令等。

          而策略模式側重點在于描述完成某件事的不同方式,讓你能夠在同一個上下文類中切換算法。


          總結


          本文重點介紹了設計模式在不同框架中的實現,以期讓大家更好地理解模式背后的思想和應用場景。歡迎有不同想法的朋友,留言探討~

          原文鏈接:https://juejin.im/post/6859160350692999181



          往期推薦



          JetCache埋點的騷操作,不服不行啊

          跟大佬聊天,被反問Redis6的多線程真的能提高性能嗎?

          SQL優(yōu)化不會?推薦4 款工具

          干貨 | 數據庫壓力降低90%,攜程機票訂單緩存系統(tǒng)實踐

          居然說HikariCP連接池慢?是你不會用吧!



          后臺回復?學習資料?領取學習視頻


          如有收獲,點個在看,誠摯感謝

          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    无套内射学生妹 | 成人色情黄色电影 | 国产又爽又黄A片 | 色婷婷福利视频 | 亚洲AV男人天堂 |