策略模式之Google EventBus
“?寫給自己看,說給別人聽。你好,這是think123的第74篇原創(chuàng)文章”
觀察者模式又叫發(fā)布-訂閱模式,它定義了一種一對多的依賴關(guān)系,多個觀察者對象可同時監(jiān)聽某一主題對象,當(dāng)該主題對象狀態(tài)發(fā)生變化時,相應(yīng)的所有觀察者對象都可收到通知。
比如求職者,他們訂閱了一些工作發(fā)布網(wǎng)站,當(dāng)有合適的工作機(jī)會時,他們會收到提醒。
又或者是當(dāng)用戶注冊網(wǎng)站成功的時候,發(fā)送一封郵件或者發(fā)送一條短信。我們都可以使用觀察者模式來解決類似的問題
關(guān)于觀察者模式的基本模型代碼如下:
public?interface?Subject?{
??void?registerObserver(Observer?observer);
??void?unregisterObserver(Observer?observer);
??void?notifyObservers(Message?message);
}
interface?Observer?{
????void?update(Message?message);
}
@Data
class?Message?{
??String?id;
??String?name;
}
//?具體的主題
class?UserRegisterSubject?implements?Subject?{
????List?observerList?=?new?ArrayList();
????public?void?registerObserver(Observer?observer)?{
??????observerList.add(observer);
????}
????public?void?unregisterObserver(Observer?observer)?{
??????observerList.remove(observer);
????}
????public?void?notifyObservers(Message?message)?{
??????for?(Observer?observer?:?observerList)?{
????????observer.update(message);
??????}
????}
}
//?觀察者
class?RegNotificationObserver?implements?Observer?{
??public?void?update(Message?message)?{
????System.out.println("注冊成功,已經(jīng)發(fā)送郵件給"?+?message.getName());
??}
}
class?RegOtherObserver?implements?Observer?{
??public?void?update(Message?message)?{
????System.out.println("注冊成功,發(fā)送優(yōu)惠券給"?+?message.getName());
??}
}
class?Main?{
??public?static?void?main(String[]?args)?{
??????
????//?實際使用的時候配合Spring使用
????Subject?subject?=?new?UserRegisterSubject();
????subject.registerObserver(new?RegNotificationObserver());
????subject.registerObserver(new?RegOtherObserver());
????boolean?registSuccess?=?true;
????if(registSuccess)?{
??????Message?msg?=?new?Message();
??????msg.setId("123456");
??????msg.setName("think123");
??????subject.notifyObservers(msg);
????}
??}
}
輸出結(jié)果如下:
注冊成功,已經(jīng)發(fā)送郵件給think123
注冊成功,發(fā)送優(yōu)惠券給think123
從上面的代碼可以看出,觀察者模式中我們首先需要注冊觀察者,然后當(dāng)某個事件發(fā)生的時候通知觀察者。
而在google guava中對于觀察者模式的框架實現(xiàn)叫做EventBus,實現(xiàn)方式更為優(yōu)雅,我們來看看如何使用EventBus,然后再深入分析下它的源碼。
public?class?EventBusDemo?{
??public?static?void?main(String[]?args)?{
??????EventBus?eventBus?=?new?EventBus("think123");
????Offer?offer?=?new?Offer();
????offer.setCompany("螞蟻金服");
????offer.setMoney(20000);
????//?注冊觀察者
????eventBus.register(new?EmailNotificationObserver());
????eventBus.register(new?MessageNotificationObserver());
????
????//?觸發(fā)MessageNotification
????eventBus.post(offer.getCompany());
????//?觸發(fā)EmailNotification
????eventBus.post(offer);
??}
}
@Data
class?Offer?{
????private?String?company;
????private?Integer?money;
}
//?發(fā)送郵件
class?EmailNotificationObserver?{
????@Subscribe
????public?void?mailNotification(Offer?offer)?{
??????System.out.println("恭喜你被?"?+?offer.getCompany()?+?"?錄取,每月工資為"?+?offer.getMoney()?+?"元");
????}
}
//?發(fā)送消息
class?MessageNotificationObserver?{
????@Subscribe
????public?void?messageNotification(String?company)?{
??????System.out.println("恭喜你被"?+?company?+?"錄取了");
????}
}
可以看出來,EventBus的使用更加簡單,我們只需要編寫自己的observer就可以了,然后在需要處理通知的方法上加上@Subscribe注解就行了。
然后當(dāng)post傳入?yún)?shù)的時候,就會找到哪些觀察者可以處理這樣的參數(shù),就調(diào)用觀察者的這個方法。
可以理解為觀察者訂閱了某個事件,當(dāng)事件發(fā)生的時候,觀察者會執(zhí)行指定的動作。
比如EmailNotificationObserver訂閱了Offer事件(事件就可以認(rèn)為是參數(shù)),所以在收到通知后會發(fā)送郵件(這里使用打印來代替)
讓我們看看EventBus的核心代碼:
public?class?EventBus?{
??//?標(biāo)識EventBus,可以理解為name
??private?final?String?identifier;
??//?具體的線程池,實際上directExecutor,它實際上是單線程
??private?final?Executor?executor;
??//?異常處理器,負(fù)責(zé)處理異常
??private?final?SubscriberExceptionHandler?exceptionHandler;
??//?訂閱中心,存儲有哪些訂閱者。?這里將eventBus傳遞給了訂閱中心
??private?final?SubscriberRegistry?subscribers?=?new?SubscriberRegistry(this);
??//?事件轉(zhuǎn)發(fā)器,負(fù)責(zé)轉(zhuǎn)發(fā)event給訂閱者
??private?final?Dispatcher?dispatcher;
??//?構(gòu)造方法
??public?EventBus(String?identifier)?{
????this(
????????identifier,
????????MoreExecutors.directExecutor(),
????????Dispatcher.perThreadDispatchQueue(),
????????LoggingHandler.INSTANCE);
??}
??//?注冊訂閱者
??public?void?register(Object?object)?{
????subscribers.register(object);
??}
??//?移除訂閱者
??public?void?unregister(Object?object)?{
????subscribers.unregister(object);
??}
??//?投送event給所有注冊的訂閱者
??public?void?post(Object?event)?{
????Iterator?eventSubscribers?=?subscribers.getSubscribers(event);
????if?(eventSubscribers.hasNext())?{
??????dispatcher.dispatch(event,?eventSubscribers);
????}?else?if?(!(event?instanceof?DeadEvent))?{
??????//?沒有找到訂閱者,則封裝成DeadEvent(默認(rèn)是丟棄掉了)
??????post(new?DeadEvent(this,?event));
????}
??}
EventBus中主要的方法就是注冊/移除訂閱者,然后分發(fā)事件。保留了主體流程的同時也讓不同的類承擔(dān)自己的職責(zé),真的很贊。
在注冊訂閱者中,會調(diào)用findAllSubscribers方法從緩存中加載已有的訂閱者,并且為了保證線程安全,會使用CopyOnWriteArraySet來保存對應(yīng)的訂閱者。
訂閱者為什么會存在多個(用了set保存)呢?這是因為我們eventBus.post方法的參數(shù)是Object類型,而在訂閱者中可能會存在多個方法可以處理這個類型的參數(shù)(有多個訂閱者都訂閱了該事件),所以會是多個。
然后會根據(jù)訂閱者的Class加載所有標(biāo)明了@Subscribe注解的方法,并將其放到緩存中
void?register(Object?listener)?{
??//?從緩存中獲取所有的訂閱者
??Multimap,?Subscriber>?listenerMethods?=?findAllSubscribers(listener);
??for?(Entry,?Collection>?entry?:?listenerMethods.asMap().entrySet())?{
????Class>?eventType?=?entry.getKey();
????Collection?eventMethodsInListener?=?entry.getValue();
????//?根據(jù)參數(shù)類型獲取到所有的訂閱者
????CopyOnWriteArraySet?eventSubscribers?=?subscribers.get(eventType);
????//?使用CopyOnWriteArraySet,保證線程安全
????if?(eventSubscribers?==?null)?{
??????CopyOnWriteArraySet?newSet?=?new?CopyOnWriteArraySet<>();
??????eventSubscribers?=
??????????MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType,?newSet),?newSet);
????}
????eventSubscribers.addAll(eventMethodsInListener);
??}
}
?private?Multimap,?Subscriber>?findAllSubscribers(Object?listener)?{
????Multimap,?Subscriber>?methodsInListener?=?HashMultimap.create();
????Class>?clazz?=?listener.getClass();
????for?(Method?method?:?getAnnotatedMethods(clazz))?{
??????Class>[]?parameterTypes?=?method.getParameterTypes();
??????Class>?eventType?=?parameterTypes[0];
??????//?Subscriber中保存了要執(zhí)行的對象以及方法
??????//?eventType就是參數(shù)類型,這里就形成了參數(shù)類型---》訂閱者的映射
??????//?而訂閱者中保存了具體需要執(zhí)行的類以及方法
??????methodsInListener.put(eventType,?Subscriber.create(bus,?listener,?method));
????}
????return?methodsInListener;
??}
//?當(dāng)緩存中沒有的時候,會調(diào)用這個方法。所以最開始注冊訂閱者的時候都會調(diào)用這個方法
private?static?ImmutableList?getAnnotatedMethodsNotCached(Class>?clazz)? {
????Set?extends?Class>>?supertypes?=?TypeToken.of(clazz).getTypes().rawTypes();
????Map?identifiers?=?Maps.newHashMap();
????for?(Class>?supertype?:?supertypes)?{
??????for?(Method?method?:?supertype.getDeclaredMethods())?{
????????//?只處理被Subscribe注解標(biāo)明的方法并且method不能是合成的(isSynthetic)
????????if?(method.isAnnotationPresent(Subscribe.class)?&&?!method.isSynthetic())?{
??????????Class>[]?parameterTypes?=?method.getParameterTypes();
??????????//?參數(shù)個數(shù)只能為1
??????????checkArgument(
??????????????parameterTypes.length?==?1,
??????????????"Method?%s?has?@Subscribe?annotation?but?has?%s?parameters."
??????????????????+?"Subscriber?methods?must?have?exactly?1?parameter.",
??????????????method,
??????????????parameterTypes.length);
??????????MethodIdentifier?ident?=?new?MethodIdentifier(method);
??????????if?(!identifiers.containsKey(ident))?{
????????????identifiers.put(ident,?method);
??????????}
????????}
??????}
????}
????return?ImmutableList.copyOf(identifiers.values());
??}
可以看到,EventBus的訂閱者之所以不用實現(xiàn)特定的接口實際上是利用了反射將訂閱者和要執(zhí)行的方法對應(yīng)起來了的。
經(jīng)過register方法之后,我們就知道每個訂閱者分別訂閱了哪些事件(能處理什么參數(shù)),并且形成了這樣的對應(yīng)關(guān)系:
事件類型(參數(shù))?--->?訂閱者(target?object,?method)
Offer??-->?EmailNotificationObserver::mailNotification
String?-->?MessageNotificationObserver::messageNotification
EventBus中,我們會通過post方法分發(fā)事件。在post方法中,首先會根據(jù)參數(shù)找到我們之前處理好的對應(yīng)關(guān)系,然后通過反射調(diào)用對應(yīng)的方法
public?void?post(Object?event)?{
????Iterator?eventSubscribers?=?subscribers.getSubscribers(event);
????if?(eventSubscribers.hasNext())?{
??????//?轉(zhuǎn)發(fā)事件
??????dispatcher.dispatch(event,?eventSubscribers);
????}?else?if?(!(event?instanceof?DeadEvent))?{
??????//?對于找不到訂閱者的包裝成DeadEvent處理,實際上就是丟棄掉
??????post(new?DeadEvent(this,?event));
????}
??}
//?PerThreadQueuedDispatcher實現(xiàn)
@Override
void?dispatch(Object?event,?Iterator?subscribers) ?{
??checkNotNull(event);
??checkNotNull(subscribers);
??//?每個線程都對應(yīng)一個隊列,如果多線程插入則先來的先處理
??Queue?queueForThread?=?queue.get();
??queueForThread.offer(new?Event(event,?subscribers));
??if?(!dispatching.get())?{
????dispatching.set(true);
????try?{
??????Event?nextEvent;
??????//?找到對應(yīng)的訂閱者進(jìn)行處理
??????while?((nextEvent?=?queueForThread.poll())?!=?null)?{
????????while?(nextEvent.subscribers.hasNext())?{
??????????nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
????????}
??????}
????}?finally?{
??????dispatching.remove();
??????queue.remove();
????}
??}
}
Dispatcher是一個抽象類,這個類的作用是負(fù)責(zé)轉(zhuǎn)發(fā)event給訂閱者,提供不同的event順序。這里這樣的實現(xiàn)主要是考慮到了多線程。
我們的默認(rèn)實現(xiàn)使用的是PerThreadQueuedDispatcher,看名字的意思就是每個線程一個隊列,實行先來先處理的原則。
最終調(diào)用Subscriber的invokeSubscriberMethod()方法
final?void?dispatchEvent(final?Object?event)?{
??executor.execute(
????new?Runnable()?{
??????@Override
??????public?void?run()?{
????????try?{
??????????invokeSubscriberMethod(event);
????????}?catch?(InvocationTargetException?e)?{
??????????bus.handleSubscriberException(e.getCause(),?context(event));
????????}
??????}
????});
}
void?invokeSubscriberMethod(Object?event)?throws?InvocationTargetException?{
???
???//?省略異常捕獲代碼
??//?反射調(diào)用方法執(zhí)行?
??method.invoke(target,?checkNotNull(event));??
}
最終這樣就調(diào)用了我們使用@Subscribe注解標(biāo)明的方法了。
而這里的executor實際上是創(chuàng)建EventBus的executor,它的execute方法實現(xiàn)如下:
@GwtCompatible
enum?DirectExecutor?implements?Executor?{
??INSTANCE;
??@Override
??public?void?execute(Runnable?command)?{
????command.run();
??}
}
所以說EventBus實際上是同步阻塞執(zhí)行,那么為什么還要寫成線程池的方式呢?雖然EventBus默認(rèn)是同步執(zhí)行的,但是它還有一個異步執(zhí)行的子類AsyncEventBus,異步的EventBus需要指定線程池,所以這里是為了兼容才這么寫的。
作者:think123, 一個試圖把問題想簡單的程序員。
"三思而后行 , think23"
