<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          策略模式之Google EventBus

          共 8840字,需瀏覽 18分鐘

           ·

          2020-09-16 05:15

          ?寫給自己看,說給別人聽。你好,這是think123的第74篇原創(chuàng)文章



          觀察者模式又叫發(fā)布-訂閱模式,它定義了一種一對多的依賴關(guān)系,多個觀察者對象可同時監(jiān)聽某一主題對象,當(dāng)該主題對象狀態(tài)發(fā)生變化時,相應(yīng)的所有觀察者對象都可收到通知。

          比如求職者,他們訂閱了一些工作發(fā)布網(wǎng)站,當(dāng)有合適的工作機(jī)會時,他們會收到提醒。

          又或者是當(dāng)用戶注冊網(wǎng)站成功的時候,發(fā)送一封郵件或者發(fā)送一條短信。我們都可以使用觀察者模式來解決類似的問題

          關(guān)于觀察者模式的基本模型代碼如下:

          public?interface?Subject?{

          ??void?registerObserver(Observer?observer);

          ??void?unregisterObserver(Observer?observer);

          ??void?notifyObservers(Message?message);
          }

          interface?Observer?{
          ????void?update(Message?message);
          }
          @Data
          class?Message?{
          ??String?id;
          ??String?name;
          }

          //?具體的主題
          class?UserRegisterSubject?implements?Subject?{

          ????List?observerList?=?new?ArrayList();

          ????public?void?registerObserver(Observer?observer)?{
          ??????observerList.add(observer);
          ????}

          ????public?void?unregisterObserver(Observer?observer)?{
          ??????observerList.remove(observer);
          ????}

          ????public?void?notifyObservers(Message?message)?{

          ??????for?(Observer?observer?:?observerList)?{
          ????????observer.update(message);
          ??????}
          ????}
          }

          //?觀察者
          class?RegNotificationObserver?implements?Observer?{

          ??public?void?update(Message?message)?{
          ????System.out.println("注冊成功,已經(jīng)發(fā)送郵件給"?+?message.getName());
          ??}
          }

          class?RegOtherObserver?implements?Observer?{

          ??public?void?update(Message?message)?{
          ????System.out.println("注冊成功,發(fā)送優(yōu)惠券給"?+?message.getName());
          ??}
          }

          class?Main?{

          ??public?static?void?main(String[]?args)?{
          ??????
          ????//?實際使用的時候配合Spring使用
          ????Subject?subject?=?new?UserRegisterSubject();
          ????subject.registerObserver(new?RegNotificationObserver());
          ????subject.registerObserver(new?RegOtherObserver());

          ????boolean?registSuccess?=?true;
          ????if(registSuccess)?{
          ??????Message?msg?=?new?Message();
          ??????msg.setId("123456");
          ??????msg.setName("think123");
          ??????subject.notifyObservers(msg);
          ????}
          ??}
          }

          輸出結(jié)果如下:


          注冊成功,已經(jīng)發(fā)送郵件給think123

          注冊成功,發(fā)送優(yōu)惠券給think123

          從上面的代碼可以看出,觀察者模式中我們首先需要注冊觀察者,然后當(dāng)某個事件發(fā)生的時候通知觀察者。

          而在google guava中對于觀察者模式的框架實現(xiàn)叫做EventBus,實現(xiàn)方式更為優(yōu)雅,我們來看看如何使用EventBus,然后再深入分析下它的源碼。

          public?class?EventBusDemo?{

          ??public?static?void?main(String[]?args)?{
          ??????EventBus?eventBus?=?new?EventBus("think123");

          ????Offer?offer?=?new?Offer();
          ????offer.setCompany("螞蟻金服");
          ????offer.setMoney(20000);

          ????//?注冊觀察者
          ????eventBus.register(new?EmailNotificationObserver());

          ????eventBus.register(new?MessageNotificationObserver());
          ????
          ????//?觸發(fā)MessageNotification
          ????eventBus.post(offer.getCompany());

          ????//?觸發(fā)EmailNotification
          ????eventBus.post(offer);
          ??}

          }

          @Data
          class?Offer?{
          ????private?String?company;
          ????private?Integer?money;
          }

          //?發(fā)送郵件
          class?EmailNotificationObserver?{
          ????@Subscribe
          ????public?void?mailNotification(Offer?offer)?{
          ??????System.out.println("恭喜你被?"?+?offer.getCompany()?+?"?錄取,每月工資為"?+?offer.getMoney()?+?"元");
          ????}

          }

          //?發(fā)送消息
          class?MessageNotificationObserver?{
          ????@Subscribe
          ????public?void?messageNotification(String?company)?{
          ??????System.out.println("恭喜你被"?+?company?+?"錄取了");
          ????}
          }

          可以看出來,EventBus的使用更加簡單,我們只需要編寫自己的observer就可以了,然后在需要處理通知的方法上加上@Subscribe注解就行了。

          然后當(dāng)post傳入?yún)?shù)的時候,就會找到哪些觀察者可以處理這樣的參數(shù),就調(diào)用觀察者的這個方法。

          可以理解為觀察者訂閱了某個事件,當(dāng)事件發(fā)生的時候,觀察者會執(zhí)行指定的動作。

          比如EmailNotificationObserver訂閱了Offer事件(事件就可以認(rèn)為是參數(shù)),所以在收到通知后會發(fā)送郵件(這里使用打印來代替)

          讓我們看看EventBus的核心代碼:

          public?class?EventBus?{

          ??//?標(biāo)識EventBus,可以理解為name
          ??private?final?String?identifier;

          ??//?具體的線程池,實際上directExecutor,它實際上是單線程
          ??private?final?Executor?executor;

          ??//?異常處理器,負(fù)責(zé)處理異常
          ??private?final?SubscriberExceptionHandler?exceptionHandler;

          ??//?訂閱中心,存儲有哪些訂閱者。?這里將eventBus傳遞給了訂閱中心
          ??private?final?SubscriberRegistry?subscribers?=?new?SubscriberRegistry(this);

          ??//?事件轉(zhuǎn)發(fā)器,負(fù)責(zé)轉(zhuǎn)發(fā)event給訂閱者
          ??private?final?Dispatcher?dispatcher;

          ??//?構(gòu)造方法
          ??public?EventBus(String?identifier)?{
          ????this(
          ????????identifier,
          ????????MoreExecutors.directExecutor(),
          ????????Dispatcher.perThreadDispatchQueue(),
          ????????LoggingHandler.INSTANCE);
          ??}

          ??//?注冊訂閱者
          ??public?void?register(Object?object)?{
          ????subscribers.register(object);
          ??}

          ??//?移除訂閱者
          ??public?void?unregister(Object?object)?{
          ????subscribers.unregister(object);
          ??}

          ??//?投送event給所有注冊的訂閱者
          ??public?void?post(Object?event)?{
          ????Iterator?eventSubscribers?=?subscribers.getSubscribers(event);
          ????if?(eventSubscribers.hasNext())?{
          ??????dispatcher.dispatch(event,?eventSubscribers);
          ????}?else?if?(!(event?instanceof?DeadEvent))?{

          ??????//?沒有找到訂閱者,則封裝成DeadEvent(默認(rèn)是丟棄掉了)
          ??????post(new?DeadEvent(this,?event));
          ????}
          ??}

          EventBus中主要的方法就是注冊/移除訂閱者,然后分發(fā)事件。保留了主體流程的同時也讓不同的類承擔(dān)自己的職責(zé),真的很贊。

          在注冊訂閱者中,會調(diào)用findAllSubscribers方法從緩存中加載已有的訂閱者,并且為了保證線程安全,會使用CopyOnWriteArraySet來保存對應(yīng)的訂閱者。

          訂閱者為什么會存在多個(用了set保存)呢?這是因為我們eventBus.post方法的參數(shù)是Object類型,而在訂閱者中可能會存在多個方法可以處理這個類型的參數(shù)(有多個訂閱者都訂閱了該事件),所以會是多個。

          然后會根據(jù)訂閱者的Class加載所有標(biāo)明了@Subscribe注解的方法,并將其放到緩存中


          void?register(Object?listener)?{

          ??//?從緩存中獲取所有的訂閱者
          ??Multimap,?Subscriber>?listenerMethods?=?findAllSubscribers(listener);

          ??for?(Entry,?Collection>?entry?:?listenerMethods.asMap().entrySet())?{
          ????Class?eventType?=?entry.getKey();
          ????Collection?eventMethodsInListener?=?entry.getValue();

          ????//?根據(jù)參數(shù)類型獲取到所有的訂閱者
          ????CopyOnWriteArraySet?eventSubscribers?=?subscribers.get(eventType);

          ????//?使用CopyOnWriteArraySet,保證線程安全
          ????if?(eventSubscribers?==?null)?{
          ??????CopyOnWriteArraySet?newSet?=?new?CopyOnWriteArraySet<>();
          ??????eventSubscribers?=
          ??????????MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType,?newSet),?newSet);
          ????}

          ????eventSubscribers.addAll(eventMethodsInListener);
          ??}
          }


          ?private?Multimap,?Subscriber>?findAllSubscribers(Object?listener)?{
          ????Multimap,?Subscriber>?methodsInListener?=?HashMultimap.create();
          ????Class?clazz?=?listener.getClass();
          ????for?(Method?method?:?getAnnotatedMethods(clazz))?{
          ??????Class[]?parameterTypes?=?method.getParameterTypes();
          ??????Class?eventType?=?parameterTypes[0];
          ??????//?Subscriber中保存了要執(zhí)行的對象以及方法
          ??????//?eventType就是參數(shù)類型,這里就形成了參數(shù)類型---》訂閱者的映射
          ??????//?而訂閱者中保存了具體需要執(zhí)行的類以及方法
          ??????methodsInListener.put(eventType,?Subscriber.create(bus,?listener,?method));
          ????}
          ????return?methodsInListener;
          ??}

          //?當(dāng)緩存中沒有的時候,會調(diào)用這個方法。所以最開始注冊訂閱者的時候都會調(diào)用這個方法
          private?static?ImmutableList?getAnnotatedMethodsNotCached(Class?clazz)?{
          ????Set>?supertypes?=?TypeToken.of(clazz).getTypes().rawTypes();
          ????Map?identifiers?=?Maps.newHashMap();
          ????for?(Class?supertype?:?supertypes)?{
          ??????for?(Method?method?:?supertype.getDeclaredMethods())?{
          ????????//?只處理被Subscribe注解標(biāo)明的方法并且method不能是合成的(isSynthetic)
          ????????if?(method.isAnnotationPresent(Subscribe.class)?&&?!method.isSynthetic())?{
          ??????????Class[]?parameterTypes?=?method.getParameterTypes();
          ??????????//?參數(shù)個數(shù)只能為1
          ??????????checkArgument(
          ??????????????parameterTypes.length?==?1,
          ??????????????"Method?%s?has?@Subscribe?annotation?but?has?%s?parameters."
          ??????????????????+?"Subscriber?methods?must?have?exactly?1?parameter.",
          ??????????????method,
          ??????????????parameterTypes.length);

          ??????????MethodIdentifier?ident?=?new?MethodIdentifier(method);
          ??????????if?(!identifiers.containsKey(ident))?{
          ????????????identifiers.put(ident,?method);
          ??????????}
          ????????}
          ??????}
          ????}
          ????return?ImmutableList.copyOf(identifiers.values());
          ??}

          可以看到,EventBus的訂閱者之所以不用實現(xiàn)特定的接口實際上是利用了反射將訂閱者和要執(zhí)行的方法對應(yīng)起來了的。

          經(jīng)過register方法之后,我們就知道每個訂閱者分別訂閱了哪些事件(能處理什么參數(shù)),并且形成了這樣的對應(yīng)關(guān)系:

          事件類型(參數(shù))?--->?訂閱者(target?object,?method)

          Offer??-->?EmailNotificationObserver::mailNotification

          String?-->?MessageNotificationObserver::messageNotification

          EventBus中,我們會通過post方法分發(fā)事件。在post方法中,首先會根據(jù)參數(shù)找到我們之前處理好的對應(yīng)關(guān)系,然后通過反射調(diào)用對應(yīng)的方法

          public?void?post(Object?event)?{
          ????Iterator?eventSubscribers?=?subscribers.getSubscribers(event);
          ????if?(eventSubscribers.hasNext())?{
          ??????//?轉(zhuǎn)發(fā)事件
          ??????dispatcher.dispatch(event,?eventSubscribers);
          ????}?else?if?(!(event?instanceof?DeadEvent))?{
          ??????//?對于找不到訂閱者的包裝成DeadEvent處理,實際上就是丟棄掉
          ??????post(new?DeadEvent(this,?event));
          ????}
          ??}

          //?PerThreadQueuedDispatcher實現(xiàn)
          @Override
          void?dispatch(Object?event,?Iterator?subscribers)?{
          ??checkNotNull(event);
          ??checkNotNull(subscribers);

          ??//?每個線程都對應(yīng)一個隊列,如果多線程插入則先來的先處理
          ??Queue?queueForThread?=?queue.get();
          ??queueForThread.offer(new?Event(event,?subscribers));

          ??if?(!dispatching.get())?{
          ????dispatching.set(true);
          ????try?{
          ??????Event?nextEvent;
          ??????//?找到對應(yīng)的訂閱者進(jìn)行處理
          ??????while?((nextEvent?=?queueForThread.poll())?!=?null)?{
          ????????while?(nextEvent.subscribers.hasNext())?{
          ??????????nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
          ????????}
          ??????}
          ????}?finally?{
          ??????dispatching.remove();
          ??????queue.remove();
          ????}
          ??}
          }

          Dispatcher是一個抽象類,這個類的作用是負(fù)責(zé)轉(zhuǎn)發(fā)event給訂閱者,提供不同的event順序。這里這樣的實現(xiàn)主要是考慮到了多線程。

          我們的默認(rèn)實現(xiàn)使用的是PerThreadQueuedDispatcher,看名字的意思就是每個線程一個隊列,實行先來先處理的原則。

          最終調(diào)用Subscriber的invokeSubscriberMethod()方法

          final?void?dispatchEvent(final?Object?event)?{
          ??executor.execute(
          ????new?Runnable()?{
          ??????@Override
          ??????public?void?run()?{
          ????????try?{
          ??????????invokeSubscriberMethod(event);
          ????????}?catch?(InvocationTargetException?e)?{
          ??????????bus.handleSubscriberException(e.getCause(),?context(event));
          ????????}
          ??????}
          ????});
          }
          void?invokeSubscriberMethod(Object?event)?throws?InvocationTargetException?{
          ???
          ???//?省略異常捕獲代碼
          ??//?反射調(diào)用方法執(zhí)行?
          ??method.invoke(target,?checkNotNull(event));??
          }

          最終這樣就調(diào)用了我們使用@Subscribe注解標(biāo)明的方法了。

          而這里的executor實際上是創(chuàng)建EventBus的executor,它的execute方法實現(xiàn)如下:

          @GwtCompatible
          enum?DirectExecutor?implements?Executor?{
          ??INSTANCE;

          ??@Override
          ??public?void?execute(Runnable?command)?{
          ????command.run();
          ??}
          }

          所以說EventBus實際上是同步阻塞執(zhí)行,那么為什么還要寫成線程池的方式呢?雖然EventBus默認(rèn)是同步執(zhí)行的,但是它還有一個異步執(zhí)行的子類AsyncEventBus,異步的EventBus需要指定線程池,所以這里是為了兼容才這么寫的。



          作者:think123, 一個試圖把問題想簡單的程序員。

          "三思而后行 , think23"






          瀏覽 53
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  牛牛精品视频 | 男女操逼高清视频 | 欧美一级 片内射欧美AA99 | 久草大香蕉网 | 久草一区 |