<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          說(shuō)下你可能沒(méi)用過(guò)的EventBus,再順便送幾本書(shū)好了

          共 10207字,需瀏覽 21分鐘

           ·

          2021-09-25 19:10

          最近在Code Review的時(shí)候發(fā)現(xiàn)了這樣一個(gè)業(yè)務(wù)場(chǎng)景,某個(gè)業(yè)務(wù)處理完成之后需要通知審核人員,通知的方式包含短信和郵件,所以代碼大致是這樣:

          //業(yè)務(wù)校驗(yàn)
          validate();
          //處理業(yè)務(wù)邏輯
          doBusiness();
          //發(fā)送郵件或者發(fā)送其他類型消息
          sendMsg(); 

          這個(gè)對(duì)不對(duì)呢?

          基于這種普遍的業(yè)務(wù)場(chǎng)景來(lái)說(shuō),一般首先我們會(huì)考慮同步或者異步發(fā)送的問(wèn)題。

          同步的話對(duì)接口RT有影響,而且和業(yè)務(wù)邏輯耦合在一起,這樣的做法肯定不太好。

          一般情況下,我們會(huì)做成異步的方式,使用MQ自己發(fā)送自己消費(fèi),或者說(shuō)一個(gè)線程池搞定,這樣的話不影響主業(yè)務(wù)邏輯,可以提高性能,并且代碼做到了解耦。

          然后還有就是數(shù)據(jù)一致性的問(wèn)題,郵件一定要發(fā)送成功嗎?

          大多數(shù)時(shí)候其實(shí)我們并不要求郵件一定100%發(fā)送成功,失敗了就失敗好了,監(jiān)控告警打點(diǎn)做好失敗率不要超過(guò)閾值就好,還有就是消息服務(wù)一旦收到請(qǐng)求應(yīng)該自己保證消息能夠投遞。

          所以總的來(lái)說(shuō),使用MQ發(fā)送消息自己消費(fèi)處理,或者線程池異步處理,最后自己搞個(gè)補(bǔ)償?shù)倪壿嬀湍芴幚砗眠@類問(wèn)題。

          那么,今天要說(shuō)的是這兩個(gè)解決方案之外的處理方式,對(duì)于這種場(chǎng)景其實(shí)我們可以用EventBus來(lái)解決。

          EventBus使用

          看名字就知道,EventBus是事件總線的意思,它是Google Guava庫(kù)的一個(gè)工具,基于觀察者模式可以做到進(jìn)程內(nèi)的代碼解耦作用。

          就拿上面的例子來(lái)說(shuō),引入一個(gè)MQ太重了,其實(shí)不太需要這樣做,EventBus也能達(dá)到這個(gè)效果,和MQ相比他只能提供進(jìn)程內(nèi)的消息事件傳遞,這對(duì)于我們這種業(yè)務(wù)場(chǎng)景來(lái)說(shuō)足夠了不是嗎?

          我們先看EventBus怎么來(lái)使用,一般先創(chuàng)建一個(gè)EventBus實(shí)例。

          //1.創(chuàng)建EventBus
          private static EventBus eventBus = new EventBus();

          第二步,創(chuàng)建一個(gè)事件消息訂閱者,處理方式非常簡(jiǎn)單,只要在我們希望去處理事件的方法上加上@Subscribe注解即可。

          形參只能有一個(gè),如果定義0個(gè)或者多個(gè)的話運(yùn)行就會(huì)報(bào)錯(cuò)。

          public class EmailMsgHandler {

              @Subscribe
              public void handle(Long businessId) {
                  System.out.println("send email msg" + businessId);
              }
          }

          第三步,注冊(cè)事件。

          eventBus.register(new EmailMsgHandler());

          第四步,發(fā)送事件。

          eventBus.post(1L);

          這就是一個(gè)EventBus使用的最簡(jiǎn)單例子,下面我們看看結(jié)合開(kāi)頭說(shuō)的例子怎么處理。

          結(jié)合實(shí)際

          比如上面說(shuō)的案例,舉例來(lái)說(shuō)比如注冊(cè)和用戶下單的場(chǎng)景,都需要發(fā)送消息和郵件給用戶。

          EventBus并不強(qiáng)制說(shuō)我們一定要用單例模式,因?yàn)樗膭?chuàng)建銷(xiāo)毀成本比較低,所以更多是根據(jù)我們的業(yè)務(wù)場(chǎng)景和上下文自己來(lái)選擇。

          public class UserService {
              private static EventBus eventBus = new EventBus();

              public void regist(){
                  Long userId = 1L;
                  eventBus.register(new EmailMsgHandler());
                  eventBus.register(new SmsMsgHandler());
                  eventBus.post(userId);
              }
          }

          public class BookingService {
              private static EventBus eventBus = new EventBus();

              public void booking(){
                  //業(yè)務(wù)邏輯
                  Long bookingId = 2L;
                  eventBus.register(new EmailMsgHandler());
                  eventBus.register(new SmsMsgHandler());
                  eventBus.post(bookingId);
              }
          }

          然后在業(yè)務(wù)邏輯處理完成之后,分別去注冊(cè)了郵件和短信兩個(gè)事件訂閱者。

          public class EmailMsgHandler {

              @Subscribe
              public void handle(Long businessId) {
                  System.out.println("send email msg" + businessId);
              }
          }

          public class SmsMsgHandler {

              @Subscribe
              public void handle(Long businessId) {
                  System.out.println("send sms msg" + businessId);
              }
          }

          最后我們發(fā)送事件,用戶注冊(cè)我們發(fā)送了一個(gè)用戶ID,下單成功我們發(fā)送了一個(gè)訂單ID。

          再寫(xiě)一個(gè)測(cè)試類去測(cè)試一下,分別創(chuàng)建兩個(gè)service,然后分別調(diào)用方法。

          public class EventBusTest {

              public static void main(String[] args) {
                  UserService userService = new UserService();
                  userService.regist();

                  BookingService bookingService = new BookingService();
                  bookingService.booking();

              }
          }

          執(zhí)行測(cè)試類,我們可以看到輸出,分別去執(zhí)行了我們的事件訂閱的方法。

          send email msg1
          send sms msg1
          send email msg2
          send sms msg2

          使用起來(lái)你會(huì)發(fā)現(xiàn)非常簡(jiǎn)單,對(duì)于希望輕量級(jí)簡(jiǎn)單地做到解耦使用EventBus非常合適。

          注意別踩坑

          首先,注意一下例子中的參數(shù)都是Long類型,如果事件的參數(shù)是其他類型的話,那么消息是無(wú)法接受到的,比如我們把下單中發(fā)送的訂單ID改成String類型然后會(huì)發(fā)現(xiàn)沒(méi)有消費(fèi)了,因?yàn)槲覀儧](méi)有定義一個(gè)參數(shù)類型是String的方法。

          public class BookingService {
              private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));

              public void booking(){
                  //業(yè)務(wù)邏輯
                  String bookingId = "2";
                  eventBus.register(new EmailMsgHandler());
                  eventBus.register(new SmsMsgHandler());
                  eventBus.post(bookingId);
              }
          }
          //輸出
          send email msg1
          send sms msg1

          EmailMsgHandlerSmsMsgHandler都新增一個(gè)接收String類型的訂閱方法,這樣就可以接收到了。

          @Subscribe
          public void handle(String businessId) {
           System.out.println("send email msg for string" + businessId);
          }

          @Subscribe
          public void handle(String businessId) {
           System.out.println("send sms msg for string" + businessId);
          }

          //輸出
          send sms msg1
          send email msg1
          send email msg for string2
          send sms msg for string2

          除此之外,其實(shí)我們可以定義一個(gè)DeadEvent來(lái)處理這種情況,它相當(dāng)于是一個(gè)默認(rèn)的處理方式,當(dāng)沒(méi)有匹配的事件類型參數(shù)的話就會(huì)默認(rèn)發(fā)送一個(gè)DeadEvent事件。

          定義一個(gè)默認(rèn)處理器。

          public class DefaultEventHandler {

              @Subscribe
              public void handle(DeadEvent event) {
                  System.out.println("no subscriber," + event);
              }

          }

          BookingService新增一個(gè)pay()支付方法,下單完了去支付,注冊(cè)我們的默認(rèn)事件。

          public void pay(){
            //業(yè)務(wù)邏輯
            eventBus.register(new DefaultEventHandler());
            eventBus.post(new Payment(UUID.randomUUID().toString()));
          }

          @ToString
          @Data
          @NoArgsConstructor
          @AllArgsConstructor
          public class Payment {
              private String paymentId;
          }

          執(zhí)行測(cè)試bookingService.pay()看到輸出結(jié)果:

          no subscriber,DeadEvent{source=AsyncEventBus{default}, event=Payment(paymentId=255da942-7128-4bd1-baca-f0a8e569ed88)}

          源碼分析

          OK,簡(jiǎn)單的介紹就到這里,那其實(shí)到目前為止我們說(shuō)的這個(gè)都是同步調(diào)用的,這不太符合我們的要求,我們當(dāng)然使用異步處理更好。

          那就看看源碼它是怎么實(shí)現(xiàn)的。

          @Beta
          public class EventBus {

            private static final Logger logger = Logger.getLogger(EventBus.class.getName());

            private final String identifier;
            private final Executor executor;
            private final SubscriberExceptionHandler exceptionHandler;

            private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
            private final Dispatcher dispatcher;
            
            public EventBus() {
              this("default");
            }

            public EventBus(String identifier) {
              this(
                  identifier,
                  MoreExecutors.directExecutor(),
                  Dispatcher.perThreadDispatchQueue(),
                  LoggingHandler.INSTANCE);
            }
          }

          identifier就是個(gè)名字,標(biāo)記,默認(rèn)就是default。

          executor執(zhí)行器,默認(rèn)創(chuàng)建一個(gè)MoreExecutors.directExecutor(),事件訂閱者根據(jù)你自己提供的executor來(lái)決定如何執(zhí)行事件訂閱的處理方式。

          exceptionHandler是異常處理器,默認(rèn)創(chuàng)建的就是打點(diǎn)日志。

          subscribers就是我們的消費(fèi)者,訂閱者。

          dispatcher用來(lái)做事件分發(fā)。

          默認(rèn)創(chuàng)建的executor是一個(gè)MoreExecutors.directExecutor(),看到command.run()你就會(huì)發(fā)現(xiàn)他這不就是同步執(zhí)行嘛。

          public static Executor directExecutor() {
           return DirectExecutor.INSTANCE;
          }

          private enum DirectExecutor implements Executor {
           INSTANCE;

          @Override
          public void execute(Runnable command) {
           command.run();
          }

          @Override
          public String toString() {
           return "MoreExecutors.directExecutor()";
          }

          同步執(zhí)行還是不太好,我們希望不光給我們解耦,還要異步執(zhí)行,EventBus給我們提供了AsyncEventBus,Executor我們自己傳入就好了。

          public class AsyncEventBus extends EventBus {

            public AsyncEventBus(String identifier, Executor executor) {
              super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
            }
           
            public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
              super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
            }

            public AsyncEventBus(Executor executor) {
              super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
            }

          上面的代碼我們改成異步的,這樣不就好起來(lái)了嘛,這樣的話,實(shí)際上可以結(jié)合我們自己的線程池來(lái)處理了。

          private static AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));

          OK,這個(gè)說(shuō)清楚了,我們可以順便再看看事件分發(fā)的處理,看到DeadEvent了吧,沒(méi)有當(dāng)前事件的訂閱者,就會(huì)發(fā)送一個(gè)DeadEvent事件,bingo!

          public void post(Object event) {
            Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
             if (eventSubscribers.hasNext()) {
             dispatcher.dispatch(event, eventSubscribers);
            } else if (!(event instanceof DeadEvent)) {
              // the event had no subscribers and was not itself a DeadEvent
              post(new DeadEvent(this, event));
            }
          }

          總結(jié)

          OK,這個(gè)使用和源碼還是比較簡(jiǎn)單的哈,有興趣的同學(xué)可以自己去瞅瞅,花不了多少工夫。

          總的來(lái)說(shuō),EventBus就是提供了我們一個(gè)更優(yōu)雅的代碼解耦的方式,實(shí)際工作中的業(yè)務(wù)你肯定能用上它!

          送書(shū)

          又到了送書(shū)環(huán)節(jié)了!

          本次送書(shū)是3本Spring Cloud Alibaba微服務(wù)實(shí)戰(zhàn)》(前幾天朋友圈送過(guò)2本,這次繼續(xù)再來(lái)),其實(shí)我們知道現(xiàn)在Spring Cloud可能會(huì)慢慢被淘汰了,未來(lái)可能Spring Cloud Alibaba會(huì)是個(gè)大主流,目前更新非常勤快,所以提前學(xué)習(xí)是個(gè)好事。

          抽獎(jiǎng)方式:還是老樣子,關(guān)注我的小號(hào)《互聯(lián)網(wǎng)日記》,回復(fù)923即可參與抽獎(jiǎng),開(kāi)獎(jiǎng)時(shí)間本周五下午6點(diǎn)鐘,大家參與起來(lái)。


          瀏覽 38
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人妻精品一区二区 | 天天干 夜夜操 | 天天干天天插 | 女人18片毛片120分钟免费观看 | 亚洲丝袜足交 |