Boom! Java multithreaded batch operations, and some people still skip transaction control

Source: https://c1n.cn/by3nt
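Foreword
The loop-based code
Code using a manual transaction
Trying multiple threads for the data update
Controlling multithreaded transaction commits with two CountDownLatch objects
Controlling multithreaded transaction commits with a collection of TransactionStatus
Batch update by joining multiple select statements with union
Summary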
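Foreword
The project setup is as follows:
Database: MySQL
Frameworks: SpringBoot, Mybatis
Language: Java8
https://gitee.com/john273766764/springboot-mybatis-threads
A business requirement at our company called for modifying up to about 50,000 rows at once, and neither batch nor asynchronous updates were supported. The only option was a for loop, but it took too long, so we looked for other solutions step by step. The details follow.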
The loop-based code
/**
 * Update 50,000 rows one at a time.
 * Elapsed: 2m27s, 1m54s
 */
@Test
void updateStudent() {
    List<Student> allStudents = studentMapper.getAll();
    allStudents.forEach(s -> {
        // Update the teacher field
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
}
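The loop took about 1 minute 54 seconds in total. The code has no manual transaction control, so it presumably runs with auto-commit and commits a transaction on every update, which is why it is slow. Next we add manual transaction control and see how the update performance changes.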
Code using a manual transaction
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
 * We want the whole update to complete as one unit, so we control the transaction manually.
 * Elapsed: 24s
 * The test shows that wrapping the updates in one transaction is clearly faster.
 */
@Test
void updateStudentWithTrans() {
    List<Student> allStudents = studentMapper.getAll();
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    try {
        allStudents.forEach(s -> {
            // Update the teacher field
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
        // One commit for the whole loop
        dataSourceTransactionManager.commit(transactionStatus);
    } catch (Throwable e) {
        // Undo every update if any of them fails
        dataSourceTransactionManager.rollback(transactionStatus);
        throw e;
    }
}
With manual transaction control the total time was about 24 seconds, roughly 5 times faster than the auto-commit version. For a large number of database writes in a loop, a manual transaction effectively improves throughput.
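To make the single commit visible without Spring, here is a minimal plain-JDBC sketch of the same idea. It assumes a `student` table with `id` and `teacher` columns and a `Connection` that the caller has already opened; the class `ManualTxSketch`, the method `updateTeachers`, and the SQL text are illustrative names, not taken from the project.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

/** Hypothetical helper: one transaction around many single-row updates. */
class ManualTxSketch {

    // Assumed schema: student(id, teacher)
    static final String UPDATE_SQL = "update student set teacher = ? where id = ?";

    /** Applies every id -> teacher entry inside one transaction and returns the rows updated. */
    static int updateTeachers(Connection conn, Map<Long, String> teacherById) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // stop committing after every statement
        int updated = 0;
        try (PreparedStatement ps = conn.prepareStatement(UPDATE_SQL)) {
            for (Map.Entry<Long, String> e : teacherById.entrySet()) {
                ps.setString(1, e.getValue());
                ps.setLong(2, e.getKey());
                updated += ps.executeUpdate();
            }
            conn.commit(); // a single commit for the whole loop
            return updated;
        } catch (SQLException | RuntimeException ex) {
            conn.rollback(); // undo everything if any row fails
            throw ex;
        } finally {
            conn.setAutoCommit(previousAutoCommit); // restore the caller's setting
        }
    }
}
```

The sketch only shows where the commit boundary moves; in the tests above the transaction manager handles the connection and the commit.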
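Trying multiple threads for the data update
Adding a manual transaction clearly improved performance, but the run is still long. Next we try submitting from multiple threads to see whether it gets any faster.
StudentServiceImpl.java: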
@Service
public class StudentServiceImpl implements StudentService {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Autowired
    private TransactionDefinition transactionDefinition;
    @Override
    public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        System.out.println("Worker thread: " + Thread.currentThread().getName());
        try {
            students.forEach(s -> {
                // Update the teacher field
                // String teacher = s.getTeacher();
                String newTeacher = "TNO_" + new Random().nextInt(100);
                s.setTeacher(newTeacher);
                studentMapper.update(s);
            });
            dataSourceTransactionManager.commit(transactionStatus);
        } catch (Throwable e) {
            e.printStackTrace();
            dataSourceTransactionManager.rollback(transactionStatus);
        } finally {
            // Count down on failure too, so the main thread does not wait for the full timeout
            threadLatch.countDown();
        }
    }
}
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Autowired
private StudentService studentService;
/**
 * For a user, 27s is still a long time, so we try multiple threads to see whether the update gets faster.
 * Plan: create 10 threads, each updating 5,000 rows.
 * Timing results:
 * Threads: 1       Elapsed: 25s
 * Threads: 2       Elapsed: 14s
 * Threads: 5       Elapsed: 15s
 * Threads: 10      Elapsed: 15s
 * Threads: 100     Elapsed: 15s
 * Threads: 200     Elapsed: 15s
 * Threads: 500     Elapsed: 17s
 * Threads: 1000    Elapsed: 19s
 * Threads: 2000    Elapsed: 23s
 * Threads: 5000    Elapsed: 29s
 */
@Test
void updateStudentWithThreads() {
    // Load all rows
    List<Student> allStudents = studentMapper.getAll();
    // Number of threads
    final Integer threadCount = 100;
    // Rows handled per thread (ceiling division)
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
    // Create the thread pool for the update tasks
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
        // Rows for this thread
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudents(threadDatas, threadLatchs);
        });
    }
    try {
        // Wait on the latch with a 30s timeout
        threadLatchs.await(30, TimeUnit.SECONDS);
    } catch (Throwable e) {
        e.printStackTrace();
    }
    studentThreadPool.shutdown(); // Shut down the thread pool
    System.out.println("Main thread finished");
}

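According to the timings listed above, adding threads does not keep making the update faster. In this setup the run was fastest at about 2-5 threads (the right thread count still has to be measured against the actual server configuration).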
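The test above slices the list with `stream().skip(...).limit(...)`, which walks the skipped prefix again for every thread. As an illustrative alternative (not the project's code), the same ceiling-division partitioning can be sketched with `List.subList`; the class `Partitioner` and the method `partition` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits a list into at most `parts` contiguous chunks. */
class Partitioner {

    static <T> List<List<T>> partition(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        // Ceiling division, the same formula as dataPartionLength in the test
        int chunkSize = (items.size() + parts - 1) / parts;
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each worker owns an independent list
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }
}
```

With 10 items and 4 parts the chunks have sizes 3, 3, 3 and 1. The helper can return fewer chunks than `parts` (10 items in 6 parts gives a chunk size of 2 and only 5 chunks), so a latch would have to be sized from the number of chunks actually produced.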
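Controlling multithreaded transaction commits with two CountDownLatch objects
With multithreaded submission each thread has its own separate transaction, so consistency across threads is not guaranteed. We therefore add transaction control across the threads, so that every thread commits only after all threads have finished their updates.
Here we use two CountDownLatch objects to coordinate transaction commits between the main thread and the worker threads, with a timeout of 30 seconds.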
@Override
public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    System.out.println("Worker thread: " + Thread.currentThread().getName());
    try {
        students.forEach(s -> {
            // Update the teacher field
            // String teacher = s.getTeacher();
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
    } catch (Throwable e) {
        taskStatus.setIsError();
    } finally {
        threadLatch.countDown(); // Tell the main thread this worker has finished its updates
    }
    try {
        mainLatch.await();  // Wait for the main thread's decision
    } catch (Throwable e) {
        taskStatus.setIsError();
    }
    // Roll back if any error was recorded, otherwise commit
    if (taskStatus.getIsError()) {
        dataSourceTransactionManager.rollback(transactionStatus);
    } else {
        dataSourceTransactionManager.commit(transactionStatus);
    }
}
/**
 * Each thread runs its own transaction, so the thread transactions need unified control.
 * Here we use two CountDownLatch objects to control the worker threads' transactions.
 */
@Test
void updateStudentWithThreadsAndTrans() {
    // Load all rows
    List<Student> allStudents = studentMapper.getAll();
    // Number of threads
    final Integer threadCount = 4;
    // Rows handled per thread (ceiling division)
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
    // Create the thread pool for the update tasks
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount); // Counts worker threads that have finished their updates
    CountDownLatch mainLatch = new CountDownLatch(1); // Signals that the main thread has made its decision
    StudentTaskError taskStatus = new StudentTaskError(); // Records whether any worker task failed
    for (int i = 0; i < threadCount; i++) {
        // Rows for this thread
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength)
                .collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
        });
    }
    try {
        // Wait on the latch with a 30s timeout
        boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
        if (!await) { // Timed out, so the transactions are rolled back
            taskStatus.setIsError();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        taskStatus.setIsError();
    }
    mainLatch.countDown(); // Release the worker threads to commit or roll back
    studentThreadPool.shutdown(); // Shut down the thread pool
    System.out.println("Main thread finished");
}
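Stripped of Spring and the database, the coordination above can be sketched with plain JDK classes. This is a simulation under stated assumptions: each worker's "transaction" is just a `Runnable`, and the class `TwoLatchSketch` with its `run` method is a hypothetical name. It only illustrates how the two latches and the shared error flag interact.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical simulation: workers "commit" only if every worker succeeded. */
class TwoLatchSketch {

    /** Runs one task per worker (at least one task) and returns each outcome, "commit" or "rollback". */
    static List<String> run(List<Runnable> tasks, long timeoutMillis) throws InterruptedException {
        CountDownLatch workersDone = new CountDownLatch(tasks.size()); // role of threadLatchs
        CountDownLatch decisionMade = new CountDownLatch(1);           // role of mainLatch
        AtomicBoolean failed = new AtomicBoolean(false);               // role of StudentTaskError
        List<String> outcomes = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        for (Runnable task : tasks) {
            pool.execute(() -> {
                try {
                    task.run(); // the not-yet-committed work
                } catch (Throwable e) {
                    failed.set(true);
                } finally {
                    workersDone.countDown(); // report to the main thread
                }
                try {
                    decisionMade.await(); // block until the main thread has decided
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failed.set(true);
                }
                outcomes.add(failed.get() ? "rollback" : "commit");
            });
        }
        if (!workersDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            failed.set(true); // a timeout counts as a failure
        }
        decisionMade.countDown(); // release every worker at once
        pool.shutdown();
        pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        return outcomes;
    }
}
```

If one task throws, every worker records "rollback", because no worker reads the flag until the main thread has seen all workers report in.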
When I went to test the effect of different thread counts again, I found that execution failed with an error once the thread count exceeded 10.
Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
    at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
    at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
    at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
    ... 7 more
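The error roughly means that no JDBC Connection could be opened for the transaction: the connection request timed out after 30s.
The first ten threads must wait for the main thread before they can commit, so they hold their connections without releasing them, and the threads that start later time out while trying to obtain a connection.
The error log shows that the source is HikariPool, so we reconfigure this connection pool and raise the maximum number of connections to 100.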
# Minimum number of connections allowed in the pool. Default: 10
spring.datasource.hikari.minimum-idle=10
# Maximum number of connections allowed in the pool. Default: 10
spring.datasource.hikari.maximum-pool-size=100
# Auto-commit
spring.datasource.hikari.auto-commit=true
# Maximum time (ms) a connection may stay idle before it is released (retired). Default: 10 minutes
spring.datasource.hikari.idle-timeout=30000
# Lifetime (ms) of a connection; once exceeded and not in use, it is released (retired). Default: 30 minutes. Recommended: 30 seconds shorter than the database timeout
spring.datasource.hikari.max-lifetime=1800000
# Maximum time (ms) to wait for the pool to hand out a connection; if none is available by then, an SQLException is thrown. Default: 30 seconds
Running the test again produced no error. I changed the thread count to 20 and ran it once more, and it also succeeded.
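The exhaustion described above can be reproduced without a database. In this sketch a `Semaphore` stands in for the connection pool, and each worker keeps its permit while it waits for the main thread, just as the worker threads keep their connections. The class `PoolExhaustionSketch`, its `run` method, and the short timeouts are assumptions chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation of workers that hold a pooled connection until the main thread decides. */
class PoolExhaustionSketch {

    /** Returns {workers that got a connection, workers that timed out waiting for one}. */
    static int[] run(int poolSize, int workers, long acquireTimeoutMillis) throws InterruptedException {
        Semaphore connections = new Semaphore(poolSize); // stands in for the connection pool
        CountDownLatch workersDone = new CountDownLatch(workers);
        CountDownLatch decisionMade = new CountDownLatch(1);
        AtomicInteger acquired = new AtomicInteger();
        AtomicInteger timedOut = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                boolean gotConnection = false;
                try {
                    gotConnection = connections.tryAcquire(acquireTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (gotConnection) {
                        acquired.incrementAndGet();
                    } else {
                        timedOut.incrementAndGet(); // the "Connection is not available" case
                    }
                    workersDone.countDown();
                    if (gotConnection) {
                        decisionMade.await(); // the connection stays checked out while waiting
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (gotConnection) {
                        connections.release();
                    }
                }
            });
        }
        workersDone.await();
        decisionMade.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { acquired.get(), timedOut.get() };
    }
}
```

With a pool of 10 and 12 workers, 10 workers obtain a permit and 2 time out; with a pool of 100 and 20 workers, nobody times out, which matches what the configuration change achieved.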
Controlling multithreaded transaction commits with a collection of TransactionStatus
@Service
public class StudentsTransactionThread {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private StudentService studentService;
    @Autowired
    private PlatformTransactionManager transactionManager;
    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void updateStudentWithThreadsAndTrans() throws InterruptedException {
        // Load all rows
        List<Student> allStudents = studentMapper.getAll();
        // Number of threads
        final Integer threadCount = 2;
        // Rows handled per thread (ceiling division)
        final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
        // Create the thread pool for the update tasks
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        AtomicBoolean isError = new AtomicBoolean(false);
        try {
            for (int i = 0; i < threadCount; i++) {
                // Rows for this thread
                List<Student> threadDatas = allStudents.stream()
                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                studentThreadPool.execute(() -> {
                    try {
                        studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                    } catch (Throwable e) {
                        e.printStackTrace();
                        isError.set(true);
                    } finally {
                        threadLatchs.countDown();
                    }
                });
            }
            // Wait on the latch with a 30s timeout
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            // Treat a timeout as an error
            if (!await) {
                isError.set(true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            isError.set(true);
        }
        // Commit or roll back every collected transaction together
        if (!transactionStatuses.isEmpty()) {
            if (isError.get()) {
                transactionStatuses.forEach(s -> transactionManager.rollback(s));
            } else {
                transactionStatuses.forEach(s -> transactionManager.commit(s));
            }
        }
        System.out.println("Main thread finished");
    }
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
    // Collect every transaction status in one shared list
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // Propagation behavior: always start a new transaction, which is a bit safer
    TransactionStatus status = transactionManager.getTransaction(def); // Obtain the transaction status
    transactionStatuses.add(status);
    students.forEach(s -> {
        // Update the teacher field
        // String teacher = s.getTeacher();
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
    System.out.println("Worker thread: " + Thread.currentThread().getName());
}
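This approach is the same as the previous one in that the transactions are committed only after all threads have finished, so the connections from the JDBC connection pool stay occupied in the meantime. If the number of threads exceeds the pool's maximum size, connection timeouts occur, so the thread count still has to be controlled.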
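One way to keep the thread count under control is to derive it from the pool size instead of hard-coding it. The helper below is a minimal sketch with hypothetical names (`ThreadBudget`, `workerCount`); the number of connections to keep in reserve is an assumption the caller has to choose.

```java
/** Hypothetical helper for sizing the worker pool against the connection pool. */
class ThreadBudget {

    /**
     * Caps the requested worker count so that every worker can hold a connection
     * while `reservedConnections` stay free for other callers (for example the main thread).
     */
    static int workerCount(int requestedThreads, int maxPoolSize, int reservedConnections) {
        int available = maxPoolSize - reservedConnections;
        if (requestedThreads <= 0 || available <= 0) {
            throw new IllegalArgumentException("requestedThreads and the free connection budget must be positive");
        }
        return Math.min(requestedThreads, available);
    }
}
```

For example, asking for 20 threads against a pool of 10 with 1 connection reserved yields 9 workers, while asking for 4 threads against a pool of 100 leaves the request unchanged.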
Batch update by joining multiple select statements with union
In some cases batch update is not supported but inserting multiple rows is. You can then try assembling the rows to update into multiple select statements, joining them with union, and running an update joined against that derived data.
update student, (
    (select 1 as id, 'teacher_A' as teacher) union
    (select 2 as id, 'teacher_A' as teacher) union
    (select 3 as id, 'teacher_A' as teacher) union
    (select 4 as id, 'teacher_A' as teacher)
    /* ....more data ... */
    ) as new_teacher
set
    student.teacher = new_teacher.teacher
where
    student.id = new_teacher.id
This approach achieves a batch update even when allowMultiQueries=true is not configured for MySQL.
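Assembling that statement by hand for thousands of rows is error-prone, so here is a small sketch that generates it. It keeps the statement shape shown above but uses `?` placeholders instead of inlined literals, which is my substitution rather than the article's; the class `UnionUpdateSketch` and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical builder for the union-based batch update. */
class UnionUpdateSketch {

    /** Builds the statement text with one "(select ? as id, ? as teacher)" per row. */
    static String buildSql(int rowCount) {
        if (rowCount <= 0) {
            throw new IllegalArgumentException("rowCount must be positive");
        }
        StringBuilder sql = new StringBuilder("update student, (");
        for (int i = 0; i < rowCount; i++) {
            if (i > 0) {
                sql.append(" union ");
            }
            sql.append("(select ? as id, ? as teacher)");
        }
        sql.append(") as new_teacher set student.teacher = new_teacher.teacher")
           .append(" where student.id = new_teacher.id");
        return sql.toString();
    }

    /** Flattens id -> teacher pairs into the parameter order expected by buildSql. */
    static List<Object> buildParams(Map<Long, String> teacherById) {
        List<Object> params = new ArrayList<>();
        teacherById.forEach((id, teacher) -> {
            params.add(id);      // fills "? as id"
            params.add(teacher); // fills "? as teacher"
        });
        return params;
    }
}
```

The generated text and the parameter list would be handed to a prepared statement in that order. Very large batches would probably need to be split into several statements, since the statement text grows with every row.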
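Summary
In short:
For large batches of database operations, committing with a manual transaction greatly improves efficiency.
With multithreaded database operations, more threads do not always mean a shorter run; in the example above the run was fastest at about 2-5 threads.
When worker threads block before committing their transactions, the thread count must not be too large.
If there is a way to do a true batch update, that is the best option.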
(End)
