炸了!Java多線程批量操作,居然有人不做事務(wù)控制
閱讀本文大概需要 9?分鐘。
來(lái)自:https://c1n.cn/by3nt
目錄
前言
循環(huán)操作的代碼
使用手動(dòng)事務(wù)的操作代碼
嘗試多線程進(jìn)行數(shù)據(jù)修改
基于兩個(gè) CountDownLatch 控制多線程事務(wù)提交
基于 TransactionStatus 集合來(lái)控制多線程事務(wù)提交
使用 union 連接多個(gè) select 實(shí)現(xiàn)批量 update
總結(jié)
前言
項(xiàng)目代碼基于:MySQL 數(shù)據(jù)
開發(fā)框架為:SpringBoot、Mybatis
開發(fā)語(yǔ)言為:Java8
https://gitee.com/john273766764/springboot-mybatis-threads
循環(huán)操作的代碼
/***
?*?一條一條依次對(duì)50000條數(shù)據(jù)進(jìn)行更新操作
?*?耗時(shí):2m27s,1m54s
?*/
@Test
void?updateStudent()?{
????List?allStudents?=?studentMapper.getAll();
????allStudents.forEach(s?->?{
????????//更新教師信息
????????String?teacher?=?s.getTeacher();
????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????s.setTeacher(newTeacher);
????????studentMapper.update(s);
????});
}
使用手動(dòng)事務(wù)的操作代碼
@Autowired
private?DataSourceTransactionManager?dataSourceTransactionManager;
@Autowired
private?TransactionDefinition?transactionDefinition;
/**
?*?由于希望更新操作?一次性完成,需要手動(dòng)控制添加事務(wù)
?*?耗時(shí):24s
?*?從測(cè)試結(jié)果可以看出,添加事務(wù)后插入數(shù)據(jù)的效率有明顯的提升
?*/
@Test
void?updateStudentWithTrans()?{
????List?allStudents?=?studentMapper.getAll();
????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
????try?{
????????allStudents.forEach(s?->?{
????????????//更新教師信息
????????????String?teacher?=?s.getTeacher();
????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????????s.setTeacher(newTeacher);
????????????studentMapper.update(s);
????????});
????????dataSourceTransactionManager.commit(transactionStatus);
????}?catch?(Throwable?e)?{
????????dataSourceTransactionManager.rollback(transactionStatus);
????????throw?e;
????}
}
嘗試多線程進(jìn)行數(shù)據(jù)修改
@Service
public?class?StudentServiceImpl?implements?StudentService?{
????@Autowired
????private?StudentMapper?studentMapper;
????@Autowired
????private?DataSourceTransactionManager?dataSourceTransactionManager;
????@Autowired
????private?TransactionDefinition?transactionDefinition;
????@Override
????public?void?updateStudents(List?students,?CountDownLatch?threadLatch) ?{
????????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
????????System.out.println("子線程:"?+?Thread.currentThread().getName());
????????try?{
????????????students.forEach(s?->?{
????????????????//?更新教師信息
????????????????//?String?teacher?=?s.getTeacher();
????????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????????????s.setTeacher(newTeacher);
????????????????studentMapper.update(s);
????????????});
????????????dataSourceTransactionManager.commit(transactionStatus);
????????????threadLatch.countDown();
????????}?catch?(Throwable?e)?{
????????????e.printStackTrace();
????????????dataSourceTransactionManager.rollback(transactionStatus);
????????}
????}
}
@Autowired
private?DataSourceTransactionManager?dataSourceTransactionManager;
@Autowired
private?TransactionDefinition?transactionDefinition;
@Autowired
private?StudentService?studentService;
/**
?*?對(duì)用戶而言,27s?任是一個(gè)較長(zhǎng)的時(shí)間,我們嘗試用多線程的方式來(lái)經(jīng)行修改操作看能否加快處理速度
?*?預(yù)計(jì)創(chuàng)建10個(gè)線程,每個(gè)線程進(jìn)行5000條數(shù)據(jù)修改操作
?*?耗時(shí)統(tǒng)計(jì)
?* 1 線程數(shù):1 ?????耗時(shí):25s
?* 2 線程數(shù):2 ?????耗時(shí):14s
?* 3 線程數(shù):5 ?????耗時(shí):15s
?* 4 線程數(shù):10?????耗時(shí):15s
?* 5 線程數(shù):100????耗時(shí):15s
?* 6 線程數(shù):200????耗時(shí):15s
?* 7 線程數(shù):500????耗時(shí):17s
?* 8 線程數(shù):1000????耗時(shí):19s
?* 8 線程數(shù):2000????耗時(shí):23s
?* 8 線程數(shù):5000????耗時(shí):29s
?*/
@Test
void?updateStudentWithThreads()?{
????//查詢總數(shù)據(jù)
????List?allStudents?=?studentMapper.getAll();
????//?線程數(shù)量
????final?Integer?threadCount?=?100;
????//每個(gè)線程處理的數(shù)據(jù)量
????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
????//?創(chuàng)建多線程處理任務(wù)
????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);
????for?(int?i?=?0;?i?????????//?每個(gè)線程處理的數(shù)據(jù)
????????List?threadDatas?=?allStudents.stream()
????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
????????studentThreadPool.execute(()?->?{
????????????studentService.updateStudents(threadDatas,?threadLatchs);
????????});
????}
????try?{
????????//?倒計(jì)時(shí)鎖設(shè)置超時(shí)時(shí)間?30s
????????threadLatchs.await(30,?TimeUnit.SECONDS);
????}?catch?(Throwable?e)?{
????????e.printStackTrace();
????}
????System.out.println("主線程完成");
}

