<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用了SpringBoot的發(fā)布訂閱模式,代碼看著高級多了

          共 14867字,需瀏覽 30分鐘

           ·

          2022-11-27 19:09

          大家好,我是小富~

          在項(xiàng)目里,經(jīng)常會有一些主線業(yè)務(wù)之外的其它業(yè)務(wù),比如,下單之后,發(fā)送通知、監(jiān)控埋點(diǎn)、記錄日志……

          這些非核心業(yè)務(wù),如果全部一梭子寫下去,有兩個(gè)問題,一個(gè)是業(yè)務(wù)耦合,一個(gè)是串行耗時(shí)。

          e31e1e582e5c56da708ec1e25eb8f2c8.webp下單之后的邏輯

          所以,一般在開發(fā)的時(shí)候,都會把這些操作?抽象成觀察者模式,也就是發(fā)布/訂閱模式(這里就不討論觀察者模式和發(fā)布/訂閱模式的不同),而且一般會采用多線程的方式來異步執(zhí)行這些觀察者方法。

          93076387489c1273f85afef1b2a7db83.webp觀察者模式

          一開始,我們都是自己去寫觀察者模式。

          自己實(shí)現(xiàn)觀察者模式 e89ae5215b071aa0a6c24b7ead0aeb3d.webp觀察者簡圖

          觀察者

          • 觀察者定義接口
                
                /**
          ?*?@Author:?fighter3
          ?*?@Description:?觀察者接口
          ?*?@Date:?2022/11/7?11:40?下午
          ?*/

          public?interface?OrderObserver?{

          ????void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage);
          }
          • 具體觀察者

                        
                        @Slf4j
            public?class?OrderMetricsObserver?implements?OrderObserver?{

            ????@Override
            ????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
            ????????log.info("[afterPlaceOrder]?metrics");
            ????}
            }
                        
                        @Slf4j
            public?class?OrderLogObserver?implements?OrderObserver{

            ????@Override
            ????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
            ????????log.info("[afterPlaceOrder]?log.");
            ????}
            }
                        
                        @Slf4j
            public?class?OrderNotifyObserver?implements?OrderObserver{

            ????@Override
            ????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
            ????????log.info("[afterPlaceOrder]?notify.");
            ????}
            }
            • 業(yè)務(wù)通知觀察者
            • 日志記錄觀察者
            • 監(jiān)控埋點(diǎn)觀察者

          被觀察者

          • 消息實(shí)體定義
                
                @Data
          public?class?PlaceOrderMessage?implements?Serializable?{
          ????/**
          ?????*?訂單號
          ?????*/

          ????private?String?orderId;
          ????/**
          ?????*?訂單狀態(tài)
          ?????*/

          ????private?Integer?orderStatus;
          ????/**
          ?????*?下單用戶ID
          ?????*/

          ????private?String?userId;
          ????//……
          }
          • 被觀察者抽象類
                
                public?abstract?class?OrderSubject?{
          ????//定義一個(gè)觀察者列表
          ????private?List<OrderObserver>?orderObserverList?=?new?ArrayList<>();
          ????//定義一個(gè)線程池,這里參數(shù)隨便寫的
          ????ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(6,?12,?6,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(30));

          ????//增加一個(gè)觀察者
          ????public?void?addObserver(OrderObserver?o)?{
          ????????this.orderObserverList.add(o);
          ????}

          ????//刪除一個(gè)觀察者
          ????public?void?delObserver(OrderObserver?o)?{
          ????????this.orderObserverList.remove(o);
          ????}

          ????//通知所有觀察者
          ????public?void?notifyObservers(PlaceOrderMessage?placeOrderMessage)?{
          ????????for?(OrderObserver?orderObserver?:?orderObserverList)?{
          ????????????//利用多線程異步執(zhí)行
          ????????????threadPoolExecutor.execute(()?->?{
          ????????????????orderObserver.afterPlaceOrder(placeOrderMessage);
          ????????????});
          ????????}
          ????}
          }

          這里利用了多線程,來異步執(zhí)行觀察者。

          • 被觀察者實(shí)現(xiàn)類
                
                /**
          ?*?@Author:?fighter3
          ?*?@Description:?訂單實(shí)現(xiàn)類-被觀察者實(shí)現(xiàn)類
          ?*?@Date:?2022/11/7?11:52?下午
          ?*/

          @Service
          @Slf4j
          public?class?OrderServiceImpl?extends?OrderSubject?implements?OrderService?{

          ????/**
          ?????*?下單
          ?????*/

          ????@Override
          ????public?PlaceOrderResVO?placeOrder(PlaceOrderReqVO?reqVO)?{
          ????????PlaceOrderResVO?resVO?=?new?PlaceOrderResVO();
          ????????//添加觀察者
          ????????this.addObserver(new?OrderMetricsObserver());
          ????????this.addObserver(new?OrderLogObserver());
          ????????this.addObserver(new?OrderNotifyObserver());
          ????????//通知觀察者
          ????????this.notifyObservers(new?PlaceOrderMessage());
          ????????log.info("[placeOrder]?end.");
          ????????return?resVO;
          ????}
          }

          測試

                
                ????@Test
          ????@DisplayName("下單")
          ????void?placeOrder()?{
          ????????PlaceOrderReqVO?placeOrderReqVO?=?new?PlaceOrderReqVO();
          ????????orderService.placeOrder(placeOrderReqVO);
          ????}
          • 測試執(zhí)行結(jié)果
                
                2022-11-08?00:11:13.617??INFO?20235?---?[pool-1-thread-1]?c.f.obverser.OrderMetricsObserver????????:?[afterPlaceOrder]?metrics
          2022-11-08?00:11:13.618??INFO?20235?---?[???????????main]?cn.fighter3.obverser.OrderServiceImpl????:?[placeOrder]?end.
          2022-11-08?00:11:13.618??INFO?20235?---?[pool-1-thread-3]?c.fighter3.obverser.OrderNotifyObserver??:?[afterPlaceOrder]?notify.
          2022-11-08?00:11:13.617??INFO?20235?---?[pool-1-thread-2]?cn.fighter3.obverser.OrderLogObserver????:?[afterPlaceOrder]?log.

          可以看到,觀察者是異步執(zhí)行的。

          利用Spring精簡

          可以看到,觀察者模式寫起來還是比較簡單的,但是既然都用到了Spring來管理Bean的生命周期,代碼還可以更精簡一些。

          1f859c149b8128770c698e4ea5944167.webpSpring精簡觀察者模式

          觀察者實(shí)現(xiàn)類:定義成Bean

          • OrderLogObserver

                        
                        @Slf4j
            @Service
            public?class?OrderLogObserver?implements?OrderObserver?{

            ????@Override
            ????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
            ????????log.info("[afterPlaceOrder]?log.");
            ????}
            }
          • OrderMetricsObserver

                
                @Slf4j
          @Service
          public?class?OrderMetricsObserver?implements?OrderObserver?{

          ????@Override
          ????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
          ????????log.info("[afterPlaceOrder]?metrics");
          ????}
          }
          • OrderNotifyObserver
                
                @Slf4j
          @Service
          public?class?OrderNotifyObserver?implements?OrderObserver?{

          ????@Override
          ????public?void?afterPlaceOrder(PlaceOrderMessage?placeOrderMessage)?{
          ????????log.info("[afterPlaceOrder]?notify.");
          ????}
          }

          被觀察者:自動注入Bean

          • OrderSubject

                        
                        public?abstract?class?OrderSubject?{
            ????/**
            ?????*?利用Spring的特性直接注入觀察者
            ?????*/

            ????@Autowired
            ????protected?List<OrderObserver>?orderObserverList;

            ????//定義一個(gè)線程池,這里參數(shù)隨便寫的
            ????ThreadPoolExecutor?threadPoolExecutor?=?new?ThreadPoolExecutor(6,?12,?6,?TimeUnit.SECONDS,?new?ArrayBlockingQueue<>(30));

            ????//通知所有觀察者
            ????public?void?notifyObservers(PlaceOrderMessage?placeOrderMessage)?{
            ????????for?(OrderObserver?orderObserver?:?orderObserverList)?{
            ????????????//利用多線程異步執(zhí)行
            ????????????threadPoolExecutor.execute(()?->?{
            ????????????????orderObserver.afterPlaceOrder(placeOrderMessage);
            ????????????});
            ????????}
            ????}
            }
          • OrderServiceImpl

                
                @Service
          @Slf4j
          public?class?OrderServiceImpl?extends?OrderSubject?implements?OrderService?{

          ????/**
          ?????*?實(shí)現(xiàn)類里也要注入一下
          ?????*/

          ????@Autowired
          ????private?List<OrderObserver>?orderObserverList;

          ????/**
          ?????*?下單
          ?????*/

          ????@Override
          ????public?PlaceOrderResVO?placeOrder(PlaceOrderReqVO?reqVO)?{
          ????????PlaceOrderResVO?resVO?=?new?PlaceOrderResVO();
          ????????//通知觀察者
          ????????this.notifyObservers(new?PlaceOrderMessage());
          ????????log.info("[placeOrder]?end.");
          ????????return?resVO;
          ????}
          }

          這樣一來,發(fā)現(xiàn)被觀察者又簡潔了很多,但是后來我發(fā)現(xiàn),在SpringBoot項(xiàng)目里,利用Spring事件驅(qū)動驅(qū)動模型(event)模型來實(shí)現(xiàn),更加地簡練。

          Spring Event實(shí)現(xiàn)發(fā)布/訂閱模式

          Spring Event對發(fā)布/訂閱模式進(jìn)行了封裝,使用起來更加簡單,還是以我們這個(gè)場景為例,看看怎么來實(shí)現(xiàn)吧。

          自定義事件

          • PlaceOrderEvent:繼承ApplicationEvent,并重寫構(gòu)造函數(shù)。ApplicationEvent是Spring提供的所有應(yīng)用程序事件擴(kuò)展類。
                
                public?class?PlaceOrderEvent?extends?ApplicationEvent?{

          ????public?PlaceOrderEvent(PlaceOrderEventMessage?source)?{
          ????????super(source);
          ????}
          }
          • PlaceOrderEventMessage:事件消息,定義了事件的消息體。
                
                @Data
          public?class?PlaceOrderEventMessage?implements?Serializable?{
          ????/**
          ?????*?訂單號
          ?????*/

          ????private?String?orderId;
          ????/**
          ?????*?訂單狀態(tài)
          ?????*/

          ????private?Integer?orderStatus;
          ????/**
          ?????*?下單用戶ID
          ?????*/

          ????private?String?userId;
          ????//……
          }

          事件監(jiān)聽者

          事件監(jiān)聽者,有兩種實(shí)現(xiàn)方式,一種是實(shí)現(xiàn)ApplicationListener接口,另一種是使用@EventListener注解。

          43a560f98906ca9eb614a9c5c8ba8f08.webp事件監(jiān)聽者實(shí)現(xiàn)

          實(shí)現(xiàn)ApplicationListener接口

          實(shí)現(xiàn)ApplicationListener接口,重寫onApplicationEvent方法,將類定義為Bean,這樣,一個(gè)監(jiān)聽者就完成了。

          • OrderLogListener
                
                @Slf4j
          @Service
          public?class?OrderLogListener?implements?ApplicationListener<PlaceOrderEvent>?{

          ????@Override
          ????public?void?onApplicationEvent(PlaceOrderEvent?event)?{
          ????????log.info("[afterPlaceOrder]?log.");
          ????}
          }
          • OrderMetricsListener
                
                @Slf4j
          @Service
          public?class?OrderMetricsListener?implements?ApplicationListener<PlaceOrderEvent>?{

          ????@Override
          ????public?void?onApplicationEvent(PlaceOrderEvent?event)?{
          ????????log.info("[afterPlaceOrder]?metrics");
          ????}
          }
          • OrderNotifyListener
                
                @Slf4j
          @Service
          public?class?OrderNotifyListener?implements?ApplicationListener<PlaceOrderEvent>?{

          ????@Override
          ????public?void?onApplicationEvent(PlaceOrderEvent?event)?{
          ????????log.info("[afterPlaceOrder]?notify.");
          ????}
          }

          使用@EventListener注解

          使用@EventListener注解就更簡單了,直接在方法上,加上@EventListener注解就行了。

          • OrderLogListener

                        
                        @Slf4j
            @Service
            public?class?OrderLogListener??{

            ????@EventListener
            ????public?void?orderLog(PlaceOrderEvent?event)?{
            ????????log.info("[afterPlaceOrder]?log.");
            ????}
            }
          • OrderMetricsListener

                        
                        @Slf4j
            @Service
            public?class?OrderMetricsListener?{

            ????@EventListener
            ????public?void?metrics(PlaceOrderEvent?event)?{
            ????????log.info("[afterPlaceOrder]?metrics");
            ????}
            }
          • OrderNotifyListener

                        
                        @Slf4j
            @Service
            public?class?OrderNotifyListener{

            ????@EventListener
            ????public?void?notify(PlaceOrderEvent?event)?{
            ????????log.info("[afterPlaceOrder]?notify.");
            ????}
            }

          異步和自定義線程池

          異步執(zhí)行

          異步執(zhí)行也非常簡單,使用Spring的異步注解@Async就可以了。例如:

          • OrderLogListener
                
                @Slf4j
          @Service
          public?class?OrderLogListener??{

          ????@EventListener
          ????@Async
          ????public?void?orderLog(PlaceOrderEvent?event)?{
          ????????log.info("[afterPlaceOrder]?log.");
          ????}
          }

          當(dāng)然,還需要開啟異步,SpringBoot項(xiàng)目默認(rèn)是沒有開啟異步的,我們需要手動配置開啟異步功能,很簡單,只需要在配置類上加上@EnableAsync注解就行了,這個(gè)注解用于聲明啟用Spring的異步方法執(zhí)行功能,需要和@Configuration注解一起使用,也可以直接加在啟動類上。

                
                @SpringBootApplication
          @EnableAsync
          public?class?DailyApplication?{

          ????public?static?void?main(String[]?args)?{
          ????????SpringApplication.run(DairlyLearnApplication.class,?args);
          ????}

          }

          自定義線程池

          使用@Async的時(shí)候,一般都會自定義線程池,因?yàn)?code style="font-size:14px;background-color:rgba(27,31,35,.05);font-family:'Operator Mono', Consolas, Monaco, Menlo, monospace;color:rgb(40,202,113);">@Async的默認(rèn)線程池為SimpleAsyncTaskExecutor,不是真的線程池,這個(gè)類不重用線程,默認(rèn)每次調(diào)用都會創(chuàng)建一個(gè)新的線程。

          自定義線程池有三種方式:

          e35b373ef6774284f47636c44035b1f4.webp@Async自定義線程池
          • 實(shí)現(xiàn)接口AsyncConfigurer
          • 繼承AsyncConfigurerSupport
          • 配置由自定義的TaskExecutor替代內(nèi)置的任務(wù)執(zhí)行器

          我們來看看三種寫法:

          • 實(shí)現(xiàn)接口AsyncConfigurer
                
                @Configuration
          @Slf4j
          public?class?AsyncConfiguration?implements?AsyncConfigurer?{

          ????@Bean("fighter3AsyncExecutor")
          ????public?ThreadPoolTaskExecutor?executor()?{
          ????????//Spring封裝的一個(gè)線程池
          ????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
          ????????//隨便寫的一些配置
          ????????executor.setCorePoolSize(10);
          ????????executor.setMaxPoolSize(50);
          ????????executor.setQueueCapacity(30);
          ????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ????????executor.setThreadNamePrefix("fighter3AsyncExecutor-");
          ????????executor.initialize();
          ????????return?executor;
          ????}

          ????@Override
          ????public?Executor?getAsyncExecutor()?{
          ????????return?executor();
          ????}

          ????@Override
          ????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
          ????????return?(ex,?method,?params)?->?log.error(String.format("[async]?task{}?error:",?method),?ex);
          ????}
          }

          • 繼承AsyncConfigurerSupport
                
                @Configuration
          @Slf4j
          public?class?SpringAsyncConfigurer?extends?AsyncConfigurerSupport?{

          ????@Bean
          ????public?ThreadPoolTaskExecutor?asyncExecutor()?{
          ????????ThreadPoolTaskExecutor?threadPool?=?new?ThreadPoolTaskExecutor();
          ????????//隨便寫的一些配置
          ????????threadPool.setCorePoolSize(10);
          ????????threadPool.setMaxPoolSize(30);
          ????????threadPool.setWaitForTasksToCompleteOnShutdown(true);
          ????????threadPool.setAwaitTerminationSeconds(60?*?15);
          ????????return?threadPool;
          ????}

          ????@Override
          ????public?Executor?getAsyncExecutor()?{
          ????????return?asyncExecutor();
          ????}

          ????@Override
          ????public?AsyncUncaughtExceptionHandler?getAsyncUncaughtExceptionHandler()?{
          ????????return?(ex,?method,?params)?->?log.error(String.format("[async]?task{}?error:",?method),?ex);
          ????}
          }
          • 配置自定義的TaskExecutor

                        
                        @Slf4j
            @Service
            public?class?OrderLogListener??{

            ????@EventListener
            ????@Async("asyncExecutor")
            ????public?void?orderLog(PlaceOrderEvent?event)?{
            ????????log.info("[afterPlaceOrder]?log.");
            ????}
            }
            • 配置線程池

                            
                            @Configuration
              public?class?TaskPoolConfig?{

              ????@Bean(name?=?"asyncExecutor")
              ????public?Executor?taskExecutor()?{
              ????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
              ????????//隨便寫的一些配置
              ????????executor.setCorePoolSize(10);
              ????????executor.setMaxPoolSize(20);
              ????????executor.setQueueCapacity(200);
              ????????executor.setKeepAliveSeconds(60);
              ????????executor.setThreadNamePrefix("asyncExecutor-");
              ????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
              ????????return?executor;
              ????}
              }
            • 使用@Async注解的時(shí)候,指定線程池,推薦使用這種方式,因?yàn)樵陧?xiàng)目里,盡量做到線程池隔離,不同的任務(wù)使用不同的線程池

          異步和自定義線程池這一部分只是一些擴(kuò)展,稍微占了一些篇幅,大家可不要覺得Spring Event用起來很繁瑣。


          發(fā)布事件

          發(fā)布事件也非常簡單,只需要使用Spring 提供的ApplicationEventPublisher來發(fā)布自定義事件。

          • OrderServiceImpl

                        
                        @Service
            @Slf4j
            public?class?OrderServiceImpl?implements?OrderService?{
            ????@Autowired
            ????private?ApplicationEventPublisher?applicationEventPublisher;

            ????/**
            ?????*?下單
            ?????*/

            ????@Override
            ????public?PlaceOrderResVO?placeOrder(PlaceOrderReqVO?reqVO)?{
            ????????log.info("[placeOrder]?start.");
            ????????PlaceOrderResVO?resVO?=?new?PlaceOrderResVO();
            ????????//消息
            ????????PlaceOrderEventMessage?eventMessage?=?new?PlaceOrderEventMessage();
            ????????//發(fā)布事件
            ????????applicationEventPublisher.publishEvent(new?PlaceOrderEvent(eventMessage));
            ????????log.info("[placeOrder]?end.");
            ????????return?resVO;
            ????}
            }

          在Idea里查看事件的監(jiān)聽者也比較方便,點(diǎn)擊下面圖中的圖標(biāo),就可以查看監(jiān)聽者。

          e6654c09ab82dc99a4bbd96f4d543137.webp查看監(jiān)聽者9329db6dae3b677a95468e0e9b50a233.webp監(jiān)聽者

          測試

          最后,我們還是測試一下。

                
                ????@Test
          ????void?placeOrder()?{
          ????????PlaceOrderReqVO?placeOrderReqVO?=?new?PlaceOrderReqVO();
          ????????orderService.placeOrder(placeOrderReqVO);
          ????}
          • 執(zhí)行結(jié)果
                
                2022-11-08?10:05:14.415??INFO?22674?---?[???????????main]?c.f.o.event.event.OrderServiceImpl???????:?[placeOrder]?start.
          2022-11-08?10:05:14.424??INFO?22674?---?[???????????main]?c.f.o.event.event.OrderServiceImpl???????:?[placeOrder]?end.
          2022-11-08?10:05:14.434??INFO?22674?---?[sync-executor-3]?c.f.o.event.event.OrderNotifyListener????:?[afterPlaceOrder]?notify.
          2022-11-08?10:05:14.435??INFO?22674?---?[sync-executor-2]?c.f.o.event.event.OrderMetricsListener???:?[afterPlaceOrder]?metrics
          2022-11-08?10:05:14.436??INFO?22674?---?[sync-executor-1]?c.f.o.event.event.OrderLogListener???????:?[afterPlaceOrder]?log.

          可以看到,異步執(zhí)行,而且用到了我們自定義的線程池。

          小結(jié)

          這篇文章里,從最開始自己實(shí)現(xiàn)的觀察者模式,再到利用Spring簡化的觀察者模式,再到使用Spring Event實(shí)現(xiàn)發(fā)布/訂閱模式,可以看到,Spring Event用起來還是比較簡單的。除此之外,還有Guava EventBus這樣的事件驅(qū)動實(shí)現(xiàn),大家更習(xí)慣使用哪種呢?

          ··········? END? ··············

          在看 點(diǎn)贊 轉(zhuǎn)發(fā) ,是對我最大的鼓勵 ? ? ? ?
          瀏覽 47
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  蜜芽亚洲日韩欧美 | 很很五月婷婷 | 亚洲国产精品成人无码区 | www.性欧美 | 久久婷婷视频 |