多線程事務怎么回滾?說用 @Transactional 可以回去等通知了!
程序員的成長之路互聯(lián)網(wǎng)/程序員/技術/資料共享?
關注
閱讀本文大概需要 5.5 分鐘。
來自:
blog.csdn.net/weixin_43225491/article/
details/117705686
背景介紹
1.最近有一個大數(shù)據(jù)量插入的操作入庫的業(yè)務場景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會很多,用到多線程去拆分數(shù)據(jù)并行處理來提高響應時間,如果有一個線程執(zhí)行失敗,則全部回滾。2.在spring中可以使用@Transactional注解去控制事務,使出現(xiàn)異常時會進行回滾,在多線程中,這個注解則不會生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫的操作,當子線程在進行處理出現(xiàn)異常時,主線程修改的數(shù)據(jù)則不會回滾,導致數(shù)據(jù)錯誤。3.下面用一個簡單示例演示多線程事務。公用的類和方法
/**
?*?平均拆分list方法.
?*?@param?source
?*?@param?n
?*?@param?<T>
?*?@return
?*/
public?static?<T>?List<List<T>>?averageAssign(List<T>?source,int?n){
????List<List<T>>?result=new?ArrayList<List<T>>();
????int?remaider=source.size()%n;?
????int?number=source.size()/n;?
????int?offset=0;//偏移量
????for(int?i=0;i<n;i++){
????????List<T>?value=null;
????????if(remaider>0){
????????????value=source.subList(i*number+offset,?(i+1)*number+offset+1);
????????????remaider--;
????????????offset++;
????????}else{
????????????value=source.subList(i*number+offset,?(i+1)*number+offset);
????????}
????????result.add(value);
????}
????return?result;
}
/**??線程池配置
?*?@version?V1.0
?*/
public?class?ExecutorConfig?{
????private?static?int?maxPoolSize?=?Runtime.getRuntime().availableProcessors();
????private?volatile?static?ExecutorService?executorService;
????public?static?ExecutorService?getThreadPool()?{
????????if?(executorService?==?null){
????????????synchronized?(ExecutorConfig.class){
????????????????if?(executorService?==?null){
????????????????????executorService?=??newThreadPool();
????????????????}
????????????}
????????}
????????return?executorService;
????}
????private?static??ExecutorService?newThreadPool(){
????????int?queueSize?=?500;
????????int?corePool?=?Math.min(5,?maxPoolSize);
????????return?new?ThreadPoolExecutor(corePool,?maxPoolSize,?10000L,?TimeUnit.MILLISECONDS,
????????????new?LinkedBlockingQueue<>(queueSize),new?ThreadPoolExecutor.AbortPolicy());
????}
????private?ExecutorConfig(){}
}
/**?獲取sqlSession
?*?@author?86182
?*?@version?V1.0
?*/
@Component
public?class?SqlContext?{
????@Resource
????private?SqlSessionTemplate?sqlSessionTemplate;
????public?SqlSession?getSqlSession(){
????????SqlSessionFactory?sqlSessionFactory?=?sqlSessionTemplate.getSqlSessionFactory();
????????return?sqlSessionFactory.openSession();
????}
}
示例事務不成功操作
/**
?*?測試多線程事務.
?*?@param?employeeDOList
?*/
@Override
@Transactional
public?void?saveThread(List<EmployeeDO>?employeeDOList)?{
????try?{
????????//先做刪除操作,如果子線程出現(xiàn)異常,此操作不會回滾
????????this.getBaseMapper().delete(null);
????????//獲取線程池
????????ExecutorService?service?=?ExecutorConfig.getThreadPool();
????????//拆分數(shù)據(jù),拆分5份
????????List<List<EmployeeDO>>?lists=averageAssign(employeeDOList,?5);
????????//執(zhí)行的線程
????????Thread?[]threadArray?=?new?Thread[lists.size()];
????????//監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會導致主線程關閉,子線程也會隨著關閉
????????CountDownLatch?countDownLatch?=?new?CountDownLatch(lists.size());
????????AtomicBoolean?atomicBoolean?=?new?AtomicBoolean(true);
????????for?(int?i?=0;i<lists.size();i++){
????????????if?(i==lists.size()-1){
????????????????atomicBoolean.set(false);
????????????}
????????????List<EmployeeDO>?list??=?lists.get(i);
????????????threadArray[i]?=??new?Thread(()?->?{
????????????????try?{
?????????????????//最后一個線程拋出異常
????????????????????if?(!atomicBoolean.get()){
????????????????????????throw?new?ServiceException("001","出現(xiàn)異常");
????????????????????}
????????????????????//批量添加,mybatisPlus中自帶的batch方法
????????????????????this.saveBatch(list);
????????????????}finally?{
????????????????????countDownLatch.countDown();
????????????????}
????????????});
????????}
????????for?(int?i?=?0;?i?<lists.size();?i++){
????????????service.execute(threadArray[i]);
????????}
????????//當子線程執(zhí)行完畢時,主線程再往下執(zhí)行
????????countDownLatch.await();
????????System.out.println("添加完畢");
????}catch?(Exception?e){
????????log.info("error",e);
????????throw?new?ServiceException("002","出現(xiàn)異常");
????}finally?{
?????????connection.close();
?????}
}