基于兩個(gè) CountDownLatch 控制多線程事務(wù)提交
@Override
public?void?updateStudentsThread(List?students,?CountDownLatch?threadLatch,?CountDownLatch?mainLatch,?StudentTaskError?taskStatus)?{
????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
????System.out.println("子線程:"?+?Thread.currentThread().getName());
????try?{
????????students.forEach(s?->?{
????????????//?更新教師信息
????????????//?String?teacher?=?s.getTeacher();
????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????????s.setTeacher(newTeacher);
????????????studentMapper.update(s);
????????});
????}?catch?(Throwable?e)?{
????????taskStatus.setIsError();
????}?finally?{
????????threadLatch.countDown();?//?切換到主線程執(zhí)行
????}
????try?{
????????mainLatch.await();??//等待主線程執(zhí)行
????}?catch?(Throwable?e)?{
????????taskStatus.setIsError();
????}
????//?判斷是否有錯(cuò)誤,如有錯(cuò)誤?就回滾事務(wù)
????if?(taskStatus.getIsError())?{
????????dataSourceTransactionManager.rollback(transactionStatus);
????}?else?{
????????dataSourceTransactionManager.commit(transactionStatus);
????}
}
/**
?*?由于每個(gè)線程都是單獨(dú)的事務(wù),需要添加對(duì)線程事務(wù)的統(tǒng)一控制
?*?我們這邊使用兩個(gè)?CountDownLatch?對(duì)子線程的事務(wù)進(jìn)行控制
?*/
@Test
void?updateStudentWithThreadsAndTrans()?{
????//查詢總數(shù)據(jù)
????List?allStudents?=?studentMapper.getAll();
????//?線程數(shù)量
????final?Integer?threadCount?=?4;
????//每個(gè)線程處理的數(shù)據(jù)量
????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
????//?創(chuàng)建多線程處理任務(wù)
????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);?//?用于計(jì)算子線程提交數(shù)量
????CountDownLatch?mainLatch?=?new?CountDownLatch(1);?//?用于判斷主線程是否提交
????StudentTaskError?taskStatus?=?new?StudentTaskError();?//?用于判斷子線程任務(wù)是否有錯(cuò)誤
????for?(int?i?=?0;?i?????????//?每個(gè)線程處理的數(shù)據(jù)
????????List?threadDatas?=?allStudents.stream()
????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength)
????????????????.collect(Collectors.toList());
????????studentThreadPool.execute(()?->?{
????????????studentService.updateStudentsThread(threadDatas,?threadLatchs,?mainLatch,?taskStatus);
????????});
????}
????try?{
????????//?倒計(jì)時(shí)鎖設(shè)置超時(shí)時(shí)間?30s
????????boolean?await?=?threadLatchs.await(30,?TimeUnit.SECONDS);
????????if?(!await)?{?//?等待超時(shí),事務(wù)回滾
????????????taskStatus.setIsError();
????????}
????}?catch?(Throwable?e)?{
????????e.printStackTrace();
????????taskStatus.setIsError();
????}
????mainLatch.countDown();?//?切換到子線程執(zhí)行
????studentThreadPool.shutdown();?//關(guān)閉線程池
????System.out.println("主線程完成");
}
Exception?in?thread?"pool-1-thread-2"?org.springframework.transaction.CannotCreateTransactionException:?Could?not?open?JDBC?Connection?for?transaction;?nested?exception?is?java.sql.SQLTransientConnectionException:?HikariPool-1?-?Connection?is?not?available,?request?timed?out?after?30055ms.
?at?org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
?at?org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
?at?org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
?at?com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
?at?com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
?at?java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
?at?java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
?at?java.lang.Thread.run(Thread.java:748)
Caused?by:?java.sql.SQLTransientConnectionException:?HikariPool-1?-?Connection?is?not?available,?request?timed?out?after?30055ms.
?at?com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
?at?com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
?at?com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
?at?com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
?at?org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
?...?7?more
#?連接池中允許的最小連接數(shù)。缺省值:10
spring.datasource.hikari.minimum-idle=10
#?連接池中允許的最大連接數(shù)。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
#?自動(dòng)提交
spring.datasource.hikari.auto-commit=true
#?一個(gè)連接idle狀態(tài)的最大時(shí)長(zhǎng)(毫秒),超時(shí)則被釋放(retired),缺省:10分鐘
spring.datasource.hikari.idle-timeout=30000
#?一個(gè)連接的生命時(shí)長(zhǎng)(毫秒),超時(shí)而且沒(méi)被使用則被釋放(retired),缺省:30分鐘,建議設(shè)置比數(shù)據(jù)庫(kù)超時(shí)時(shí)長(zhǎng)少30秒
spring.datasource.hikari.max-lifetime=1800000
#?等待連接池分配連接的最大時(shí)長(zhǎng)(毫秒),超過(guò)這個(gè)時(shí)長(zhǎng)還沒(méi)可用的連接則發(fā)生SQLException,?缺省:30秒
再次執(zhí)行測(cè)試發(fā)現(xiàn)沒(méi)有報(bào)錯(cuò),修改線程數(shù)為 20 又執(zhí)行了一下,同樣執(zhí)行成功了。
基于 TransactionStatus 集合來(lái)控制多線程事務(wù)提交
@Service
public?class?StudentsTransactionThread?{
????@Autowired
????private?StudentMapper?studentMapper;
????@Autowired
????private?StudentService?studentService;
????@Autowired
????private?PlatformTransactionManager?transactionManager;
????List?transactionStatuses?=?Collections.synchronizedList(new?ArrayList ());
????@Transactional(propagation?=?Propagation.REQUIRED,?rollbackFor?=?{Exception.class})
????public?void?updateStudentWithThreadsAndTrans()?throws?InterruptedException?{
????????//查詢總數(shù)據(jù)
????????List?allStudents?=?studentMapper.getAll();
????????//?線程數(shù)量
????????final?Integer?threadCount?=?2;
????????//每個(gè)線程處理的數(shù)據(jù)量
????????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
????????//?創(chuàng)建多線程處理任務(wù)
????????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
????????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);
????????AtomicBoolean?isError?=?new?AtomicBoolean(false);
????????try?{
????????????for?(int?i?=?0;?i?????????????????//?每個(gè)線程處理的數(shù)據(jù)
????????????????List?threadDatas?=?allStudents.stream()
????????????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
????????????????studentThreadPool.execute(()?->?{
????????????????????try?{
????????????????????????try?{
????????????????????????????studentService.updateStudentsTransaction(transactionManager,?transactionStatuses,?threadDatas);
????????????????????????}?catch?(Throwable?e)?{
????????????????????????????e.printStackTrace();
????????????????????????????isError.set(true);
????????????????????????}finally?{
????????????????????????????threadLatchs.countDown();
????????????????????????}
????????????????????}?catch?(Exception?e)?{
????????????????????????e.printStackTrace();
????????????????????????isError.set(true);
????????????????????}
????????????????});
????????????}
????????????//?倒計(jì)時(shí)鎖設(shè)置超時(shí)時(shí)間?30s
????????????boolean?await?=?threadLatchs.await(30,?TimeUnit.SECONDS);
????????????//?判斷是否超時(shí)
????????????if?(!await)?{
????????????????isError.set(true);
????????????}
????????}?catch?(Throwable?e)?{
????????????e.printStackTrace();
????????????isError.set(true);
????????}
????????if?(!transactionStatuses.isEmpty())?{
????????????if?(isError.get())?{
????????????????transactionStatuses.forEach(s?->?transactionManager.rollback(s));
????????????}?else?{
????????????????transactionStatuses.forEach(s?->?transactionManager.commit(s));
????????????}
????????}
????????System.out.println("主線程完成");
????}
}
@Override
@Transactional(propagation?=?Propagation.REQUIRED,?rollbackFor?=?{Exception.class})
public?void?updateStudentsTransaction(PlatformTransactionManager?transactionManager,?List?transactionStatuses,?List ?students)?{
????//?使用這種方式將事務(wù)狀態(tài)都放在同一個(gè)事務(wù)里面
????DefaultTransactionDefinition?def?=?new?DefaultTransactionDefinition();
????def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);?//?事物隔離級(jí)別,開啟新事務(wù),這樣會(huì)比較安全些。
????TransactionStatus?status?=?transactionManager.getTransaction(def);?//?獲得事務(wù)狀態(tài)
????transactionStatuses.add(status);
????students.forEach(s?->?{
????????//?更新教師信息
????????//?String?teacher?=?s.getTeacher();
????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????s.setTeacher(newTeacher);
????????studentMapper.update(s);
????});
????System.out.println("子線程:"?+?Thread.currentThread().getName());
}
使用 union 連接多個(gè) select 實(shí)現(xiàn)批量 update
update?student,(
?(select??1?as?id,'teacher_A'?as?teacher)?union
?(select??2?as?id,'teacher_A'?as?teacher)?union
?(select??3?as?id,'teacher_A'?as?teacher)?union
?(select??4?as?id,'teacher_A'?as?teacher)
????/*?....more?data?...?*/
????)?as?new_teacher
set
?student.teacher=new_teacher.teacher
where
?student.id=new_teacher.id
總結(jié)
對(duì)于大批量數(shù)據(jù)庫(kù)操作,使用手動(dòng)事務(wù)提交可以很多程度上提高操作效率
多線程對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作時(shí),并非線程數(shù)越多操作時(shí)間越快,按上述示例大約在 2-5 個(gè)線程時(shí)操作時(shí)間最快。
對(duì)于多線程阻塞事務(wù)提交時(shí),線程數(shù)量不能過(guò)多
如果能有辦法實(shí)現(xiàn)批量更新那是最好
內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬(wàn)并發(fā)、消息隊(duì)列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper......等技術(shù)棧!
?戳閱讀原文領(lǐng)?。?/span>? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??朕已閱?

