<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          聊聊并發(fā)編程的12種業(yè)務(wù)場景

          共 11096字,需瀏覽 23分鐘

           ·

          2022-05-22 12:44

          前言

          今天聊聊我之前在項目中用并發(fā)編程的12種業(yè)務(wù)場景,給有需要的朋友一個參考。

          1. 簡單定時任務(wù)

          各位親愛的朋友,你沒看錯,Thread類真的能做定時任務(wù)。如果你看過一些定時任務(wù)框架的源碼,你最后會發(fā)現(xiàn),它們的底層也會使用Thread類。

          實現(xiàn)這種定時任務(wù)的具體代碼如下:

          public?static?void?init()?{
          ????new?Thread(()?->?{
          ????????while?(true)?{
          ????????????try?{
          ????????????????System.out.println("下載文件");
          ????????????????Thread.sleep(1000?*?60?*?5);
          ????????????}?catch?(Exception?e)?{
          ????????????????log.error(e);
          ????????????}
          ????????}
          ????}).start();
          }

          使用Thread類可以做最簡單的定時任務(wù),在run方法中有個while的死循環(huán)(當(dāng)然還有其他方式),執(zhí)行我們自己的任務(wù)。有個需要特別注意的地方是,需要用try...catch捕獲異常,否則如果出現(xiàn)異常,就直接退出循環(huán),下次將無法繼續(xù)執(zhí)行了。

          但這種方式做的定時任務(wù),只能周期性執(zhí)行,不能支持定時在某個時間點執(zhí)行。

          特別提醒一下,該線程建議定義成守護(hù)線程,可以通過setDaemon方法設(shè)置,讓它在后臺默默執(zhí)行就好。

          使用場景:比如項目中有時需要每隔5分鐘去下載某個文件,或者每隔10分鐘去讀取模板文件生成靜態(tài)html頁面等等,一些簡單的周期性任務(wù)場景。

          使用Thread類做定時任務(wù)的優(yōu)缺點:

          • 優(yōu)點:這種定時任務(wù)非常簡單,學(xué)習(xí)成本低,容易入手,對于那些簡單的周期性任務(wù),是個不錯的選擇。

          • 缺點:不支持指定某個時間點執(zhí)行任務(wù),不支持延遲執(zhí)行等操作,功能過于單一,無法應(yīng)對一些較為復(fù)雜的場景。

          2.監(jiān)聽器

          有時候,我們需要寫個監(jiān)聽器,去監(jiān)聽某些數(shù)據(jù)的變化。

          比如:我們在使用canal的時候,需要監(jiān)聽binlog的變化,能夠及時把數(shù)據(jù)庫中的數(shù)據(jù),同步到另外一個業(yè)務(wù)數(shù)據(jù)庫中。

          如果直接寫一個監(jiān)聽器去監(jiān)聽數(shù)據(jù)就太沒意思了,我們想實現(xiàn)這樣一個功能:在配置中心有個開關(guān),配置監(jiān)聽器是否開啟,如果開啟了使用單線程異步執(zhí)行。

          主要代碼如下:

          @Service
          public?CanalService?{
          ????private?volatile?boolean?running?=?false;
          ????private?Thread?thread;

          ????@Autowired
          ????private?CanalConnector?canalConnector;
          ????
          ????public?void?handle()?{
          ????????//連接canal
          ????????while(running)?{
          ???????????//業(yè)務(wù)處理
          ????????}
          ????}
          ????
          ????public?void?start()?{
          ???????thread?=?new?Thread(this::handle,?"name");
          ???????running?=?true;
          ???????thread.start();
          ????}
          ????
          ????public?void?stop()?{
          ???????if(!running)?{
          ??????????return;
          ???????}
          ???????running?=?false;
          ????}
          }

          在start方法中開啟了一個線程,在該線程中異步執(zhí)行handle方法的具體任務(wù)。然后通過調(diào)用stop方法,可以停止該線程。

          其中,使用volatile關(guān)鍵字控制的running變量作為開關(guān),它可以控制線程中的狀態(tài)。

          接下來,有個比較關(guān)鍵的點是:如何通過配置中心的配置,控制這個開關(guān)呢?

          apollo配置為例,我們在配置中心的后臺,修改配置之后,自動獲取最新配置的核心代碼如下:

          public?class?CanalConfig?{
          ????@Autowired
          ????private?CanalService?canalService;

          ????@ApolloConfigChangeListener
          ????public?void?change(ConfigChangeEvent?event)?{
          ????????String?value?=?event.getChange("test.canal.enable").getNewValue();
          ????????if(BooleanUtils.toBoolean(value))?{
          ????????????canalService.start();
          ????????}?else?{
          ????????????canalService.stop();
          ????????}
          ????}
          }

          通過apolloApolloConfigChangeListener注解,可以監(jiān)聽配置參數(shù)的變化。

          如果test.canal.enable開關(guān)配置的true,則調(diào)用canalService類的start方法開啟canal數(shù)據(jù)同步功能。如果開關(guān)配置的false,則調(diào)用canalService類的stop方法,自動停止canal數(shù)據(jù)同步功能。

          3.收集日志

          在某些高并發(fā)的場景中,我們需要收集部分用戶的日志(比如:用戶登錄的日志),寫到數(shù)據(jù)庫中,以便于做分析。

          但由于項目中,還沒有引入消息中間件,比如:kafkarocketmq等。

          如果直接將日志同步寫入數(shù)據(jù)庫,可能會影響接口性能。

          所以,大家很自然想到了異步處理。

          實現(xiàn)這個需求最簡單的做法是,開啟一個線程,異步寫入數(shù)據(jù)到數(shù)據(jù)庫即可。

          這樣做,可以是可以。

          但如果用戶登錄操作的耗時,比異步寫入數(shù)據(jù)庫的時間要少得多。這樣導(dǎo)致的結(jié)果是:生產(chǎn)日志的速度,比消費日志的速度要快得多,最終的性能瓶頸在消費端。

          其實,還有更優(yōu)雅的處理方式,雖說沒有使用消息中間件,但借用了它的思想。

          這套記錄登錄日志的功能,分為:日志生產(chǎn)端、日志存儲端和日志消費端。

          如下圖所示:

          先定義了一個阻塞隊列。

          @Component
          public?class?LoginLogQueue?{
          ????private?static?final?int?QUEUE_MAX_SIZE????=?1000;

          ????private?BlockingQueueblockingQueue?queue?=?new?LinkedBlockingQueue<>(QUEUE_MAX_SIZE);

          ????//生成消息
          ????public?boolean?push(LoginLog?loginLog)?{
          ????????return?this.queue.add(loginLog);
          ????}?

          ????//消費消息
          ????public?LoginLog?poll()?{
          ????????LoginLog?loginLog?=?null;
          ????????try?{
          ????????????loginLog?=?this.queue.take();
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????????return?result;
          ????}
          }

          然后定義了一個日志的生產(chǎn)者。

          @Service
          public?class?LoginSerivce?{
          ????
          ????@Autowired
          ????private?LoginLogQueue?loginLogQueue;

          ????public?int?login(UserInfo?userInfo)?{
          ????????//業(yè)務(wù)處理
          ????????LoginLog?loginLog?=?convert(userInfo);
          ????????loginLogQueue.push(loginLog);
          ????}??
          }

          接下來,定義了日志的消費者。

          @Service
          public?class?LoginInfoConsumer?{
          ????@Autowired
          ????private?LoginLogQueue?queue;

          ????@PostConstruct
          ????public?voit?init?{
          ???????new?Thread(()?->?{
          ??????????while?(true)?{
          ??????????????LoginLog?loginLog?=?queue.take();
          ??????????????//寫入數(shù)據(jù)庫
          ??????????}
          ????????}).start();
          ????}
          }

          當(dāng)然,這個例子中使用單線程接收登錄日志,為了提升性能,也可以使用線程池來處理業(yè)務(wù)邏輯(比如:寫入數(shù)據(jù)庫)等。

          4.excel導(dǎo)入

          我們可能會經(jīng)常收到運營同學(xué)提過來的excel數(shù)據(jù)導(dǎo)入需求,比如:將某一大類下的所有子類一次性導(dǎo)入系統(tǒng),或者導(dǎo)入一批新的供應(yīng)商數(shù)據(jù)等等。

          我們以導(dǎo)入供應(yīng)商數(shù)據(jù)為例,它所涉及的業(yè)務(wù)流程很長,比如:

          1. 調(diào)用天眼查接口校驗企業(yè)名稱和統(tǒng)一社會信用代碼。
          2. 寫入供應(yīng)商基本表
          3. 寫入組織表
          4. 給供應(yīng)商自動創(chuàng)建一個用戶
          5. 給該用戶分配權(quán)限
          6. 自定義域名
          7. 發(fā)站內(nèi)通知

          等等。

          如果在程序中,解析完excel,讀取了所有數(shù)據(jù)之后。用單線程一條條處理業(yè)務(wù)邏輯,可能耗時會非常長。

          為了提升excel數(shù)據(jù)導(dǎo)入效率,非常有必要使用多線程來處理。

          當(dāng)然在java中實現(xiàn)多線程的手段有很多種,下面重點聊聊java8中最簡單的實現(xiàn)方式:parallelStream

          偽代碼如下:

          supplierList.parallelStream().forEach(x?->?importSupplier(x));

          parallelStream是一個并行執(zhí)行的流,它默認(rèn)通過ForkJoinPool實現(xiàn)的,能提高你的多線程任務(wù)的速度。

          ForkJoinPool處理的過程會分而治之,它的核心思想是:將一個大任務(wù)切分成多個小任務(wù)。每個小任務(wù)都能單獨執(zhí)行,最后它會把所用任務(wù)的執(zhí)行結(jié)果進(jìn)行匯總。

          下面用一張圖簡單介紹一下ForkJoinPool的原理:

          當(dāng)然除了excel導(dǎo)入之外,還有類似的讀取文本文件,也可以用類似的方法處理。

          溫馨的提醒一下,如果一次性導(dǎo)入的數(shù)據(jù)非常多,用多線程處理,可能會使系統(tǒng)的cpu使用率飆升,需要特別關(guān)注。

          5.查詢接口

          很多時候,我們需要在某個查詢接口中,調(diào)用其他服務(wù)的接口,組合數(shù)據(jù)之后,一起返回。

          比如有這樣的業(yè)務(wù)場景:

          在用戶信息查詢接口中需要返回:用戶名稱、性別、等級、頭像、積分、成長值等信息。

          而用戶名稱、性別、等級、頭像在用戶服務(wù)中,積分在積分服務(wù)中,成長值在成長值服務(wù)中。為了匯總這些數(shù)據(jù)統(tǒng)一返回,需要另外提供一個對外接口服務(wù)。

          于是,用戶信息查詢接口需要調(diào)用用戶查詢接口、積分查詢接口 和 成長值查詢接口,然后匯總數(shù)據(jù)統(tǒng)一返回。

          調(diào)用過程如下圖所示:

          調(diào)用遠(yuǎn)程接口總耗時 530ms = 200ms + 150ms + 180ms

          顯然這種串行調(diào)用遠(yuǎn)程接口性能是非常不好的,調(diào)用遠(yuǎn)程接口總的耗時為所有的遠(yuǎn)程接口耗時之和。

          那么如何優(yōu)化遠(yuǎn)程接口性能呢?

          既然串行調(diào)用多個遠(yuǎn)程接口性能很差,為什么不改成并行呢?

          如下圖所示:

          調(diào)用遠(yuǎn)程接口總耗時 200ms = 200ms(即耗時最長的那次遠(yuǎn)程接口調(diào)用)

          在java8之前可以通過實現(xiàn)Callable接口,獲取線程返回結(jié)果。

          java8以后通過CompleteFuture類實現(xiàn)該功能。我們這里以CompleteFuture為例:

          public?UserInfo?getUserInfo(Long?id)?throws?InterruptedException,?ExecutionException?{
          ????final?UserInfo?userInfo?=?new?UserInfo();
          ????CompletableFuture?userFuture?=?CompletableFuture.supplyAsync(()?->?{
          ????????getRemoteUserAndFill(id,?userInfo);
          ????????return?Boolean.TRUE;
          ????},?executor);

          ????CompletableFuture?bonusFuture?=?CompletableFuture.supplyAsync(()?->?{
          ????????getRemoteBonusAndFill(id,?userInfo);
          ????????return?Boolean.TRUE;
          ????},?executor);

          ????CompletableFuture?growthFuture?=?CompletableFuture.supplyAsync(()?->?{
          ????????getRemoteGrowthAndFill(id,?userInfo);
          ????????return?Boolean.TRUE;
          ????},?executor);
          ????CompletableFuture.allOf(userFuture,?bonusFuture,?growthFuture).join();

          ????userFuture.get();
          ????bonusFuture.get();
          ????growthFuture.get();
          ????return?userInfo;
          }

          溫馨提醒一下,這兩種方式別忘了使用線程池。示例中我用到了executor,表示自定義的線程池,為了防止高并發(fā)場景下,出現(xiàn)線程過多的問題。

          6.獲取用戶上下文

          不知道你在項目開發(fā)時,有沒有遇到過這樣的需求:用戶登錄之后,在所有的請求接口中,通過某個公共方法,就能獲取到當(dāng)前登錄用戶的信息?

          獲取的用戶上下文,我們以CurrentUser為例。

          CurrentUser內(nèi)部包含了一個ThreadLocal對象,它負(fù)責(zé)保存當(dāng)前線程的用戶上下文信息。當(dāng)然為了保證在線程池中,也能從用戶上下文中獲取到正確的用戶信息,這里用了阿里的TransmittableThreadLocal。偽代碼如下:

          @Data
          public?class?CurrentUser?{
          ????private?static?final?TransmittableThreadLocal?THREA_LOCAL?=?new?TransmittableThreadLocal<>();
          ????
          ????private?String?id;
          ????private?String?userName;
          ????private?String?password;
          ????private?String?phone;
          ????...
          ????
          ????public?statis?void?set(CurrentUser?user)?{
          ??????THREA_LOCAL.set(user);
          ????}
          ????
          ????public?static?void?getCurrent()?{
          ??????return?THREA_LOCAL.get();
          ????}
          }

          這里為什么用了阿里的TransmittableThreadLocal,而不是普通的ThreadLocal呢?在線程池中,由于線程會被多次復(fù)用,導(dǎo)致從普通的ThreadLocal中無法獲取正確的用戶信息。父線程中的參數(shù),沒法傳遞給子線程,而TransmittableThreadLocal很好解決了這個問題。

          然后在項目中定義一個全局的spring mvc攔截器,專門設(shè)置用戶上下文到ThreadLocal中。偽代碼如下:

          public?class?UserInterceptor?extends?HandlerInterceptorAdapter?{
          ???
          ???@Override??
          ???public?boolean?preHandle(HttpServletRequest?request,?HttpServletResponse?response,?Object?handler)?throws?Exception?{
          ??????CurrentUser?user?=?getUser(request);
          ??????if(Objects.nonNull(user))?{
          ?????????CurrentUser.set(user);
          ??????}
          ???}?
          }

          用戶在請求我們接口時,會先觸發(fā)該攔截器,它會根據(jù)用戶cookie中的token,調(diào)用調(diào)用接口獲取redis中的用戶信息。如果能獲取到,說明用戶已經(jīng)登錄,則把用戶信息設(shè)置到CurrentUser類的ThreadLocal中。

          接下來,在api服務(wù)的下層,即business層的方法中,就能輕松通過CurrentUser.getCurrent();方法獲取到想要的用戶上下文信息了。

          這套用戶體系的想法是很good的,但深入使用后,發(fā)現(xiàn)了一個小插曲:

          api服務(wù)和mq消費者服務(wù)都引用了business層,business層中的方法兩個服務(wù)都能直接調(diào)用。

          我們都知道在api服務(wù)中用戶是需要登錄的,而mq消費者服務(wù)則不需要登錄。

          如果business中的某個方法剛開始是給api開發(fā)的,在方法深處使用了CurrentUser.getCurrent();獲取用戶上下文。但后來,某位新來的帥哥在mq消費者中也調(diào)用了那個方法,并未發(fā)覺這個小機(jī)關(guān),就會中招,出現(xiàn)找不到用戶上下文的問題。

          所以我當(dāng)時的第一個想法是:代碼沒做兼容處理,因為之前這類問題偶爾會發(fā)生一次。

          想要解決這個問題,其實也很簡單。只需先判斷一下能否從CurrentUser中獲取用戶信息,如果不能,則取配置的系統(tǒng)用戶信息。偽代碼如下:

          @Autowired
          private?BusinessConfig?businessConfig;

          CurrentUser?user?=?CurrentUser.getCurrent();
          if(Objects.nonNull(user))?{
          ???entity.setUserId(user.getUserId());
          ???entity.setUserName(user.getUserName());
          }?else?{
          ???entity.setUserId(businessConfig.getDefaultUserId());
          ???entity.setUserName(businessConfig.getDefaultUserName());
          }

          這種簡單無公害的代碼,如果只是在一兩個地方加還OK。

          此外,眾所周知,SimpleDateFormat在java8以前,是用來處理時間的工具類,它是非線程安全的。也就是說,用該方法解析日期會有線程安全問題。

          為了避免線程安全問題的出現(xiàn),我們可以把SimpleDateFormat對象定義成局部變量。但如果你一定要把它定義成靜態(tài)變量,可以使用ThreadLocal保存日期,也能解決線程安全問題。

          8. 傳遞參數(shù)

          之前見過有些同事寫代碼時,一個非常有趣的用法,即:使用MDC傳遞參數(shù)。

          MDC是什么?

          MDCorg.slf4j包下的一個類,它的全稱是Mapped Diagnostic Context,我們可以認(rèn)為它是一個線程安全的存放診斷日志的容器。

          MDC的底層是用了ThreadLocal來保存數(shù)據(jù)的。

          例如現(xiàn)在有這樣一種場景:我們使用RestTemplate調(diào)用遠(yuǎn)程接口時,有時需要在header中傳遞信息,比如:traceId,source等,便于在查詢?nèi)罩緯r能夠串聯(lián)一次完整的請求鏈路,快速定位問題。

          這種業(yè)務(wù)場景就能通過ClientHttpRequestInterceptor接口實現(xiàn),具體做法如下:

          第一步,定義一個LogFilter攔截所有接口請求,在MDC中設(shè)置traceId:

          public?class?LogFilter?implements?Filter?{
          ????@Override
          ????public?void?init(FilterConfig?filterConfig)?throws?ServletException?{
          ????}

          ????@Override
          ????public?void?doFilter(ServletRequest?request,?ServletResponse?response,?FilterChain?chain)?throws?IOException,?ServletException?{
          ????????MdcUtil.add(UUID.randomUUID().toString());
          ????????System.out.println("記錄請求日志");
          ????????chain.doFilter(request,?response);
          ????????System.out.println("記錄響應(yīng)日志");
          ????}

          ????@Override
          ????public?void?destroy()?{
          ????}
          }

          第二步,實現(xiàn)ClientHttpRequestInterceptor接口,MDC中獲取當(dāng)前請求的traceId,然后設(shè)置到header中:

          public?class?RestTemplateInterceptor?implements?ClientHttpRequestInterceptor?{

          ????@Override
          ????public?ClientHttpResponse?intercept(HttpRequest?request,?byte[]?body,?ClientHttpRequestExecution?execution)?throws?IOException?{
          ????????request.getHeaders().set("traceId",?MdcUtil.get());
          ????????return?execution.execute(request,?body);
          ????}
          }

          第三步,定義配置類,配置上面定義的RestTemplateInterceptor類:

          @Configuration
          public?class?RestTemplateConfiguration?{

          ????@Bean
          ????public?RestTemplate?restTemplate()?{
          ????????RestTemplate?restTemplate?=?new?RestTemplate();
          ????????restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));
          ????????return?restTemplate;
          ????}

          ????@Bean
          ????public?RestTemplateInterceptor?restTemplateInterceptor()?{
          ????????return?new?RestTemplateInterceptor();
          ????}
          }

          其中MdcUtil其實是利用MDC工具在ThreadLocal中存儲和獲取traceId

          public?class?MdcUtil?{

          ????private?static?final?String?TRACE_ID?=?"TRACE_ID";

          ????public?static?String?get()?{
          ????????return?MDC.get(TRACE_ID);
          ????}

          ????public?static?void?add(String?value)?{
          ????????MDC.put(TRACE_ID,?value);
          ????}
          }

          當(dāng)然,這個例子中沒有演示MdcUtil類的add方法具體調(diào)的地方,我們可以在filter中執(zhí)行接口方法之前,生成traceId,調(diào)用MdcUtil類的add方法添加到MDC中,然后在同一個請求的其他地方就能通過MdcUtil類的get方法獲取到該traceId。

          能使用MDC保存traceId等參數(shù)的根本原因是,用戶請求到應(yīng)用服務(wù)器,Tomcat會從線程池中分配一個線程去處理該請求。

          那么該請求的整個過程中,保存到MDCThreadLocal中的參數(shù),也是該線程獨享的,所以不會有線程安全問題。

          9. 模擬高并發(fā)

          有時候我們寫的接口,在低并發(fā)的場景下,一點問題都沒有。

          但如果一旦出現(xiàn)高并發(fā)調(diào)用,該接口可能會出現(xiàn)一些意想不到的問題。

          為了防止類似的事情發(fā)生,一般在項目上線前,我們非常有必要對接口做一下壓力測試

          當(dāng)然,現(xiàn)在已經(jīng)有比較成熟的壓力測試工具,比如:JmeterLoadRunner等。

          如果你覺得下載壓測工具比較麻煩,也可以手寫一個簡單的模擬并發(fā)操作的工具,用CountDownLatch就能實現(xiàn),例如:

          public?static?void?concurrenceTest()?{
          ????/**
          ?????*?模擬高并發(fā)情況代碼
          ?????*/

          ????final?AtomicInteger?atomicInteger?=?new?AtomicInteger(0);
          ????final?CountDownLatch?countDownLatch?=?new?CountDownLatch(1000);?//?相當(dāng)于計數(shù)器,當(dāng)所有都準(zhǔn)備好了,再一起執(zhí)行,模仿多并發(fā),保證并發(fā)量
          ????final?CountDownLatch?countDownLatch2?=?new?CountDownLatch(1000);?//?保證所有線程執(zhí)行完了再打印atomicInteger的值
          ????ExecutorService?executorService?=?Executors.newFixedThreadPool(10);
          ????try?{
          ????????for?(int?i?=?0;?i?1000;?i++)?{
          ????????????executorService.submit(new?Runnable()?{
          ????????????????@Override
          ????????????????public?void?run()?{
          ????????????????????try?{
          ????????????????????????countDownLatch.await();?//一直阻塞當(dāng)前線程,直到計時器的值為0,保證同時并發(fā)
          ????????????????????}?catch?(InterruptedException?e)?{
          ????????????????????????log.error(e.getMessage(),e);
          ????????????????????}
          ????????????????????//每個線程增加1000次,每次加1
          ????????????????????for?(int?j?=?0;?j?1000;?j++)?{
          ????????????????????????atomicInteger.incrementAndGet();
          ????????????????????}
          ????????????????????countDownLatch2.countDown();
          ????????????????}
          ????????????});
          ????????????countDownLatch.countDown();
          ????????}

          ????????countDownLatch2.await();//?保證所有線程執(zhí)行完
          ????????executorService.shutdown();
          ????}?catch?(Exception?e){
          ????????log.error(e.getMessage(),e);
          ????}
          }

          10. 處理mq消息

          在高并發(fā)的場景中,消息積壓問題,可以說如影隨形,真的沒辦法從根本上解決。表面上看,已經(jīng)解決了,但后面不知道什么時候,就會冒出一次,比如這次:

          有天下午,產(chǎn)品過來說:有幾個商戶投訴過來了,他們說菜品有延遲,快查一下原因。

          這次問題出現(xiàn)得有點奇怪。

          為什么這么說?

          首先這個時間點就有點奇怪,平常出問題,不都是中午或者晚上用餐高峰期嗎?怎么這次問題出現(xiàn)在下午?

          根據(jù)以往積累的經(jīng)驗,我直接看了kafkatopic的數(shù)據(jù),果然上面消息有積壓,但這次每個partition都積壓了十幾萬的消息沒有消費,比以往加壓的消息數(shù)量增加了幾百倍。這次消息積壓得極不尋常。

          我趕緊查服務(wù)監(jiān)控看看消費者掛了沒,還好沒掛。又查服務(wù)日志沒有發(fā)現(xiàn)異常。這時我有點迷茫,碰運氣問了問訂單組下午發(fā)生了什么事情沒?他們說下午有個促銷活動,跑了一個JOB批量更新過有些商戶的訂單信息。

          這時,我一下子如夢初醒,是他們在JOB中批量發(fā)消息導(dǎo)致的問題。怎么沒有通知我們呢?實在太坑了。

          雖說知道問題的原因了,倒是眼前積壓的這十幾萬的消息該如何處理呢?

          此時,如果直接調(diào)大partition數(shù)量是不行的,歷史消息已經(jīng)存儲到4個固定的partition,只有新增的消息才會到新的partition。我們重點需要處理的是已有的partition。

          直接加服務(wù)節(jié)點也不行,因為kafka允許同組的多個partition被一個consumer消費,但不允許一個partition被同組的多個consumer消費,可能會造成資源浪費。

          看來只有用多線程處理了。

          為了緊急解決問題,我改成了用線程池處理消息,核心線程和最大線程數(shù)都配置成了50

          大致用法如下:

          1. 先定義一個線程池:
          @Configuration
          public?class?ThreadPoolConfig?{

          ????@Value("${thread.pool.corePoolSize:5}")
          ????private?int?corePoolSize;

          ????@Value("${thread.pool.maxPoolSize:10}")
          ????private?int?maxPoolSize;

          ????@Value("${thread.pool.queueCapacity:200}")
          ????private?int?queueCapacity;

          ????@Value("${thread.pool.keepAliveSeconds:30}")
          ????private?int?keepAliveSeconds;

          ????@Value("${thread.pool.threadNamePrefix:ASYNC_}")
          ????private?String?threadNamePrefix;

          ????@Bean("messageExecutor")
          ????public?Executor?messageExecutor()?{
          ????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
          ????????executor.setCorePoolSize(corePoolSize);
          ????????executor.setMaxPoolSize(maxPoolSize);
          ????????executor.setQueueCapacity(queueCapacity);
          ????????executor.setKeepAliveSeconds(keepAliveSeconds);
          ????????executor.setThreadNamePrefix(threadNamePrefix);
          ????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
          ????????executor.initialize();
          ????????return?executor;
          ????}
          }
          1. 再定義一個消息的consumer:
          @Service
          public?class?MyConsumerService?{
          ????@Autowired
          ????private?Executor?messageExecutor;
          ????
          ????@KafkaListener(id="test",topics={"topic-test"})
          ????public?void?listen(String?message){
          ????????System.out.println("收到消息:"?+?message);
          ????????messageExecutor.submit(new?MyWork(message);
          ????}
          }
          1. 在定義的Runable實現(xiàn)類中處理業(yè)務(wù)邏輯:
          public?class?MyWork?implements?Runnable?{
          ????private?String?message;
          ????
          ????public?MyWork(String?message)?{
          ???????this.message?=?message;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????System.out.println(message);
          ????}
          }

          果然,調(diào)整之后消息積壓數(shù)量確實下降的非常快,大約半小時后,積壓的消息就非常順利的處理完了。

          但此時有個更嚴(yán)重的問題出現(xiàn):我收到了報警郵件,有兩個訂單系統(tǒng)的節(jié)點down機(jī)了。。。

          更詳細(xì)內(nèi)容,請看看我的另一篇文章《我用kafka兩年踩過的一些非比尋常的坑

          11. 統(tǒng)計數(shù)量

          在多線程的場景中,有時候需要統(tǒng)計數(shù)量,比如:用多線程導(dǎo)入供應(yīng)商數(shù)據(jù)時,統(tǒng)計導(dǎo)入成功的供應(yīng)商數(shù)有多少。

          如果這時候用count++統(tǒng)計次數(shù),最終的結(jié)果可能會不準(zhǔn)。因為count++并非原子操作,如果多個線程同時執(zhí)行該操作,則統(tǒng)計的次數(shù),可能會出現(xiàn)異常。

          為了解決這個問題,就需要使用concurentatomic包下面的類,比如:AtomicIntegerAtomicLong等。

          @Servcie
          public?class?ImportSupplierService?{
          ??private?static?AtomicInteger?count?=?new?AtomicInteger(0);

          ??public?int?importSupplier(List?supplierList)?{
          ???????if(CollectionUtils.isEmpty(supplierList))?{
          ???????????return?0;
          ???????}

          ???????supplierList.parallelStream().forEach(x?->?{
          ???????????try?{
          ?????????????importSupplier(x);
          ?????????????count.addAndGet(1);
          ???????????}?catch(Exception?e)?{
          ??????????????log.error(e.getMessage(),e);
          ???????????}
          ???????);

          ??????return?count.get();
          ??}????
          }

          AtomicInteger的底層說白了使用自旋鎖+CAS

          public?final?int?incrementAndGet()?{
          ????for?(;;)?{
          ????????int?current?=?get();
          ????????int?next?=?current?+?1;
          ????????if?(compareAndSet(current,?next))
          ????????????return?next;
          ????}
          }

          自旋鎖說白了就是一個死循環(huán)

          CAS比較交換的意思。

          它的實現(xiàn)邏輯是:將內(nèi)存位置處的舊值預(yù)期值進(jìn)行比較,若相等,則將內(nèi)存位置處的值替換為新值。若不相等,則不做任何操作。

          12. 延遲定時任務(wù)

          我們經(jīng)常有延遲處理數(shù)據(jù)的需求,比如:如果用戶下單后,超過30分鐘還未完成支付,則系統(tǒng)自動將該訂單取消。

          這里需求就可以使用延遲定時任務(wù)實現(xiàn)。

          ScheduledExecutorServiceJDK1.5+版本引進(jìn)的定時任務(wù),該類位于java.util.concurrent并發(fā)包下。

          ScheduledExecutorService是基于多線程的,設(shè)計的初衷是為了解決Timer單線程執(zhí)行,多個任務(wù)之間會互相影響的問題。

          它主要包含4個方法:

          • schedule(Runnable command,long delay,TimeUnit unit),帶延遲時間的調(diào)度,只執(zhí)行一次,調(diào)度之后可通過Future.get()阻塞直至任務(wù)執(zhí)行完畢。
          • schedule(Callablecallable,long delay,TimeUnit unit),帶延遲時間的調(diào)度,只執(zhí)行一次,調(diào)度之后可通過Future.get()阻塞直至任務(wù)執(zhí)行完畢,并且可以獲取執(zhí)行結(jié)果。
          • scheduleAtFixedRate,表示以固定頻率執(zhí)行的任務(wù),如果當(dāng)前任務(wù)耗時較多,超過定時周期period,則當(dāng)前任務(wù)結(jié)束后會立即執(zhí)行。
          • scheduleWithFixedDelay,表示以固定延時執(zhí)行任務(wù),延時是相對當(dāng)前任務(wù)結(jié)束為起點計算開始時間。

          實現(xiàn)這種定時任務(wù)的具體代碼如下:

          public?class?ScheduleExecutorTest?{

          ????public?static?void?main(String[]?args)?{
          ????????ScheduledExecutorService?scheduledExecutorService?=?Executors.newScheduledThreadPool(5);
          ????????scheduledExecutorService.scheduleAtFixedRate(()?->?{
          ????????????System.out.println("doSomething");
          ????????},1000,1000,?TimeUnit.MILLISECONDS);
          ????}
          }

          調(diào)用ScheduledExecutorService類的scheduleAtFixedRate方法實現(xiàn)周期性任務(wù),每隔1秒鐘執(zhí)行一次,每次延遲1秒再執(zhí)行。

          這種定時任務(wù)是阿里巴巴開發(fā)者規(guī)范中用來替代Timer類的方案,對于多線程執(zhí)行周期性任務(wù),是個不錯的選擇。

          使用ScheduledExecutorService類做延遲定時任務(wù)的優(yōu)缺點:

          • 優(yōu)點:基于多線程的定時任務(wù),多個任務(wù)之間不會相關(guān)影響,支持周期性的執(zhí)行任務(wù),并且?guī)а舆t功能。

          • 缺點:不支持一些較復(fù)雜的定時規(guī)則。

          當(dāng)然,你也可以使用分布式定時任務(wù),比如:xxl-job或者elastic-job等等。

          其實,在實際工作中我使用多線程的場景遠(yuǎn)遠(yuǎn)不只這12種,在這里只是拋磚引玉,介紹了一些我認(rèn)為比較常見的業(yè)務(wù)場景。?

          瀏覽 35
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲字| 在线观看免费成人网站 | 亚洲精品福利视频导航 | 91成人免费电影 | 成人偷拍自拍在线观看 |