<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          支付寶:多線(xiàn)程事務(wù)怎么回滾?說(shuō)用 @Transactional 可以回去等通知...

          共 19959字,需瀏覽 40分鐘

           ·

          2024-03-29 06:30

          aed6e59d913c549f2b9ee75098528d28.webp來(lái)源:blog.csdn.net/weixin_43225491/article/details/117705686

          • 背景介紹
          • 公用的類(lèi)和方法
          • 示例事務(wù)不成功操作

          背景介紹

          1,最近有一個(gè)大數(shù)據(jù)量插入的操作入庫(kù)的業(yè)務(wù)場(chǎng)景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會(huì)很多,用到多線(xiàn)程去拆分?jǐn)?shù)據(jù)并行處理來(lái)提高響應(yīng)時(shí)間,如果有一個(gè)線(xiàn)程執(zhí)行失敗,則全部回滾。

          2,在spring中可以使用@Transactional注解去控制事務(wù),使出現(xiàn)異常時(shí)會(huì)進(jìn)行回滾,在多線(xiàn)程中,這個(gè)注解則不會(huì)生效,如果主線(xiàn)程需要先執(zhí)行一些修改數(shù)據(jù)庫(kù)的操作,當(dāng)子線(xiàn)程在進(jìn)行處理出現(xiàn)異常時(shí),主線(xiàn)程修改的數(shù)據(jù)則不會(huì)回滾,導(dǎo)致數(shù)據(jù)錯(cuò)誤。

          3,下面用一個(gè)簡(jiǎn)單示例演示多線(xiàn)程事務(wù)。

          公用的類(lèi)和方法

                
                /**
           * 平均拆分list方法.
           * @param source
           * @param n
           * @param <T>
           * @return
           */
          public static <T> List<List<T>> averageAssign(List<T> source,int n){
              List<List<T>> result=new ArrayList<List<T>>();
              int remaider=source.size()%n; 
              int number=source.size()/n; 
              int offset=0;//偏移量
              for(int i=0;i<n;i++){
                  List<T> value=null;
                  if(remaider>0){
                      value=source.subList(i*number+offset, (i+1)*number+offset+1);
                      remaider--;
                      offset++;
                  }else{
                      value=source.subList(i*number+offset, (i+1)*number+offset);
                  }
                  result.add(value);
              }
              return result;
          }
          /**  線(xiàn)程池配置
           * @version V1.0
           */
          public class ExecutorConfig {
              private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
              private volatile static ExecutorService executorService;
              public static ExecutorService getThreadPool() {
                  if (executorService == null){
                      synchronized (ExecutorConfig.class){
                          if (executorService == null){
                              executorService =  newThreadPool();
                          }
                      }
                  }
                  return executorService;
              }

              private static  ExecutorService newThreadPool(){
                  int queueSize = 500;
                  int corePool = Math.min(5, maxPoolSize);
                  return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
                      new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
              }
              private ExecutorConfig(){}
          }
          /** 獲取sqlSession
           * @author 86182
           * @version V1.0
           */
          @Component
          public class SqlContext {
              @Resource
              private SqlSessionTemplate sqlSessionTemplate;

              public SqlSession getSqlSession(){
                  SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
                  return sqlSessionFactory.openSession();
              }
          }

          示例事務(wù)不成功操作

                
                  /**
           * 測(cè)試多線(xiàn)程事務(wù).
           * @param employeeDOList
           */
          @Override
          @Transactional
          public void saveThread(List<EmployeeDO> employeeDOList) {
              try {
                  //先做刪除操作,如果子線(xiàn)程出現(xiàn)異常,此操作不會(huì)回滾
                  this.getBaseMapper().delete(null);
                  //獲取線(xiàn)程池
                  ExecutorService service = ExecutorConfig.getThreadPool();
                  //拆分?jǐn)?shù)據(jù),拆分5份
                  List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                  //執(zhí)行的線(xiàn)程
                  Thread []threadArray = new Thread[lists.size()];
                  //監(jiān)控子線(xiàn)程執(zhí)行完畢,再執(zhí)行主線(xiàn)程,要不然會(huì)導(dǎo)致主線(xiàn)程關(guān)閉,子線(xiàn)程也會(huì)隨著關(guān)閉
                  CountDownLatch countDownLatch = new CountDownLatch(lists.size());
                  AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                  for (int i =0;i<lists.size();i++){
                      if (i==lists.size()-1){
                          atomicBoolean.set(false);
                      }
                      List<EmployeeDO> list  = lists.get(i);
                      threadArray[i] =  new Thread(() -> {
                          try {
                           //最后一個(gè)線(xiàn)程拋出異常
                              if (!atomicBoolean.get()){
                                  throw new ServiceException("001","出現(xiàn)異常");
                              }
                              //批量添加,mybatisPlus中自帶的batch方法
                              this.saveBatch(list);
                          }finally {
                              countDownLatch.countDown();
                          }

                      });
                  }
                  for (int i = 0; i <lists.size(); i++){
                      service.execute(threadArray[i]);
                  }
                  //當(dāng)子線(xiàn)程執(zhí)行完畢時(shí),主線(xiàn)程再往下執(zhí)行
                  countDownLatch.await();
                  System.out.println("添加完畢");
              }catch (Exception e){
                  log.info("error",e);
                  throw new ServiceException("002","出現(xiàn)異常");
              }finally {
                   connection.close();
               }
          }

          數(shù)據(jù)庫(kù)中存在一條數(shù)據(jù):

          fb8d53b7b9139e253a7977887d74df68.webp

          圖片
                
                //測(cè)試用例
          @RunWith(SpringRunner.class)
          @SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
          public class ThreadTest01 {

              @Resource
              private EmployeeBO employeeBO;

              /**
               *   測(cè)試多線(xiàn)程事務(wù).
               * @throws InterruptedException
               */
              @Test
              public  void MoreThreadTest2() throws InterruptedException {
                  int size = 10;
                  List<EmployeeDO> employeeDOList = new ArrayList<>(size);
                  for (int i = 0; i<size;i++){
                      EmployeeDO employeeDO = new EmployeeDO();
                      employeeDO.setEmployeeName("lol"+i);
                      employeeDO.setAge(18);
                      employeeDO.setGender(1);
                      employeeDO.setIdNumber(i+"XX");
                      employeeDO.setCreatTime(Calendar.getInstance().getTime());
                      employeeDOList.add(employeeDO);
                  }
                  try {
                      employeeBO.saveThread(employeeDOList);
                      System.out.println("添加成功");
                  }catch (Exception e){
                      e.printStackTrace();
                  }
              }
          }

          測(cè)試結(jié)果:

          49dcf5ec9ecf9e928d5b284852e857d8.webp

          圖片

          d53ffa5029ec1c932b495cbb39afcae6.webp

          圖片

          可以發(fā)現(xiàn)子線(xiàn)程組執(zhí)行時(shí),有一個(gè)線(xiàn)程執(zhí)行失敗,其他線(xiàn)程也會(huì)拋出異常,但是主線(xiàn)程中執(zhí)行的刪除操作,沒(méi)有回滾,@Transactional注解沒(méi)有生效。

          使用sqlSession控制手動(dòng)提交事務(wù)

                
                 @Resource
            SqlContext sqlContext;
           /**
           * 測(cè)試多線(xiàn)程事務(wù).
           * @param employeeDOList
           */
          @Override
          public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
              // 獲取數(shù)據(jù)庫(kù)連接,獲取會(huì)話(huà)(內(nèi)部自有事務(wù))
              SqlSession sqlSession = sqlContext.getSqlSession();
              Connection connection = sqlSession.getConnection();
              try {
                  // 設(shè)置手動(dòng)提交
                  connection.setAutoCommit(false);
                  //獲取mapper
                  EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
                  //先做刪除操作
                  employeeMapper.delete(null);
                  //獲取執(zhí)行器
                  ExecutorService service = ExecutorConfig.getThreadPool();
                  List<Callable<Integer>> callableList  = new ArrayList<>();
                  //拆分list
                  List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                  AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                  for (int i =0;i<lists.size();i++){
                      if (i==lists.size()-1){
                          atomicBoolean.set(false);
                      }
                      List<EmployeeDO> list  = lists.get(i);
                      //使用返回結(jié)果的callable去執(zhí)行,
                      Callable<Integer> callable = () -> {
                          //讓最后一個(gè)線(xiàn)程拋出異常
                          if (!atomicBoolean.get()){
                              throw new ServiceException("001","出現(xiàn)異常");
                          }
                        return employeeMapper.saveBatch(list);
                      };
                      callableList.add(callable);
                  }
                  //執(zhí)行子線(xiàn)程
                 List<Future<Integer>> futures = service.invokeAll(callableList);
                  for (Future<Integer> future:futures) {
                  //如果有一個(gè)執(zhí)行不成功,則全部回滾
                      if (future.get()<=0){
                          connection.rollback();
                           return;
                      }
                  }
                  connection.commit();
                  System.out.println("添加完畢");
              }catch (Exception e){
                  connection.rollback();
                  log.info("error",e);
                  throw new ServiceException("002","出現(xiàn)異常");
              }finally {
                   connection.close();
               }
          }
          // sql
          <insert id="saveBatch" parameterType="List">
           INSERT INTO
           employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
           values
               <foreach collection="list" item="item" index="index" separator=",">
               (
               #{item.employeeId},
               #{item.age},
               #{item.employeeName},
               #{item.birthDate},
               #{item.gender},
               #{item.idNumber},
               #{item.creatTime},
               #{item.updateTime},
               #{item.status}
                   )
               </foreach>
           </insert>

          數(shù)據(jù)庫(kù)中一條數(shù)據(jù):

          0e826ee17f8de30b0449eb4c02372b98.webp

          圖片

          測(cè)試結(jié)果:拋出異常,

          2109ce8f7084690c4ba2d0c1317f6145.webp圖片

          刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫(kù)中的數(shù)據(jù)依舊存在,說(shuō)明事務(wù)成功了。

          882912390a746d763c717039de24d4ef.webp

          圖片

          成功操作示例:

                
                 @Resource
          SqlContext sqlContext;
          /**
           * 測(cè)試多線(xiàn)程事務(wù).
           * @param employeeDOList
           */
          @Override
          public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
              // 獲取數(shù)據(jù)庫(kù)連接,獲取會(huì)話(huà)(內(nèi)部自有事務(wù))
              SqlSession sqlSession = sqlContext.getSqlSession();
              Connection connection = sqlSession.getConnection();
              try {
                  // 設(shè)置手動(dòng)提交
                  connection.setAutoCommit(false);
                  EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
                  //先做刪除操作
                  employeeMapper.delete(null);
                  ExecutorService service = ExecutorConfig.getThreadPool();
                  List<Callable<Integer>> callableList  = new ArrayList<>();
                  List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                  for (int i =0;i<lists.size();i++){
                      List<EmployeeDO> list  = lists.get(i);
                      Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
                      callableList.add(callable);
                  }
                  //執(zhí)行子線(xiàn)程
                 List<Future<Integer>> futures = service.invokeAll(callableList);
                  for (Future<Integer> future:futures) {
                      if (future.get()<=0){
                          connection.rollback();
                           return;
                      }
                  }
                  connection.commit();
                  System.out.println("添加完畢");
              }catch (Exception e){
                  connection.rollback();
                  log.info("error",e);
                  throw new ServiceException("002","出現(xiàn)異常");
                 // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
              }
          }

          測(cè)試結(jié)果:

          6832834a9d3be28d86c9a5738c3521a4.webp

          圖片

          數(shù)據(jù)庫(kù)中數(shù)據(jù):

          刪除的刪除了,添加的添加成功了,測(cè)試成功。

          c76f36bee94eba301e0fe5791e4e4875.webp

          ef71c013469a985cf012c75caffb27cc.webp

          瀏覽 171
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婷婷丁香亚洲 | 中文字幕日产av 中文字幕在线资源 | 一级a一级a爰片免费免免水l软件 | 三级影院麻 | 亚洲一区欧美一区在线 |