<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          多線程事務怎么回滾?說用 @Transactional 可以回去等通知了!

          共 9622字,需瀏覽 20分鐘

           ·

          2023-01-07 11:39

          5b4cf034e866a3c92bb6ac537b218b42.webp程序員的成長之路互聯(lián)網(wǎng)/程序員/技術/資料共享? 關注


          閱讀本文大概需要 5.5 分鐘。

          來自: blog.csdn.net/weixin_43225491/article/ details/117705686

          背景介紹

          1.最近有一個大數(shù)據(jù)量插入的操作入庫的業(yè)務場景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會很多,用到多線程去拆分數(shù)據(jù)并行處理來提高響應時間,如果有一個線程執(zhí)行失敗,則全部回滾。2.在spring中可以使用@Transactional注解去控制事務,使出現(xiàn)異常時會進行回滾,在多線程中,這個注解則不會生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫的操作,當子線程在進行處理出現(xiàn)異常時,主線程修改的數(shù)據(jù)則不會回滾,導致數(shù)據(jù)錯誤。3.下面用一個簡單示例演示多線程事務。

          公用的類和方法

              
              

          /**
          ?*?平均拆分list方法.
          ?*?@param?source
          ?*?@param?n
          ?*?@param?<T>
          ?*?@return
          ?*/

          public?static?<T>?List<List<T>>?averageAssign(List<T>?source,int?n){
          ????List<List<T>>?result=new?ArrayList<List<T>>();
          ????int?remaider=source.size()%n;?
          ????int?number=source.size()/n;?
          ????int?offset=0;//偏移量
          ????for(int?i=0;i<n;i++){
          ????????List<T>?value=null;
          ????????if(remaider>0){
          ????????????value=source.subList(i*number+offset,?(i+1)*number+offset+1);
          ????????????remaider--;
          ????????????offset++;
          ????????}else{
          ????????????value=source.subList(i*number+offset,?(i+1)*number+offset);
          ????????}
          ????????result.add(value);
          ????}
          ????return?result;
          }

              
              

          /**??線程池配置
          ?*?@version?V1.0
          ?*/

          public?class?ExecutorConfig?{
          ????private?static?int?maxPoolSize?=?Runtime.getRuntime().availableProcessors();
          ????private?volatile?static?ExecutorService?executorService;
          ????public?static?ExecutorService?getThreadPool()?{
          ????????if?(executorService?==?null){
          ????????????synchronized?(ExecutorConfig.class){
          ????????????????if?(executorService?==?null){
          ????????????????????executorService?=??newThreadPool();
          ????????????????}
          ????????????}
          ????????}
          ????????return?executorService;
          ????}

          ????private?static??ExecutorService?newThreadPool(){
          ????????int?queueSize?=?500;
          ????????int?corePool?=?Math.min(5,?maxPoolSize);
          ????????return?new?ThreadPoolExecutor(corePool,?maxPoolSize,?10000L,?TimeUnit.MILLISECONDS,
          ????????????new?LinkedBlockingQueue<>(queueSize),new?ThreadPoolExecutor.AbortPolicy());
          ????}
          ????private?ExecutorConfig(){}
          }

              
              

          /**?獲取sqlSession
          ?*?@author?86182
          ?*?@version?V1.0
          ?*/

          @Component
          public?class?SqlContext?{
          ????@Resource
          ????private?SqlSessionTemplate?sqlSessionTemplate;

          ????public?SqlSession?getSqlSession(){
          ????????SqlSessionFactory?sqlSessionFactory?=?sqlSessionTemplate.getSqlSessionFactory();
          ????????return?sqlSessionFactory.openSession();
          ????}
          }

          示例事務不成功操作

              
              

          /**
          ?*?測試多線程事務.
          ?*?@param?employeeDOList
          ?*/

          @Override
          @Transactional
          public?void?saveThread(List<EmployeeDO>?employeeDOList)?{
          ????try?{
          ????????//先做刪除操作,如果子線程出現(xiàn)異常,此操作不會回滾
          ????????this.getBaseMapper().delete(null);
          ????????//獲取線程池
          ????????ExecutorService?service?=?ExecutorConfig.getThreadPool();
          ????????//拆分數(shù)據(jù),拆分5份
          ????????List<List<EmployeeDO>>?lists=averageAssign(employeeDOList,?5);
          ????????//執(zhí)行的線程
          ????????Thread?[]threadArray?=?new?Thread[lists.size()];
          ????????//監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會導致主線程關閉,子線程也會隨著關閉
          ????????CountDownLatch?countDownLatch?=?new?CountDownLatch(lists.size());
          ????????AtomicBoolean?atomicBoolean?=?new?AtomicBoolean(true);
          ????????for?(int?i?=0;i<lists.size();i++){
          ????????????if?(i==lists.size()-1){
          ????????????????atomicBoolean.set(false);
          ????????????}
          ????????????List<EmployeeDO>?list??=?lists.get(i);
          ????????????threadArray[i]?=??new?Thread(()?->?{
          ????????????????try?{
          ?????????????????//最后一個線程拋出異常
          ????????????????????if?(!atomicBoolean.get()){
          ????????????????????????throw?new?ServiceException("001","出現(xiàn)異常");
          ????????????????????}
          ????????????????????//批量添加,mybatisPlus中自帶的batch方法
          ????????????????????this.saveBatch(list);
          ????????????????}finally?{
          ????????????????????countDownLatch.countDown();
          ????????????????}

          ????????????});
          ????????}
          ????????for?(int?i?=?0;?i?<lists.size();?i++){
          ????????????service.execute(threadArray[i]);
          ????????}
          ????????//當子線程執(zhí)行完畢時,主線程再往下執(zhí)行
          ????????countDownLatch.await();
          ????????System.out.println("添加完畢");
          ????}catch?(Exception?e){
          ????????log.info("error",e);
          ????????throw?new?ServiceException("002","出現(xiàn)異常");
          ????}finally?{
          ?????????connection.close();
          ?????}
          }

          數(shù)據(jù)庫中存在一條數(shù)據(jù):129b804f6b48ab019ccd1fc2b3ddd879.webp
              
              

          //測試用例
          @RunWith(SpringRunner.class)
          @SpringBootTest(classes?
          =?{?ThreadTest01.class,?MainApplication.class})
          public?class?ThreadTest01?
          {

          ????@Resource
          ????private?EmployeeBO?employeeBO;

          ????/**
          ?????*???測試多線程事務.
          ?????*?@throws?InterruptedException
          ?????*/

          ????@Test
          ????public??void?MoreThreadTest2()?throws?InterruptedException?{
          ????????int?size?=?10;
          ????????List<EmployeeDO>?employeeDOList?=?new?ArrayList<>(size);
          ????????for?(int?i?=?0;?i<size;i++){
          ????????????EmployeeDO?employeeDO?=?new?EmployeeDO();
          ????????????employeeDO.setEmployeeName("lol"+i);
          ????????????employeeDO.setAge(18);
          ????????????employeeDO.setGender(1);
          ????????????employeeDO.setIdNumber(i+"XX");
          ????????????employeeDO.setCreatTime(Calendar.getInstance().getTime());
          ????????????employeeDOList.add(employeeDO);
          ????????}
          ????????try?{
          ????????????employeeBO.saveThread(employeeDOList);
          ????????????System.out.println("添加成功");
          ????????}catch?(Exception?e){
          ????????????e.printStackTrace();
          ????????}
          ????}
          }

          測試結果:6f98a2bb827e9592a61caec93a06f36d.webpbb6d80a2deb94971aee0deff95dce8f4.webp可以發(fā)現(xiàn)子線程組執(zhí)行時,有一個線程執(zhí)行失敗,其他線程也會拋出異常,但是主線程中執(zhí)行的刪除操作,沒有回滾,Transactional注解沒有生效。

          使用sqlSession控制手動提交事務

              
              

          @Resource
          SqlContext?sqlContext;
          ?/**
          ?*?測試多線程事務.
          ?*?@param?employeeDOList
          ?*/

          @Override
          public?void?saveThread(List<EmployeeDO>?employeeDOList)?throws?SQLException?{
          ????//?獲取數(shù)據(jù)庫連接,獲取會話(內部自有事務)
          ????SqlSession?sqlSession?=?sqlContext.getSqlSession();
          ????Connection?connection?=?sqlSession.getConnection();
          ????try?{
          ????????//?設置手動提交
          ????????connection.setAutoCommit(false);
          ????????//獲取mapper
          ????????EmployeeMapper?employeeMapper?=?sqlSession.getMapper(EmployeeMapper.class);
          ????????//先做刪除操作
          ????????employeeMapper.delete(null);
          ????????//獲取執(zhí)行器
          ????????ExecutorService?service?=?ExecutorConfig.getThreadPool();
          ????????List<Callable<Integer>>?callableList??=?new?ArrayList<>();
          ????????//拆分list
          ????????List<List<EmployeeDO>>?lists=averageAssign(employeeDOList,?5);
          ????????AtomicBoolean?atomicBoolean?=?new?AtomicBoolean(true);
          ????????for?(int?i?=0;i<lists.size();i++){
          ????????????if?(i==lists.size()-1){
          ????????????????atomicBoolean.set(false);
          ????????????}
          ????????????List<EmployeeDO>?list??=?lists.get(i);
          ????????????//使用返回結果的callable去執(zhí)行,
          ????????????Callable<Integer>?callable?=?()?->?{
          ????????????????//讓最后一個線程拋出異常
          ????????????????if?(!atomicBoolean.get()){
          ????????????????????throw?new?ServiceException("001","出現(xiàn)異常");
          ????????????????}
          ??????????????return?employeeMapper.saveBatch(list);
          ????????????};
          ????????????callableList.add(callable);
          ????????}
          ????????//執(zhí)行子線程
          ???????List<Future<Integer>>?futures?=?service.invokeAll(callableList);
          ????????for?(Future<Integer>?future:futures)?{
          ????????//如果有一個執(zhí)行不成功,則全部回滾
          ????????????if?(future.get()<=0){
          ????????????????connection.rollback();
          ?????????????????return;
          ????????????}
          ????????}
          ????????connection.commit();
          ????????System.out.println("添加完畢");
          ????}catch?(Exception?e){
          ????????connection.rollback();
          ????????log.info("error",e);
          ????????throw?new?ServiceException("002","出現(xiàn)異常");
          ????}finally?{
          ?????????connection.close();
          ?????}
          }

              
              

          //?sql
          <insert?id="saveBatch"?parameterType="List">
          ?INSERT?INTO
          ?employee?(employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
          ?values
          ?????<foreach?collection="list"?item="item"?index="index"?separator=",">
          ?????(
          ?????#{item.employeeId},
          ?????#{item.age},
          ?????#{item.employeeName},
          ?????#{item.birthDate},
          ?????#{item.gender},
          ?????#{item.idNumber},
          ?????#{item.creatTime},
          ?????#{item.updateTime},
          ?????#{item.status}
          ?????????)
          ?????</foreach>
          ?</insert>

          數(shù)據(jù)庫中一條數(shù)據(jù):6aba1621351f5bb9ed6aa72f91673045.webp測試結果,拋出異常:90d043c940c160e8b671900df87b70d9.webp刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫中的數(shù)據(jù)依舊存在,說明事務成功了。42639628908025fe9b244ac15fdbd0f5.webp
          成功操作示例:
              
              

          ?@Resource
          SqlContext?sqlContext;
          /**
          ?*?測試多線程事務.
          ?*?@param?employeeDOList
          ?*/

          @Override
          public?void?saveThread(List<EmployeeDO>?employeeDOList)?throws?SQLException?{
          ????//?獲取數(shù)據(jù)庫連接,獲取會話(內部自有事務)
          ????SqlSession?sqlSession?=?sqlContext.getSqlSession();
          ????Connection?connection?=?sqlSession.getConnection();
          ????try?{
          ????????//?設置手動提交
          ????????connection.setAutoCommit(false);
          ????????EmployeeMapper?employeeMapper?=?sqlSession.getMapper(EmployeeMapper.class);
          ????????//先做刪除操作
          ????????employeeMapper.delete(null);
          ????????ExecutorService?service?=?ExecutorConfig.getThreadPool();
          ????????List<Callable<Integer>>?callableList??=?new?ArrayList<>();
          ????????List<List<EmployeeDO>>?lists=averageAssign(employeeDOList,?5);
          ????????for?(int?i?=0;i<lists.size();i++){
          ????????????List<EmployeeDO>?list??=?lists.get(i);
          ????????????Callable<Integer>?callable?=?()?->?employeeMapper.saveBatch(list);
          ????????????callableList.add(callable);
          ????????}
          ????????//執(zhí)行子線程
          ???????List<Future<Integer>>?futures?=?service.invokeAll(callableList);
          ????????for?(Future<Integer>?future:futures)?{
          ????????????if?(future.get()<=0){
          ????????????????connection.rollback();
          ?????????????????return;
          ????????????}
          ????????}
          ????????connection.commit();
          ????????System.out.println("添加完畢");
          ????}catch?(Exception?e){
          ????????connection.rollback();
          ????????log.info("error",e);
          ????????throw?new?ServiceException("002","出現(xiàn)異常");
          ???????//?throw?new?ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
          ????}
          }

          測試結果:c4b59969e6fe61bfb1bafe2195f00585.webp數(shù)據(jù)庫中數(shù)據(jù):刪除的刪除了,添加的添加成功了,測試成功。6b4344115b5e11dd923ac43a1ae2cacf.webp<END>

          推薦閱讀:

          千萬級數(shù)據(jù)查詢:CK、ES、RediSearch 誰才是王炸?

          僅需一個注解,實現(xiàn) SpringBoot 項目中的隱私數(shù)據(jù)脫敏!

              
                  互聯(lián)網(wǎng)初中高級大廠面試題(9個G)
                
              

          內容包含Java基礎、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務、Zookeeper......等技術棧!

          ?戳閱讀原文領?。?/span> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??朕已閱? 6a6678a0cd375cf3b0360e4a8edf12b7.webp

          瀏覽 51
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  狠狠插狠狠干 | 欧美日韩在线视频免费观看 | 操逼在线网站观看 | 亚洲三级片免费手机网站 | 777三级|