<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring 在多線程環(huán)境下如何確保事務(wù)一致性?

          共 40124字,需瀏覽 81分鐘

           ·

          2023-10-16 10:39

          程序員的成長之路
          互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享 
          關(guān)注


          閱讀本文大概需要 14 分鐘。

          來自:blog.csdn.net/m0_53157173 /article/details/127423286

          • 問題在現(xiàn)
          • 如何解決異步執(zhí)行
          • 多線程環(huán)境下如何確保事務(wù)一致性
          • 事務(wù)王國回顧
          • 事務(wù)實(shí)現(xiàn)方式回顧
          • 編程式事務(wù)
          • 利用編程式事務(wù)解決問題
          • 問題分析完了,那么如何解決問題呢?
          • 小結(jié)

          問題在現(xiàn)

          我先把問題拋出來,大家就明白本文目的在于解決什么樣的業(yè)務(wù)痛點(diǎn)了:

          public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
              //1.查詢出當(dāng)前資源模塊下所有資源,查詢出來后進(jìn)行刪除
              deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
              //2.查詢出當(dāng)前資源模塊下所有子模塊,遞歸查詢,當(dāng)刪除完所有子模塊下的資源后,再刪除所有子模塊,最終刪除當(dāng)前資源模塊
              deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
              //3.刪除當(dāng)前資源模塊
              removeById(authorityModuleId);
          }

          如果我希望將步驟1和步驟2并行執(zhí)行,然后確保步驟1和步驟2執(zhí)行成功后,再執(zhí)行步驟3,等到步驟3執(zhí)行完畢后,再提交全部事務(wù),這個(gè)需求該如何實(shí)現(xiàn)呢?

          如何解決異步執(zhí)行

          上面需求第一點(diǎn)是: 如何讓任務(wù)異步并行執(zhí)行,如何實(shí)現(xiàn)二元依賴呢?
          說到異步執(zhí)行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的異步執(zhí)行任務(wù)能力并不足以解決我們當(dāng)前的需求。
          @Async注解原理簡單來說,就是掃描IOC中的bean,給方法上標(biāo)注有@Async注解的bean進(jìn)行代理,代理的核心是添加一個(gè)MethodInterceptorAsyncExecutionInterceptor,該方法攔截器負(fù)責(zé)將方法真正的執(zhí)行包裝為任務(wù),放入線程池中執(zhí)行。
          下面我們先使用CompletableFuture來完成我們第一步需求:

          public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
              CompletableFuture.runAsync(()->{
                  //兩個(gè)并行執(zhí)行的任務(wù)
                  CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                          deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
                  CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                          deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
                  //等待兩個(gè)并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個(gè)步驟
                  CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); 
              },executor);
          }

          多線程環(huán)境下如何確保事務(wù)一致性

          我們已經(jīng)完成了任務(wù)的異步執(zhí)行化,那么又如何確保多線程環(huán)境下的事務(wù)一致性問題呢?

          public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
              CompletableFuture.runAsync(()->{
                  //兩個(gè)并行執(zhí)行的任務(wù)
                  CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                          deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
                  CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                          deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
                  //等待兩個(gè)并行任務(wù)執(zhí)行完后,再執(zhí)行最后一個(gè)步驟
                  CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
              },executor);
          }

          在Spring環(huán)境下說到事務(wù)控制,大家第一反應(yīng)就想到使用@Transactional注解解決問題,但是這里顯然行不通,為什么行不通呢?
          我還是簡單的對Spring事務(wù)實(shí)現(xiàn)原理進(jìn)行一番概括:

          事務(wù)王國回顧

          事務(wù)管理大體分為三個(gè)流程:事務(wù)創(chuàng)建 ,事務(wù)執(zhí)行,事務(wù)結(jié)束
          事務(wù)創(chuàng)建涉及到一些屬性的配置,如:
          • 事務(wù)的隔離級別
          • 事務(wù)的傳播行為
          • 事務(wù)的超時(shí)時(shí)間
          • 是否為只讀事務(wù)
          由于涉及屬性頗多,并且后期還有可能進(jìn)行擴(kuò)展,因此必須通過一個(gè)類來封裝這些屬性,在Spring中對應(yīng)TransactionDefinition。
          有了事務(wù)相關(guān)屬性定義后,我們就可以利用TransactionDefinition來創(chuàng)建一個(gè)事務(wù)了,在Spring中局部事務(wù)由PlatformTransactionManager負(fù)責(zé)管理,創(chuàng)建事務(wù)也是由PlatformTransactionManager負(fù)責(zé)提供:

           TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
             throws TransactionException;

          如果我們希望追蹤事務(wù)的狀態(tài),例如: 事務(wù)已完成,事務(wù)回滾等,那么就需要一個(gè)事務(wù)狀態(tài)類貫穿當(dāng)前事務(wù)的執(zhí)行流程,在Spring中由TransactionStatus負(fù)責(zé)完成。
          對于常見的數(shù)據(jù)源而言,通常需要記錄的事務(wù)狀態(tài)有如下幾點(diǎn):
          • 當(dāng)前事務(wù)是否是新事務(wù)
          • 當(dāng)前事務(wù)是否結(jié)束
          • 當(dāng)前事務(wù)是否需要回滾(通過標(biāo)記來判斷,因此我也可以在業(yè)務(wù)流程中手動(dòng)設(shè)置標(biāo)記為true,來讓事務(wù)在沒有發(fā)生異常的情況下進(jìn)行回滾)
          • 當(dāng)前事務(wù)是否設(shè)置了回滾點(diǎn)(savePoint)
          事務(wù)的執(zhí)行過程就是具體業(yè)務(wù)代碼的執(zhí)行流程,這里就不多說了。
          事務(wù)的結(jié)束分為兩種情況: 需要進(jìn)行事務(wù)回滾或者事務(wù)正常提交,如果是事務(wù)回滾,還需要判斷TransactionStatus 中的savePoint是否被設(shè)置了。

          事務(wù)實(shí)現(xiàn)方式回顧

          Spring中常見的事務(wù)實(shí)現(xiàn)方式有兩種: 編程式和聲明式。
          編程式事務(wù)使用是本文重點(diǎn),因此這里按下不表,我們先來復(fù)習(xí)一下聲明式事務(wù)的使用
          聲明式事務(wù)就是使用我們常見的@Transactional注解完成的,聲明式事務(wù)優(yōu)點(diǎn)就在于讓事務(wù)代碼與業(yè)務(wù)代碼解耦,通過Spring中提供的聲明式事務(wù)使用,我們也可以發(fā)覺我們只需要編寫業(yè)務(wù)代碼即可,而事務(wù)的管理基本不需要我們操心,Spring就像使用了魔法一樣,幫我們自動(dòng)完成了。
          之所以那么神奇,本質(zhì)還是依靠Spring框架提供的Bean生命周期相關(guān)回調(diào)接口和AOP結(jié)合完成的,簡述如下:
          • 通過自動(dòng)代理創(chuàng)建器依次嘗試為每個(gè)放入容器中的bean嘗試進(jìn)行代理
          • 嘗試進(jìn)行代理的過程對于事務(wù)管理來說,就是利用事務(wù)管理涉及到的增強(qiáng)器advisor,即TransactionAttributeSourceAdvisor
          • 判斷當(dāng)前增強(qiáng)器是否能夠應(yīng)用與當(dāng)前bean上,怎么判斷呢?—> advisor內(nèi)部的pointCut嘍 !
          • 如果能夠應(yīng)用,那么好,為當(dāng)前bean創(chuàng)建代理對象返回,并且往代理對象內(nèi)部添加一個(gè)TransactionInterceptor攔截器。
          • 此時(shí)我們再從容器中獲取,拿到的就是代理對象了,當(dāng)我們調(diào)用代理對象的方法時(shí),首先要經(jīng)過代理對象內(nèi)部攔截器鏈的處理,處理完后,最終才會(huì)調(diào)用被代理對象的方法。(這里其實(shí)就是責(zé)任鏈模式的應(yīng)用)
          對于被事務(wù)增強(qiáng)器TransactionAttributeSourceAdvisor代理的bean而言,代理對象內(nèi)部會(huì)存在一個(gè)TransactionInterceptor,該攔截器內(nèi)部構(gòu)造了一個(gè)事務(wù)執(zhí)行的模板流程:

          protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
             final InvocationCallback invocation) throws Throwable {
            //TransactionAttributeSource內(nèi)部保存著當(dāng)前類某個(gè)方法對應(yīng)的TransactionAttribute---事務(wù)屬性源
            //可以看做是一個(gè)存放TransactionAttribute與method方法映射的池子
            TransactionAttributeSource tas = getTransactionAttributeSource();
            //獲取當(dāng)前事務(wù)方法對應(yīng)的TransactionAttribute
            final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
            //定位TransactionManager
            final TransactionManager tm = determineTransactionManager(txAttr);
                  .....
                  //類型轉(zhuǎn)換為局部事務(wù)管理器
            PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
            final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

            if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
             //TransactionManager根據(jù)TransactionAttribute創(chuàng)建事務(wù)后返回
             //TransactionInfo封裝了當(dāng)前事務(wù)的信息--包括TransactionStatus
             TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

             Object retVal;
             try {
              //繼續(xù)執(zhí)行過濾器鏈---過濾鏈最終會(huì)調(diào)用目標(biāo)方法
              //因此可以理解為這里是調(diào)用目標(biāo)方法
              retVal = invocation.proceedWithInvocation();
             }
             catch (Throwable ex) {
              //目標(biāo)方法拋出異常則進(jìn)行判斷是否需要回滾
              completeTransactionAfterThrowing(txInfo, ex);
              throw ex;
             }
             finally {
                 //清除當(dāng)前事務(wù)信息
              cleanupTransactionInfo(txInfo);
             }
                      ...
                      //正常返回,那么就正常提交事務(wù)唄(當(dāng)然還是需要判斷TransactionStatus狀態(tài)先)
             commitTransactionAfterReturning(txInfo);
             return retVal;
            }
            ...

          編程式事務(wù)

          還記得本文一開始提出的業(yè)務(wù)需求嗎?
          不清楚,可以回看一下,在上文,我們已經(jīng)解決了任務(wù)異步并行執(zhí)行的難題,下面我們需要解決的就是如何確保Spring在多線程環(huán)境下也能保持事務(wù)一致性。
          通過上文對Spring事務(wù)基礎(chǔ)和聲明式事務(wù)的原理回顧,相信大家也發(fā)現(xiàn)了,聲明式事務(wù)并不能解決我們當(dāng)前的問題,那么就只能求助于編程式事務(wù)了。
          那么編程式事務(wù)是什么樣子呢?
          其實(shí)上面TransactionInterceptor給出的那套模板流程,就是編程式事務(wù)使用的模范案例,我們可以簡化上面的模板流程,簡單使用如下:

          public class TransactionMain {
              public static void main(String[] args) throws ClassNotFoundException, SQLException {
                  test();
              }

              private static void test() {
                  DataSource dataSource = getDS();
                  JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
                  //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
                  //包括隔離級別和傳播行為等
                  DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
                  //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
                  TransactionStatus ts = jtm.getTransaction(transactionDef);
                  //進(jìn)行業(yè)務(wù)邏輯操作
                  try {
                      update(dataSource);
                      jtm.commit(ts);
                  }catch (Exception e){
                      jtm.rollback(ts);
                      System.out.println("發(fā)生異常,我已回滾");
                  }
              }

              private static void update(DataSource dataSource) throws Exception {
                  JdbcTemplate jt = new JdbcTemplate();
                  jt.setDataSource(dataSource);
                  jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
                  throw new Exception("我是來搗亂的");
              }
          }

          利用編程式事務(wù)解決問題

          我們明白了編程式事務(wù)的使用,相信大家也都知道問題如何解決了,下面我給出一份看似正確的解決方案:

          package com.user.util;

          import lombok.RequiredArgsConstructor;
          import org.springframework.jdbc.datasource.DataSourceTransactionManager;
          import org.springframework.stereotype.Component;
          import org.springframework.transaction.TransactionStatus;
          import org.springframework.transaction.support.DefaultTransactionDefinition;

          import javax.sql.DataSource;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CompletableFuture;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.Executor;
          import java.util.concurrent.atomic.AtomicBoolean;

          /**
           * 多線程事務(wù)一致性管理 <br>
           * 聲明式事務(wù)管理無法完成,此時(shí)我們只能采用初期的編程式事務(wù)管理才行
           * @author 大忽悠
           * @create 2022/10/19 21:34
           */
          @Component
          @RequiredArgsConstructor
          public class MultiplyThreadTransactionManager {
              /**
               * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源
               */
              private final DataSource dataSource;

              /**
               * 執(zhí)行的是無返回值的任務(wù)
               * @param tasks 異步執(zhí)行的任務(wù)列表
               * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強(qiáng)制要求傳
               */
              public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
                  if(executor==null){
                      throw new IllegalArgumentException("線程池不能為空");
                  }
                  DataSourceTransactionManager transactionManager = getTransactionManager();
                  //是否發(fā)生了異常
                  AtomicBoolean ex=new AtomicBoolean();

                  List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
                  List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());

                  tasks.forEach(task->{
                      taskFutureList.add(CompletableFuture.runAsync(
                              () -> {
                                  try{
                                      //1.開啟新事務(wù)
                                      transactionStatusList.add(openNewTransaction(transactionManager));
                                      //2.異步任務(wù)執(zhí)行
                                      task.run();
                                  }catch (Throwable throwable){
                                      //打印異常
                                      throwable.printStackTrace();
                                      //其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記
                                      ex.set(Boolean.TRUE);
                                      //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
                                      taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                                  }
                              }
                              , executor)
                      );
                  });

                  try {
                      //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲
                      CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
                  } catch (InterruptedException | ExecutionException e) {
                      e.printStackTrace();
                  }

                  //發(fā)生了異常則進(jìn)行回滾操作,否則提交
                  if(ex.get()){
                      System.out.println("發(fā)生異常,全部事務(wù)回滾");
                      transactionStatusList.forEach(transactionManager::rollback);
                  }else {
                      System.out.println("全部事務(wù)正常提交");
                      transactionStatusList.forEach(transactionManager::commit);
                  }
              }

              private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
                  //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
                  //包括隔離級別和傳播行為等
                  DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
                  //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
                  return transactionManager.getTransaction(transactionDef);
              }

              private DataSourceTransactionManager getTransactionManager() {
                  return new DataSourceTransactionManager(dataSource);
              }
          }

          大家思考上面的代碼存在問題嗎?
          測試:

          public void test(){
              List<Runnable> tasks=new ArrayList<>();

              tasks.add(()->{
                 userMapper.deleteById(26);
              });

              tasks.add(()->{
                  signMapper.deleteById(10);
              });

              multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
          }

          任務(wù)正常都執(zhí)行完畢,事務(wù)進(jìn)行提交,但是會(huì)拋出異常,導(dǎo)致事務(wù)回滾:
          圖片
          抓關(guān)鍵字:

          No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
          解釋: 無法在當(dāng)前線程綁定的threadLocal中尋找到HikariDataSource作為key,對應(yīng)關(guān)聯(lián)的資源對象ConnectionHolder

          這里需要再次回顧一下Spring事務(wù)實(shí)現(xiàn)的小細(xì)節(jié):
          一次事務(wù)的完成通常都是默認(rèn)在當(dāng)前線程內(nèi)完成的,又因?yàn)橐淮问聞?wù)的執(zhí)行過程中,涉及到對當(dāng)前數(shù)據(jù)庫連接Connection的操作,因此為了避免將Connection在事務(wù)執(zhí)行過程中來回傳遞,我們可以將Connextion綁定到當(dāng)前事務(wù)執(zhí)行線程對應(yīng)的ThreadLocalMap內(nèi)部,順便還可以將一些其他屬性也放入其中進(jìn)行保存,在Spring中,負(fù)責(zé)保存這些ThreadLocal屬性的實(shí)現(xiàn)類由TransactionSynchronizationManager承擔(dān)。
          TransactionSynchronizationManager類內(nèi)部默認(rèn)提供了下面六個(gè)ThreadLocal屬性,分別保存當(dāng)前線程對應(yīng)的不同事務(wù)資源:

             //保存當(dāng)前事務(wù)關(guān)聯(lián)的資源--默認(rèn)只會(huì)在新建事務(wù)的時(shí)候保存當(dāng)前獲取到的DataSource和當(dāng)前事務(wù)對應(yīng)Connection的映射關(guān)系--當(dāng)然這里Connection被包裝為了ConnectionHolder
           private static final ThreadLocal<Map<Object, Object>> resources =
             new NamedThreadLocal<>("Transactional resources");
              //事務(wù)監(jiān)聽者--在事務(wù)執(zhí)行到某個(gè)階段的過程中,會(huì)去回調(diào)監(jiān)聽者對應(yīng)的回調(diào)接口(典型觀察者模式的應(yīng)用)---默認(rèn)為空集合
           private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
             new NamedThreadLocal<>("Transaction synchronizations");
             //見名知意: 存放當(dāng)前事務(wù)名字
           private static final ThreadLocal<String> currentTransactionName =
             new NamedThreadLocal<>("Current transaction name");
             //見名知意: 存放當(dāng)前事務(wù)是否是只讀事務(wù)
           private static final ThreadLocal<Boolean> currentTransactionReadOnly =
             new NamedThreadLocal<>("Current transaction read-only status");
             //見名知意: 存放當(dāng)前事務(wù)的隔離級別
           private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
             new NamedThreadLocal<>("Current transaction isolation level");
             //見名知意: 存放當(dāng)前事務(wù)是否處于激活狀態(tài)
           private static final ThreadLocal<Boolean> actualTransactionActive =
             new NamedThreadLocal<>("Actual transaction active");

          那么上面拋出的異常的原因也就很清楚了,無法在main線程找到當(dāng)前事務(wù)對應(yīng)的資源,原因如下:
          圖片
          開啟新事務(wù)時(shí),事務(wù)相關(guān)資源都被綁定到了thread-cache-pool-1線程對應(yīng)的threadLocalMap內(nèi)部,而當(dāng)執(zhí)行事務(wù)提交代碼時(shí),commit內(nèi)部需要從TransactionSynchronizationManager中獲取當(dāng)前事務(wù)的資源,顯然我們無法從main線程對應(yīng)的threadLocalMap中獲取到對應(yīng)的事務(wù)資源,這也就是異常拋出的原因。

          問題分析完了,那么如何解決問題呢?

          這里給出一個(gè)我首先想到的簡單粗暴的方法—CopyTransactionResource—將事務(wù)資源在兩個(gè)線程間來回復(fù)制
          這里給出解決后問題后的代碼示例:

          package com.user.util;

          import lombok.Builder;
          import lombok.RequiredArgsConstructor;
          import org.springframework.jdbc.datasource.DataSourceTransactionManager;
          import org.springframework.stereotype.Component;
          import org.springframework.transaction.TransactionStatus;
          import org.springframework.transaction.support.DefaultTransactionDefinition;
          import org.springframework.transaction.support.TransactionSynchronization;
          import org.springframework.transaction.support.TransactionSynchronizationManager;
          import javax.sql.DataSource;
          import java.util.*;
          import java.util.concurrent.CompletableFuture;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.Executor;
          import java.util.concurrent.atomic.AtomicBoolean;

          /**
           * 多線程事務(wù)一致性管理 <br>
           * 聲明式事務(wù)管理無法完成,此時(shí)我們只能采用初期的編程式事務(wù)管理才行
           * @author 大忽悠
           * @create 2022/10/19 21:34
           */
          @Component
          @RequiredArgsConstructor
          public class MultiplyThreadTransactionManager {
              /**
               * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源
               */
              private final DataSource dataSource;

              /**
               * 執(zhí)行的是無返回值的任務(wù)
               * @param tasks 異步執(zhí)行的任務(wù)列表
               * @param executor 異步執(zhí)行任務(wù)需要用到的線程池,考慮到線程池需要隔離,這里強(qiáng)制要求傳
               */
              public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
                  if(executor==null){
                      throw new IllegalArgumentException("線程池不能為空");
                  }
                  DataSourceTransactionManager transactionManager = getTransactionManager();
                  //是否發(fā)生了異常
                  AtomicBoolean ex=new AtomicBoolean();

                  List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
                  List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
                  List<TransactionResource> transactionResources=new ArrayList<>(tasks.size());

                  tasks.forEach(task->{
                      taskFutureList.add(CompletableFuture.runAsync(
                              () -> {
                                  try{
                                      //1.開啟新事務(wù)
                                      transactionStatusList.add(openNewTransaction(transactionManager));
                                      //2.copy事務(wù)資源
                                   transactionResources.add(TransactionResource.copyTransactionResource());
                                      //3.異步任務(wù)執(zhí)行
                                      task.run();
                                  }catch (Throwable throwable){
                                      //打印異常
                                      throwable.printStackTrace();
                                      //其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記
                                      ex.set(Boolean.TRUE);
                                      //其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
                                      taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                                  }
                              }
                              , executor)
                      );
                  });

                  try {
                      //阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲
                      CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
                  } catch (InterruptedException | ExecutionException e) {
                      e.printStackTrace();
                  }

                  //發(fā)生了異常則進(jìn)行回滾操作,否則提交
                  if(ex.get()){
                      System.out.println("發(fā)生異常,全部事務(wù)回滾");
                      for (int i = 0; i < tasks.size(); i++) {
                          transactionResources.get(i).autoWiredTransactionResource();
                          transactionManager.rollback(transactionStatusList.get(i));
                          transactionResources.get(i).removeTransactionResource();
                      }
                  }else {
                      System.out.println("全部事務(wù)正常提交");
                      for (int i = 0; i < tasks.size(); i++) {
                          transactionResources.get(i).autoWiredTransactionResource();
                          transactionManager.commit(transactionStatusList.get(i));
                          transactionResources.get(i).removeTransactionResource();
                      }
                  }
              }

              private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
                  //JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
                  //包括隔離級別和傳播行為等
                  DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
                  //開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
                  return transactionManager.getTransaction(transactionDef);
              }

              private DataSourceTransactionManager getTransactionManager() {
                  return new DataSourceTransactionManager(dataSource);
              }

              /**
               * 保存當(dāng)前事務(wù)資源,用于線程間的事務(wù)資源COPY操作
               */
              @Builder
              private static class TransactionResource{
                  //事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
                  private  Map<Object, Object> resources = new HashMap<>();

                  //下面五個(gè)屬性會(huì)在事務(wù)結(jié)束后被自動(dòng)清理,無需我們手動(dòng)清理
                  private  Set<TransactionSynchronization> synchronizations =new HashSet<>();

                  private  String currentTransactionName;

                  private Boolean currentTransactionReadOnly;

                  private Integer currentTransactionIsolationLevel;

                  private Boolean actualTransactionActive;

                  public static TransactionResource copyTransactionResource(){
                      return TransactionResource.builder()
                              //返回的是不可變集合
                              .resources(TransactionSynchronizationManager.getResourceMap())
                              //如果需要注冊事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值
                              .synchronizations(new LinkedHashSet<>())
                              .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
                              .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
                              .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
                              .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
                              .build();
                  }

                  public void autoWiredTransactionResource(){
                       resources.forEach(TransactionSynchronizationManager::bindResource);
                       //如果需要注冊事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值
                       TransactionSynchronizationManager.initSynchronization();
                       TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
                       TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
                       TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
                       TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
                  }

                  public void removeTransactionResource() {
                      //事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
                      //DataSource如果重復(fù)移除,unbindResource時(shí)會(huì)因?yàn)椴淮嬖诖薻ey關(guān)聯(lián)的事務(wù)資源而報(bào)錯(cuò)
                      resources.keySet().forEach(key->{
                          if(!(key instanceof  DataSource)){
                              TransactionSynchronizationManager.unbindResource(key);
                          }
                      });
                  }
              }
          }

          增加異常拋出,測試是否能夠保證多線程間的事務(wù)一致性:

          @SpringBootTest(classes = UserMain.class)
          public class Test {
              @Resource
              private UserMapper userMapper;
              @Resource
              private SignMapper signMapper;
              @Resource
              private MultiplyThreadTransactionManager multiplyThreadTransactionManager;

              @SneakyThrows
              @org.junit.jupiter.api.Test
              public void test(){
                  List<Runnable> tasks=new ArrayList<>();

                  tasks.add(()->{
                          userMapper.deleteById(26);
                          throw new RuntimeException("我就要拋出異常!");
                  });

                  tasks.add(()->{
                      signMapper.deleteById(10);
                  });

                  multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
              }

          }

          事務(wù)都進(jìn)行了回滾,數(shù)據(jù)庫數(shù)據(jù)沒變。

          小結(jié)

          本文給出的只是一個(gè)方法,為了實(shí)現(xiàn)多線程事務(wù)一致性,我們還有很多方法,例如和本文一樣的思想,直接利用JDBC提供的API來手動(dòng)控制事務(wù)提交和回滾,或者可以嘗試采用分布式事務(wù)的思路來解決問題。
          大家之所以會(huì)被這個(gè)問題難住,主要是因?yàn)閷pring框架提供的便捷聲明式事務(wù)支持中毒太深,以至于腦海中對事務(wù)的認(rèn)知完全停留在@Transactional注解的層面,多了解底層基礎(chǔ)設(shè)施,才能做到遇事不慌。
          <END>

          推薦閱讀:

          Java21正式發(fā)布,史詩級增強(qiáng)!虛擬線程、分代 ZGC 正式來襲!!

          Spring Security 為啥是個(gè)垃圾框架?

             
             
          互聯(lián)網(wǎng)初中高級大廠面試題(9個(gè)G)

          內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊(duì)列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper......等技術(shù)棧!

          ?戳閱讀原文領(lǐng)??!                                  朕已閱 

          瀏覽 227
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.免费看黄色 | 国产二区中文字幕 | 艹逼国产| 影音先锋AV黄色免费电影! | 正在播放国产AV |