說(shuō)下你可能沒(méi)用過(guò)的EventBus,再順便送幾本書(shū)好了
最近在Code Review的時(shí)候發(fā)現(xiàn)了這樣一個(gè)業(yè)務(wù)場(chǎng)景,某個(gè)業(yè)務(wù)處理完成之后需要通知審核人員,通知的方式包含短信和郵件,所以代碼大致是這樣:
//業(yè)務(wù)校驗(yàn)
validate();
//處理業(yè)務(wù)邏輯
doBusiness();
//發(fā)送郵件或者發(fā)送其他類型消息
sendMsg();
這個(gè)對(duì)不對(duì)呢?
基于這種普遍的業(yè)務(wù)場(chǎng)景來(lái)說(shuō),一般首先我們會(huì)考慮同步或者異步發(fā)送的問(wèn)題。
同步的話對(duì)接口RT有影響,而且和業(yè)務(wù)邏輯耦合在一起,這樣的做法肯定不太好。
一般情況下,我們會(huì)做成異步的方式,使用MQ自己發(fā)送自己消費(fèi),或者說(shuō)一個(gè)線程池搞定,這樣的話不影響主業(yè)務(wù)邏輯,可以提高性能,并且代碼做到了解耦。
然后還有就是數(shù)據(jù)一致性的問(wèn)題,郵件一定要發(fā)送成功嗎?
大多數(shù)時(shí)候其實(shí)我們并不要求郵件一定100%發(fā)送成功,失敗了就失敗好了,監(jiān)控告警打點(diǎn)做好失敗率不要超過(guò)閾值就好,還有就是消息服務(wù)一旦收到請(qǐng)求應(yīng)該自己保證消息能夠投遞。
所以總的來(lái)說(shuō),使用MQ發(fā)送消息自己消費(fèi)處理,或者線程池異步處理,最后自己搞個(gè)補(bǔ)償?shù)倪壿嬀湍芴幚砗眠@類問(wèn)題。
那么,今天要說(shuō)的是這兩個(gè)解決方案之外的處理方式,對(duì)于這種場(chǎng)景其實(shí)我們可以用EventBus來(lái)解決。
EventBus使用
看名字就知道,EventBus是事件總線的意思,它是Google Guava庫(kù)的一個(gè)工具,基于觀察者模式可以做到進(jìn)程內(nèi)的代碼解耦作用。
就拿上面的例子來(lái)說(shuō),引入一個(gè)MQ太重了,其實(shí)不太需要這樣做,EventBus也能達(dá)到這個(gè)效果,和MQ相比他只能提供進(jìn)程內(nèi)的消息事件傳遞,這對(duì)于我們這種業(yè)務(wù)場(chǎng)景來(lái)說(shuō)足夠了不是嗎?
我們先看EventBus怎么來(lái)使用,一般先創(chuàng)建一個(gè)EventBus實(shí)例。
//1.創(chuàng)建EventBus
private static EventBus eventBus = new EventBus();
第二步,創(chuàng)建一個(gè)事件消息訂閱者,處理方式非常簡(jiǎn)單,只要在我們希望去處理事件的方法上加上@Subscribe注解即可。
形參只能有一個(gè),如果定義0個(gè)或者多個(gè)的話運(yùn)行就會(huì)報(bào)錯(cuò)。
public class EmailMsgHandler {
@Subscribe
public void handle(Long businessId) {
System.out.println("send email msg" + businessId);
}
}
第三步,注冊(cè)事件。
eventBus.register(new EmailMsgHandler());
第四步,發(fā)送事件。
eventBus.post(1L);
這就是一個(gè)EventBus使用的最簡(jiǎn)單例子,下面我們看看結(jié)合開(kāi)頭說(shuō)的例子怎么處理。
結(jié)合實(shí)際
比如上面說(shuō)的案例,舉例來(lái)說(shuō)比如注冊(cè)和用戶下單的場(chǎng)景,都需要發(fā)送消息和郵件給用戶。
EventBus并不強(qiáng)制說(shuō)我們一定要用單例模式,因?yàn)樗膭?chuàng)建銷(xiāo)毀成本比較低,所以更多是根據(jù)我們的業(yè)務(wù)場(chǎng)景和上下文自己來(lái)選擇。
public class UserService {
private static EventBus eventBus = new EventBus();
public void regist(){
Long userId = 1L;
eventBus.register(new EmailMsgHandler());
eventBus.register(new SmsMsgHandler());
eventBus.post(userId);
}
}
public class BookingService {
private static EventBus eventBus = new EventBus();
public void booking(){
//業(yè)務(wù)邏輯
Long bookingId = 2L;
eventBus.register(new EmailMsgHandler());
eventBus.register(new SmsMsgHandler());
eventBus.post(bookingId);
}
}
然后在業(yè)務(wù)邏輯處理完成之后,分別去注冊(cè)了郵件和短信兩個(gè)事件訂閱者。
public class EmailMsgHandler {
@Subscribe
public void handle(Long businessId) {
System.out.println("send email msg" + businessId);
}
}
public class SmsMsgHandler {
@Subscribe
public void handle(Long businessId) {
System.out.println("send sms msg" + businessId);
}
}
最后我們發(fā)送事件,用戶注冊(cè)我們發(fā)送了一個(gè)用戶ID,下單成功我們發(fā)送了一個(gè)訂單ID。
再寫(xiě)一個(gè)測(cè)試類去測(cè)試一下,分別創(chuàng)建兩個(gè)service,然后分別調(diào)用方法。
public class EventBusTest {
public static void main(String[] args) {
UserService userService = new UserService();
userService.regist();
BookingService bookingService = new BookingService();
bookingService.booking();
}
}
執(zhí)行測(cè)試類,我們可以看到輸出,分別去執(zhí)行了我們的事件訂閱的方法。
send email msg1
send sms msg1
send email msg2
send sms msg2
使用起來(lái)你會(huì)發(fā)現(xiàn)非常簡(jiǎn)單,對(duì)于希望輕量級(jí)簡(jiǎn)單地做到解耦使用EventBus非常合適。
注意別踩坑
首先,注意一下例子中的參數(shù)都是Long類型,如果事件的參數(shù)是其他類型的話,那么消息是無(wú)法接受到的,比如我們把下單中發(fā)送的訂單ID改成String類型然后會(huì)發(fā)現(xiàn)沒(méi)有消費(fèi)了,因?yàn)槲覀儧](méi)有定義一個(gè)參數(shù)類型是String的方法。
public class BookingService {
private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));
public void booking(){
//業(yè)務(wù)邏輯
String bookingId = "2";
eventBus.register(new EmailMsgHandler());
eventBus.register(new SmsMsgHandler());
eventBus.post(bookingId);
}
}
//輸出
send email msg1
send sms msg1
去EmailMsgHandler和SmsMsgHandler都新增一個(gè)接收String類型的訂閱方法,這樣就可以接收到了。
@Subscribe
public void handle(String businessId) {
System.out.println("send email msg for string" + businessId);
}
@Subscribe
public void handle(String businessId) {
System.out.println("send sms msg for string" + businessId);
}
//輸出
send sms msg1
send email msg1
send email msg for string2
send sms msg for string2
除此之外,其實(shí)我們可以定義一個(gè)DeadEvent來(lái)處理這種情況,它相當(dāng)于是一個(gè)默認(rèn)的處理方式,當(dāng)沒(méi)有匹配的事件類型參數(shù)的話就會(huì)默認(rèn)發(fā)送一個(gè)DeadEvent事件。
定義一個(gè)默認(rèn)處理器。
public class DefaultEventHandler {
@Subscribe
public void handle(DeadEvent event) {
System.out.println("no subscriber," + event);
}
}
給BookingService新增一個(gè)pay()支付方法,下單完了去支付,注冊(cè)我們的默認(rèn)事件。
public void pay(){
//業(yè)務(wù)邏輯
eventBus.register(new DefaultEventHandler());
eventBus.post(new Payment(UUID.randomUUID().toString()));
}
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Payment {
private String paymentId;
}
執(zhí)行測(cè)試bookingService.pay()看到輸出結(jié)果:
no subscriber,DeadEvent{source=AsyncEventBus{default}, event=Payment(paymentId=255da942-7128-4bd1-baca-f0a8e569ed88)}
源碼分析
OK,簡(jiǎn)單的介紹就到這里,那其實(shí)到目前為止我們說(shuō)的這個(gè)都是同步調(diào)用的,這不太符合我們的要求,我們當(dāng)然使用異步處理更好。
那就看看源碼它是怎么實(shí)現(xiàn)的。
@Beta
public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
public EventBus() {
this("default");
}
public EventBus(String identifier) {
this(
identifier,
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
}
identifier就是個(gè)名字,標(biāo)記,默認(rèn)就是default。
executor執(zhí)行器,默認(rèn)創(chuàng)建一個(gè)MoreExecutors.directExecutor(),事件訂閱者根據(jù)你自己提供的executor來(lái)決定如何執(zhí)行事件訂閱的處理方式。
exceptionHandler是異常處理器,默認(rèn)創(chuàng)建的就是打點(diǎn)日志。
subscribers就是我們的消費(fèi)者,訂閱者。
dispatcher用來(lái)做事件分發(fā)。
默認(rèn)創(chuàng)建的executor是一個(gè)MoreExecutors.directExecutor(),看到command.run()你就會(huì)發(fā)現(xiàn)他這不就是同步執(zhí)行嘛。
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}
private enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
同步執(zhí)行還是不太好,我們希望不光給我們解耦,還要異步執(zhí)行,EventBus給我們提供了AsyncEventBus,Executor我們自己傳入就好了。
public class AsyncEventBus extends EventBus {
public AsyncEventBus(String identifier, Executor executor) {
super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
上面的代碼我們改成異步的,這樣不就好起來(lái)了嘛,這樣的話,實(shí)際上可以結(jié)合我們自己的線程池來(lái)處理了。
private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));
OK,這個(gè)說(shuō)清楚了,我們可以順便再看看事件分發(fā)的處理,看到DeadEvent了吧,沒(méi)有當(dāng)前事件的訂閱者,就會(huì)發(fā)送一個(gè)DeadEvent事件,bingo!
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
總結(jié)
OK,這個(gè)使用和源碼還是比較簡(jiǎn)單的哈,有興趣的同學(xué)可以自己去瞅瞅,花不了多少工夫。
總的來(lái)說(shuō),EventBus就是提供了我們一個(gè)更優(yōu)雅的代碼解耦的方式,實(shí)際工作中的業(yè)務(wù)你肯定能用上它!
送書(shū)
又到了送書(shū)環(huán)節(jié)了!
本次送書(shū)是3本《Spring Cloud Alibaba微服務(wù)實(shí)戰(zhàn)》(前幾天朋友圈送過(guò)2本,這次繼續(xù)再來(lái)),其實(shí)我們知道現(xiàn)在Spring Cloud可能會(huì)慢慢被淘汰了,未來(lái)可能Spring Cloud Alibaba會(huì)是個(gè)大主流,目前更新非常勤快,所以提前學(xué)習(xí)是個(gè)好事。
抽獎(jiǎng)方式:還是老樣子,關(guān)注我的小號(hào)《互聯(lián)網(wǎng)日記》,回復(fù)923即可參與抽獎(jiǎng),開(kāi)獎(jiǎng)時(shí)間本周五下午6點(diǎn)鐘,大家參與起來(lái)。