//測試用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes?=?{?ThreadTest01.class,?MainApplication.class})
public?class?ThreadTest01?{
????@Resource
????private?EmployeeBO?employeeBO;
????/**
?????*???測試多線程事務.
?????*?@throws?InterruptedException
?????*/
????@Test
????public??void?MoreThreadTest2()?throws?InterruptedException?{
????????int?size?=?10;
????????List<EmployeeDO>?employeeDOList?=?new?ArrayList<>(size);
????????for?(int?i?=?0;?i<size;i++){
????????????EmployeeDO?employeeDO?=?new?EmployeeDO();
????????????employeeDO.setEmployeeName("lol"+i);
????????????employeeDO.setAge(18);
????????????employeeDO.setGender(1);
????????????employeeDO.setIdNumber(i+"XX");
????????????employeeDO.setCreatTime(Calendar.getInstance().getTime());
????????????employeeDOList.add(employeeDO);
????????}
????????try?{
????????????employeeBO.saveThread(employeeDOList);
????????????System.out.println("添加成功");
????????}catch?(Exception?e){
????????????e.printStackTrace();
????????}
????}
}

可以發(fā)現(xiàn)子線程組執(zhí)行時,有一個線程執(zhí)行失敗,其他線程也會拋出異常,但是主線程中執(zhí)行的刪除操作,沒有回滾,Transactional注解沒有生效。使用sqlSession控制手動提交事務
@Resource
SqlContext?sqlContext;
?/**
?*?測試多線程事務.
?*?@param?employeeDOList
?*/
@Override
public?void?saveThread(List<EmployeeDO>?employeeDOList)?throws?SQLException?{
????//?獲取數(shù)據(jù)庫連接,獲取會話(內部自有事務)
????SqlSession?sqlSession?=?sqlContext.getSqlSession();
????Connection?connection?=?sqlSession.getConnection();
????try?{
????????//?設置手動提交
????????connection.setAutoCommit(false);
????????//獲取mapper
????????EmployeeMapper?employeeMapper?=?sqlSession.getMapper(EmployeeMapper.class);
????????//先做刪除操作
????????employeeMapper.delete(null);
????????//獲取執(zhí)行器
????????ExecutorService?service?=?ExecutorConfig.getThreadPool();
????????List<Callable<Integer>>?callableList??=?new?ArrayList<>();
????????//拆分list
????????List<List<EmployeeDO>>?lists=averageAssign(employeeDOList,?5);
????????AtomicBoolean?atomicBoolean?=?new?AtomicBoolean(true);
????????for?(int?i?=0;i<lists.size();i++){
????????????if?(i==lists.size()-1){
????????????????atomicBoolean.set(false);
????????????}
????????????List<EmployeeDO>?list??=?lists.get(i);
????????????//使用返回結果的callable去執(zhí)行,
????????????Callable<Integer>?callable?=?()?->?{
????????????????//讓最后一個線程拋出異常
????????????????if?(!atomicBoolean.get()){
????????????????????throw?new?ServiceException("001","出現(xiàn)異常");
????????????????}
??????????????return?employeeMapper.saveBatch(list);
????????????};
????????????callableList.add(callable);
????????}
????????//執(zhí)行子線程
???????List<Future<Integer>>?futures?=?service.invokeAll(callableList);
????????for?(Future<Integer>?future:futures)?{
????????//如果有一個執(zhí)行不成功,則全部回滾
????????????if?(future.get()<=0){
????????????????connection.rollback();
?????????????????return;
????????????}
????????}
????????connection.commit();
????????System.out.println("添加完畢");
????}catch?(Exception?e){
????????connection.rollback();
????????log.info("error",e);
????????throw?new?ServiceException("002","出現(xiàn)異常");
????}finally?{
?????????connection.close();
?????}
}
//?sql
<insert?id="saveBatch"?parameterType="List">
?INSERT?INTO
?employee?(employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
?values
?????<foreach?collection="list"?item="item"?index="index"?separator=",">
?????(
?????#{item.employeeId},
?????#{item.age},
?????#{item.employeeName},
?????#{item.birthDate},
?????#{item.gender},
?????#{item.idNumber},
?????#{item.creatTime},
?????#{item.updateTime},
?????#{item.status}
?????????)
?????</foreach>
?</insert>
測試結果,拋出異常:
刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫中的數(shù)據(jù)依舊存在,說明事務成功了。
成功操作示例:
?@Resource
SqlContext?sqlContext;
/**
?*?測試多線程事務.
?*?@param?employeeDOList
?*/
@Override
public?void?saveThread(List<EmployeeDO>?employeeDOList)?throws?SQLException?{
????//?獲取數(shù)據(jù)庫連接,獲取會話(內部自有事務)
????SqlSession?sqlSession?=?sqlContext.getSqlSession();
????Connection?connection?=?sqlSession.getConnection();
????try?{
????????//?設置手動提交
????????connection.setAutoCommit(false);
????????EmployeeMapper?employeeMapper?=?sqlSession.getMapper(EmployeeMapper.class);
????????//先做刪除操作
????????employeeMapper.delete(null);
????????ExecutorService?service?=?ExecutorConfig.getThreadPool();
????????List<Callable<Integer>>?callableList??=?new?ArrayList<>();
????????List<List<EmployeeDO>>?lists=averageAssign(employeeDOList,?5);
????????for?(int?i?=0;i<lists.size();i++){
????????????List<EmployeeDO>?list??=?lists.get(i);
????????????Callable<Integer>?callable?=?()?->?employeeMapper.saveBatch(list);
????????????callableList.add(callable);
????????}
????????//執(zhí)行子線程
???????List<Future<Integer>>?futures?=?service.invokeAll(callableList);
????????for?(Future<Integer>?future:futures)?{
????????????if?(future.get()<=0){
????????????????connection.rollback();
?????????????????return;
????????????}
????????}
????????connection.commit();
????????System.out.println("添加完畢");
????}catch?(Exception?e){
????????connection.rollback();
????????log.info("error",e);
????????throw?new?ServiceException("002","出現(xiàn)異常");
???????//?throw?new?ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
????}
}
數(shù)據(jù)庫中數(shù)據(jù):刪除的刪除了,添加的添加成功了,測試成功。
<END>
推薦閱讀:
千萬級數(shù)據(jù)查詢:CK、ES、RediSearch 誰才是王炸?
僅需一個注解,實現(xiàn) SpringBoot 項目中的隱私數(shù)據(jù)脫敏!
互聯(lián)網(wǎng)初中高級大廠面試題(9個G)
內容包含Java基礎、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務、Zookeeper......等技術棧!
?戳閱讀原文領?。?/span>
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??朕已閱?
![]()
