<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          支付寶:多線程事務(wù)怎么回滾?說用 @Transactional 可以回去等通知了!

          共 20021字,需瀏覽 41分鐘

           ·

          2023-02-04 18:29

          點(diǎn)擊藍(lán)色“程序員黃小斜”關(guān)注我喲

          加個(gè)“星標(biāo)”,每天和你一起多進(jìn)步一點(diǎn)點(diǎn)!


          來源:blog.csdn.net/weixin_43225491/article/details/117705686

          • 背景介紹
          • 公用的類和方法
          • 示例事務(wù)不成功操作

          背景介紹

          1,最近有一個(gè)大數(shù)據(jù)量插入的操作入庫的業(yè)務(wù)場景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會(huì)很多,用到多線程去拆分?jǐn)?shù)據(jù)并行處理來提高響應(yīng)時(shí)間,如果有一個(gè)線程執(zhí)行失敗,則全部回滾。

          2,在spring中可以使用@Transactional注解去控制事務(wù),使出現(xiàn)異常時(shí)會(huì)進(jìn)行回滾,在多線程中,這個(gè)注解則不會(huì)生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫的操作,當(dāng)子線程在進(jìn)行處理出現(xiàn)異常時(shí),主線程修改的數(shù)據(jù)則不會(huì)回滾,導(dǎo)致數(shù)據(jù)錯(cuò)誤。

          3,下面用一個(gè)簡單示例演示多線程事務(wù)。

          公用的類和方法

          /**
           * 平均拆分list方法.
           * @param source
           * @param n
           * @param <T>
           * @return
           */
          public static <T> List<List<T>> averageAssign(List<T> source,int n){
              List<List<T>> result=new ArrayList<List<T>>();
              int remaider=source.size()%n; 
              int number=source.size()/n; 
              int offset=0;//偏移量
              for(int i=0;i<n;i++){
                  List<T> value=null;
                  if(remaider>0){
                      value=source.subList(i*number+offset, (i+1)*number+offset+1);
                      remaider--;
                      offset++;
                  }else{
                      value=source.subList(i*number+offset, (i+1)*number+offset);
                  }
                  result.add(value);
              }
              return result;
          }
          /**  線程池配置
           * @version V1.0
           */
          public class ExecutorConfig {
              private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
              private volatile static ExecutorService executorService;
              public static ExecutorService getThreadPool() {
                  if (executorService == null){
                      synchronized (ExecutorConfig.class){
                          if (executorService == null){
                              executorService =  newThreadPool();
                          }
                      }
                  }
                  return executorService;
              }

              private static  ExecutorService newThreadPool(){
                  int queueSize = 500;
                  int corePool = Math.min(5, maxPoolSize);
                  return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
                      new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
              }
              private ExecutorConfig(){}
          }
          /** 獲取sqlSession
           * @author 86182
           * @version V1.0
           */
          @Component
          public class SqlContext {
              @Resource
              private SqlSessionTemplate sqlSessionTemplate;

              public SqlSession getSqlSession(){
                  SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
                  return sqlSessionFactory.openSession();
              }
          }

          示例事務(wù)不成功操作

            /**
           * 測試多線程事務(wù).
           * @param employeeDOList
           */
          @Override
          @Transactional
          public void saveThread(List<EmployeeDO> employeeDOList) {
              try {
                  //先做刪除操作,如果子線程出現(xiàn)異常,此操作不會(huì)回滾
                  this.getBaseMapper().delete(null);
                  //獲取線程池
                  ExecutorService service = ExecutorConfig.getThreadPool();
                  //拆分?jǐn)?shù)據(jù),拆分5份
                  List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                  //執(zhí)行的線程
                  Thread []threadArray = new Thread[lists.size()];
                  //監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會(huì)導(dǎo)致主線程關(guān)閉,子線程也會(huì)隨著關(guān)閉
                  CountDownLatch countDownLatch = new CountDownLatch(lists.size());
                  AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                  for (int i =0;i<lists.size();i++){
                      if (i==lists.size()-1){
                          atomicBoolean.set(false);
                      }
                      List<EmployeeDO> list  = lists.get(i);
                      threadArray[i] =  new Thread(() -> {
                          try {
                           //最后一個(gè)線程拋出異常
                              if (!atomicBoolean.get()){
                                  throw new ServiceException("001","出現(xiàn)異常");
                              }
                              //批量添加,mybatisPlus中自帶的batch方法
                              this.saveBatch(list);
                          }finally {
                              countDownLatch.countDown();
                          }

                      });
                  }
                  for (int i = 0; i <lists.size(); i++){
                      service.execute(threadArray[i]);
                  }
                  //當(dāng)子線程執(zhí)行完畢時(shí),主線程再往下執(zhí)行
                  countDownLatch.await();
                  System.out.println("添加完畢");
              }catch (Exception e){
                  log.info("error",e);
                  throw new ServiceException("002","出現(xiàn)異常");
              }finally {
                   connection.close();
               }
          }

          數(shù)據(jù)庫中存在一條數(shù)據(jù):

          圖片
          //測試用例
          @RunWith(SpringRunner.class)
          @SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
          public class ThreadTest01 {

              @Resource
              private EmployeeBO employeeBO;

              /**
               *   測試多線程事務(wù).
               * @throws InterruptedException
               */
              @Test
              public  void MoreThreadTest2() throws InterruptedException {
                  int size = 10;
                  List<EmployeeDO> employeeDOList = new ArrayList<>(size);
                  for (int i = 0; i<size;i++){
                      EmployeeDO employeeDO = new EmployeeDO();
                      employeeDO.setEmployeeName("lol"+i);
                      employeeDO.setAge(18);
                      employeeDO.setGender(1);
                      employeeDO.setIdNumber(i+"XX");
                      employeeDO.setCreatTime(Calendar.getInstance().getTime());
                      employeeDOList.add(employeeDO);
                  }
                  try {
                      employeeBO.saveThread(employeeDOList);
                      System.out.println("添加成功");
                  }catch (Exception e){
                      e.printStackTrace();
                  }
              }
          }

          測試結(jié)果:

          圖片

          圖片

          可以發(fā)現(xiàn)子線程組執(zhí)行時(shí),有一個(gè)線程執(zhí)行失敗,其他線程也會(huì)拋出異常,但是主線程中執(zhí)行的刪除操作,沒有回滾,@Transactional注解沒有生效。

          使用sqlSession控制手動(dòng)提交事務(wù)

           @Resource
            SqlContext sqlContext;
           /**
           * 測試多線程事務(wù).
           * @param employeeDOList
           */
          @Override
          public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
              // 獲取數(shù)據(jù)庫連接,獲取會(huì)話(內(nèi)部自有事務(wù))
              SqlSession sqlSession = sqlContext.getSqlSession();
              Connection connection = sqlSession.getConnection();
              try {
                  // 設(shè)置手動(dòng)提交
                  connection.setAutoCommit(false);
                  //獲取mapper
                  EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
                  //先做刪除操作
                  employeeMapper.delete(null);
                  //獲取執(zhí)行器
                  ExecutorService service = ExecutorConfig.getThreadPool();
                  List<Callable<Integer>> callableList  = new ArrayList<>();
                  //拆分list
                  List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                  AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                  for (int i =0;i<lists.size();i++){
                      if (i==lists.size()-1){
                          atomicBoolean.set(false);
                      }
                      List<EmployeeDO> list  = lists.get(i);
                      //使用返回結(jié)果的callable去執(zhí)行,
                      Callable<Integer> callable = () -> {
                          //讓最后一個(gè)線程拋出異常
                          if (!atomicBoolean.get()){
                              throw new ServiceException("001","出現(xiàn)異常");
                          }
                        return employeeMapper.saveBatch(list);
                      };
                      callableList.add(callable);
                  }
                  //執(zhí)行子線程
                 List<Future<Integer>> futures = service.invokeAll(callableList);
                  for (Future<Integer> future:futures) {
                  //如果有一個(gè)執(zhí)行不成功,則全部回滾
                      if (future.get()<=0){
                          connection.rollback();
                           return;
                      }
                  }
                  connection.commit();
                  System.out.println("添加完畢");
              }catch (Exception e){
                  connection.rollback();
                  log.info("error",e);
                  throw new ServiceException("002","出現(xiàn)異常");
              }finally {
                   connection.close();
               }
          }
          // sql
          <insert id="saveBatch" parameterType="List">
           INSERT INTO
           employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
           values
               <foreach collection="list" item="item" index="index" separator=",">
               (
               #{item.employeeId},
               #{item.age},
               #{item.employeeName},
               #{item.birthDate},
               #{item.gender},
               #{item.idNumber},
               #{item.creatTime},
               #{item.updateTime},
               #{item.status}
                   )
               </foreach>
           </insert>

          數(shù)據(jù)庫中一條數(shù)據(jù):

          圖片

          測試結(jié)果:拋出異常,

          圖片

          刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫中的數(shù)據(jù)依舊存在,說明事務(wù)成功了。

          圖片

          成功操作示例:

           @Resource
          SqlContext sqlContext;
          /**
           * 測試多線程事務(wù).
           * @param employeeDOList
           */
          @Override
          public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
              // 獲取數(shù)據(jù)庫連接,獲取會(huì)話(內(nèi)部自有事務(wù))
              SqlSession sqlSession = sqlContext.getSqlSession();
              Connection connection = sqlSession.getConnection();
              try {
                  // 設(shè)置手動(dòng)提交
                  connection.setAutoCommit(false);
                  EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
                  //先做刪除操作
                  employeeMapper.delete(null);
                  ExecutorService service = ExecutorConfig.getThreadPool();
                  List<Callable<Integer>> callableList  = new ArrayList<>();
                  List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
                  for (int i =0;i<lists.size();i++){
                      List<EmployeeDO> list  = lists.get(i);
                      Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
                      callableList.add(callable);
                  }
                  //執(zhí)行子線程
                 List<Future<Integer>> futures = service.invokeAll(callableList);
                  for (Future<Integer> future:futures) {
                      if (future.get()<=0){
                          connection.rollback();
                           return;
                      }
                  }
                  connection.commit();
                  System.out.println("添加完畢");
              }catch (Exception e){
                  connection.rollback();
                  log.info("error",e);
                  throw new ServiceException("002","出現(xiàn)異常");
                 // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
              }
          }

          測試結(jié)果:

          圖片

          數(shù)據(jù)庫中數(shù)據(jù):

          刪除的刪除了,添加的添加成功了,測試成功。

          圖片

          清華讀研三年再拿字節(jié)offer,沒想到月薪比三年前更低


          這樣寫代碼,同事樂開花


          程序員面試,能不能不考“八股文”?


          — 【 THE END 】—
          公眾號[程序員黃小斜]全部博文已整理成一個(gè)目錄,請?jiān)诠娞柪锘貜?fù)「m」獲取!

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點(diǎn)“在看”,關(guān)注公眾號并回復(fù) PDF 領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。

          謝謝支持喲 (*^__^*)


          瀏覽 64
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天堂色| 天天综合狠狠干 | 亚洲欧洲在线免费 | 蜜桃av免费 | 国产精品色婷婷99久久精品 |