炸鍋了,Java多線程批量操作,居然有人不做事務控制?
答應我, 不要再用 if (obj != null) 判空了 20個示例!詳解 Java8 Stream 用法,從此告別shi山(垃圾代碼) 利用Java8新特征,重構傳統(tǒng)設計模式,你學會了嗎? 竟然有一半的人不知道 for 與 foreach 的區(qū)別??? 利用多線程批量拆分 List 導入數據庫,效率杠杠的!
來源:blog.csdn.net/qq273766764/article/
details/119972911
項目代碼基于:MySql 數據,開發(fā)框架為:SpringBoot、Mybatis
開發(fā)語言為:Java8
項目代碼:https://gitee.com/john273766764/springboot-mybatis-threads
文章目錄
前言 循環(huán)操作的代碼 使用手動事務的操作代碼 嘗試多線程進行數據修改 基于兩個CountDownLatch控制多線程事務提交 基于TransactionStatus集合來控制多線程事務提交 使用union連接多個select實現(xiàn)批量update 總結
前言
公司業(yè)務中遇到一個需求,需要同時修改最多約5萬條數據,而且還不支持批量或異步修改操作。于是只能寫個for循環(huán)操作,但操作耗時太長,只能一步一步尋找其他解決方案。
具體操作如下:
一、循環(huán)操作的代碼
先寫一個最簡單的for循環(huán)代碼,看看耗時情況怎么樣。
/***
?*?一條一條依次對50000條數據進行更新操作
?*?耗時:2m27s,1m54s
?*/
@Test
void?updateStudent()?{
????List?allStudents?=?studentMapper.getAll();
????allStudents.forEach(s?->?{
????????//更新教師信息
????????String?teacher?=?s.getTeacher();
????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????s.setTeacher(newTeacher);
????????studentMapper.update(s);
????});
}
循環(huán)修改整體耗時約 1分54秒,且代碼中沒有手動事務控制應該是自動事務提交,所以每次操作事務都會提交所以操作比較慢,我們先對代碼中添加手動事務控制,看查詢效率怎樣。
二、使用手動事務的操作代碼
修改后的代碼如下:
@Autowired
private?DataSourceTransactionManager?dataSourceTransactionManager;
@Autowired
private?TransactionDefinition?transactionDefinition;
/**
?*?由于希望更新操作?一次性完成,需要手動控制添加事務
?*?耗時:24s
?*?從測試結果可以看出,添加事務后插入數據的效率有明顯的提升
?*/
@Test
void?updateStudentWithTrans()?{
????List?allStudents?=?studentMapper.getAll();
????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
????try?{
????????allStudents.forEach(s?->?{
????????????//更新教師信息
????????????String?teacher?=?s.getTeacher();
????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????????s.setTeacher(newTeacher);
????????????studentMapper.update(s);
????????});
????????dataSourceTransactionManager.commit(transactionStatus);
????}?catch?(Throwable?e)?{
????????dataSourceTransactionManager.rollback(transactionStatus);
????????throw?e;
????}
}
添加手動事務操控制后,整體耗時約 24秒,這相對于自動事務提交的代碼,快了約5倍,對于大量循環(huán)數據庫提交操作,添加手動事務可以有效提高操作效率。
三、嘗試多線程進行數據修改
添加數據庫手動事務后操作效率有明細提高,但還是比較長,接下來嘗試多線程提交看是不是能夠再快一些。
先添加一個Service將批量修改操作整合一下,具體代碼如下:
StudentServiceImpl.java
@Service
public?class?StudentServiceImpl?implements?StudentService?{
????@Autowired
????private?StudentMapper?studentMapper;
?
????@Autowired
????private?DataSourceTransactionManager?dataSourceTransactionManager;
?
????@Autowired
????private?TransactionDefinition?transactionDefinition;
?
????@Override
????public?void?updateStudents(List?students,?CountDownLatch?threadLatch) ?{
????????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
????????System.out.println("子線程:"?+?Thread.currentThread().getName());
????????try?{
????????????students.forEach(s?->?{
????????????????//?更新教師信息
????????????????//?String?teacher?=?s.getTeacher();
????????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????????????s.setTeacher(newTeacher);
????????????????studentMapper.update(s);
????????????});
????????????dataSourceTransactionManager.commit(transactionStatus);
????????????threadLatch.countDown();
????????}?catch?(Throwable?e)?{
????????????e.printStackTrace();
????????????dataSourceTransactionManager.rollback(transactionStatus);
????????}
????}
}
批量測試代碼,我們采用了多線程進行提交,修改后測試代碼如下:
@Autowired
private?DataSourceTransactionManager?dataSourceTransactionManager;
@Autowired
private?TransactionDefinition?transactionDefinition;
@Autowired
private?StudentService?studentService;
/**
?*?對用戶而言,27s?任是一個較長的時間,我們嘗試用多線程的方式來經行修改操作看能否加快處理速度
?*?預計創(chuàng)建10個線程,每個線程進行5000條數據修改操作
?*?耗時統(tǒng)計
?* 1 線程數:1 ?????耗時:25s
?* 2 線程數:2 ?????耗時:14s
?* 3 線程數:5 ?????耗時:15s
?* 4 線程數:10?????耗時:15s
?* 5 線程數:100????耗時:15s
?* 6 線程數:200????耗時:15s
?* 7 線程數:500????耗時:17s
?* 8 線程數:1000????耗時:19s
?* 8 線程數:2000????耗時:23s
?* 8 線程數:5000????耗時:29s
?*/
@Test
void?updateStudentWithThreads()?{
????//查詢總數據
????List?allStudents?=?studentMapper.getAll();
????//?線程數量
????final?Integer?threadCount?=?100;
????//每個線程處理的數據量
????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
????//?創(chuàng)建多線程處理任務
????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);
????for?(int?i?=?0;?i?????????//?每個線程處理的數據
????????List?threadDatas?=?allStudents.stream()
????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
????????studentThreadPool.execute(()?->?{
????????????studentService.updateStudents(threadDatas,?threadLatchs);
????????});
????}
????try?{
????????//?倒計時鎖設置超時時間?30s
????????threadLatchs.await(30,?TimeUnit.SECONDS);
????}?catch?(Throwable?e)?{
????????e.printStackTrace();
????}
????System.out.println("主線程完成");
}
多線程提交修改時,我們嘗試了不同線程數對提交速度的影響,具體可以看下面表格,

根據表格,我們線程數增大提交速度并非一直增大,在當前情況下約在2-5個線程數時,提交速度最快(實際線程數還是需要根據服務器配置實際測試)。
四、基于兩個CountDownLatch控制多線程事務提交
由于多線程提交時,每個線程事務時單獨的,無法保證一致性,我們嘗試給多線程添加事務控制,來保證每個線程都是在插入數據完成后在提交事務,
這里我們使用兩個 CountDownLatch 來控制主線程與子線程事務提交,并設置了超時時間為 30 秒。我們對代碼進行了一點修改:
@Override
public?void?updateStudentsThread(List?students,?CountDownLatch?threadLatch,?CountDownLatch?mainLatch,?StudentTaskError?taskStatus) ?{
????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
????System.out.println("子線程:"?+?Thread.currentThread().getName());
????try?{
????????students.forEach(s?->?{
????????????//?更新教師信息
????????????//?String?teacher?=?s.getTeacher();
????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????????s.setTeacher(newTeacher);
????????????studentMapper.update(s);
????????});
????}?catch?(Throwable?e)?{
????????taskStatus.setIsError();
????}?finally?{
????????threadLatch.countDown();?//?切換到主線程執(zhí)行
????}
????try?{
????????mainLatch.await();??//等待主線程執(zhí)行
????}?catch?(Throwable?e)?{
????????taskStatus.setIsError();
????}
????//?判斷是否有錯誤,如有錯誤?就回滾事務
????if?(taskStatus.getIsError())?{
????????dataSourceTransactionManager.rollback(transactionStatus);
????}?else?{
????????dataSourceTransactionManager.commit(transactionStatus);
????}
}
/**
?*?由于每個線程都是單獨的事務,需要添加對線程事務的統(tǒng)一控制
?*?我們這邊使用兩個?CountDownLatch?對子線程的事務進行控制
?*/
@Test
void?updateStudentWithThreadsAndTrans()?{
????//查詢總數據
????List?allStudents?=?studentMapper.getAll();
????//?線程數量
????final?Integer?threadCount?=?4;
????//每個線程處理的數據量
????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
????//?創(chuàng)建多線程處理任務
????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);?//?用于計算子線程提交數量
????CountDownLatch?mainLatch?=?new?CountDownLatch(1);?//?用于判斷主線程是否提交
????StudentTaskError?taskStatus?=?new?StudentTaskError();?//?用于判斷子線程任務是否有錯誤
????for?(int?i?=?0;?i?????????//?每個線程處理的數據
????????List?threadDatas?=?allStudents.stream()
????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength)
????????????????.collect(Collectors.toList());
????????studentThreadPool.execute(()?->?{
????????????studentService.updateStudentsThread(threadDatas,?threadLatchs,?mainLatch,?taskStatus);
????????});
????}
????try?{
????????//?倒計時鎖設置超時時間?30s
????????boolean?await?=?threadLatchs.await(30,?TimeUnit.SECONDS);
????????if?(!await)?{?//?等待超時,事務回滾
????????????taskStatus.setIsError();
????????}
????}?catch?(Throwable?e)?{
????????e.printStackTrace();
????????taskStatus.setIsError();
????}
????mainLatch.countDown();?//?切換到子線程執(zhí)行
????studentThreadPool.shutdown();?//關閉線程池
????System.out.println("主線程完成");
}
本想再次測試一下不同線程數對執(zhí)行效率的影響時,發(fā)現(xiàn)當線程數超過10個時,執(zhí)行時就報錯。具體錯誤內容如下:
Exception?in?thread?"pool-1-thread-2"?org.springframework.transaction.CannotCreateTransactionException:?Could?not?open?JDBC?Connection?for?transaction;?nested?exception?is?java.sql.SQLTransientConnectionException:?HikariPool-1?-?Connection?is?not?available,?request?timed?out?after?30055ms.
?at?org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
?at?org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
?at?org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
?at?com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
?at?com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
?at?java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
?at?java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
?at?java.lang.Thread.run(Thread.java:748)
Caused?by:?java.sql.SQLTransientConnectionException:?HikariPool-1?-?Connection?is?not?available,?request?timed?out?after?30055ms.
?at?com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
?at?com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
?at?com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
?at?com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
?at?org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
?...?7?more
錯誤的大致意思時,不能為數據庫事務打開 jdbc Connection,連接在30s的時候超時了。由于前面啟動的十個線程需要等待主線程完成后才能提交,所以一直占用連接未釋放,造成后面的進程創(chuàng)建連接超時。
看錯誤日志中錯誤的來源是 HikariPool ,我們來重新配置一下這個連接池的參數,將最大連接數修改為100,具體配置如下:
#?連接池中允許的最小連接數。缺省值:10
spring.datasource.hikari.minimum-idle=10
#?連接池中允許的最大連接數。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
#?自動提交
spring.datasource.hikari.auto-commit=true
#?一個連接idle狀態(tài)的最大時長(毫秒),超時則被釋放(retired),缺省:10分鐘
spring.datasource.hikari.idle-timeout=30000
#?一個連接的生命時長(毫秒),超時而且沒被使用則被釋放(retired),缺省:30分鐘,建議設置比數據庫超時時長少30秒
spring.datasource.hikari.max-lifetime=1800000
#?等待連接池分配連接的最大時長(毫秒),超過這個時長還沒可用的連接則發(fā)生SQLException,?缺省:30秒
再次執(zhí)行測試發(fā)現(xiàn)沒有報錯,修改線程數為20又執(zhí)行了一下,同樣執(zhí)行成功了。
五、基于TransactionStatus集合來控制多線程事務提交
在同事推薦下我們使用事務集合來進行多線程事務控制,主要代碼如下
@Service
public?class?StudentsTransactionThread?{
?
????@Autowired
????private?StudentMapper?studentMapper;
????@Autowired
????private?StudentService?studentService;
????@Autowired
????private?PlatformTransactionManager?transactionManager;
?
????List?transactionStatuses?=?Collections.synchronizedList(new?ArrayList());
?
????@Transactional(propagation?=?Propagation.REQUIRED,?rollbackFor?=?{Exception.class})
????public?void?updateStudentWithThreadsAndTrans()?throws?InterruptedException?{
?
????????//查詢總數據
????????List?allStudents?=?studentMapper.getAll();
?
????????//?線程數量
????????final?Integer?threadCount?=?2;
?
????????//每個線程處理的數據量
????????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
?
????????//?創(chuàng)建多線程處理任務
????????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
????????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);
????????AtomicBoolean?isError?=?new?AtomicBoolean(false);
????????try?{
????????????for?(int?i?=?0;?i?????????????????//?每個線程處理的數據
????????????????List?threadDatas?=?allStudents.stream()
????????????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
????????????????studentThreadPool.execute(()?->?{
????????????????????try?{
????????????????????????try?{
????????????????????????????studentService.updateStudentsTransaction(transactionManager,?transactionStatuses,?threadDatas);
????????????????????????}?catch?(Throwable?e)?{
????????????????????????????e.printStackTrace();
????????????????????????????isError.set(true);
????????????????????????}finally?{
????????????????????????????threadLatchs.countDown();
????????????????????????}
????????????????????}?catch?(Exception?e)?{
????????????????????????e.printStackTrace();
????????????????????????isError.set(true);
????????????????????}
????????????????});
????????????}
?
????????????//?倒計時鎖設置超時時間?30s
????????????boolean?await?=?threadLatchs.await(30,?TimeUnit.SECONDS);
????????????//?判斷是否超時
????????????if?(!await)?{
????????????????isError.set(true);
????????????}
????????}?catch?(Throwable?e)?{
????????????e.printStackTrace();
????????????isError.set(true);
????????}
?
????????if?(!transactionStatuses.isEmpty())?{
????????????if?(isError.get())?{
????????????????transactionStatuses.forEach(s?->?transactionManager.rollback(s));
????????????}?else?{
????????????????transactionStatuses.forEach(s?->?transactionManager.commit(s));
????????????}
????????}
?
????????System.out.println("主線程完成");
????}
}
@Override
@Transactional(propagation?=?Propagation.REQUIRED,?rollbackFor?=?{Exception.class})
public?void?updateStudentsTransaction(PlatformTransactionManager?transactionManager,?List<TransactionStatus>?transactionStatuses,?List<Student>?students)?{
????//?使用這種方式將事務狀態(tài)都放在同一個事務里面
????DefaultTransactionDefinition?def?=?new?DefaultTransactionDefinition();
????def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);?//?事物隔離級別,開啟新事務,這樣會比較安全些。
????TransactionStatus?status?=?transactionManager.getTransaction(def);?//?獲得事務狀態(tài)
????transactionStatuses.add(status);
????students.forEach(s?->?{
????????//?更新教師信息
????????//?String?teacher?=?s.getTeacher();
????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
????????s.setTeacher(newTeacher);
????????studentMapper.update(s);
????});
????System.out.println("子線程:"?+?Thread.currentThread().getName());
}
由于這個中方式去前面方式相同,需要等待線程執(zhí)行完成后才會提交事務,所有任會占用Jdbc連接池,如果線程數量超過連接池最大數量會產生連接超時。所以在使用過程中任要控制線程數量,
六、使用union連接多個select實現(xiàn)批量update
有些情況寫不支持,批量update,但支持insert 多條數據,這個時候可嘗試將需要更新的數據拼接成多條select 語句,然后使用union 連接起來,再使用update 關聯(lián)這個數據進行update,具體代碼演示如下:
update?student,(
?(select??1?as?id,'teacher_A'?as?teacher)?union
?(select??2?as?id,'teacher_A'?as?teacher)?union
?(select??3?as?id,'teacher_A'?as?teacher)?union
?(select??4?as?id,'teacher_A'?as?teacher)
????/*?....more?data?...?*/
????)?as?new_teacher
set
?student.teacher=new_teacher.teacher
where
?student.id=new_teacher.id
這種方式在Mysql 數據庫沒有配置 allowMultiQueries=true 也可以實現(xiàn)批量更新。
總結
對于大批量數據庫操作,使用手動事務提交可以很多程度上提高操作效率 多線程對數據庫進行操作時,并非線程數越多操作時間越快,按上述示例大約在2-5個線程時操作時間最快。 對于多線程阻塞事務提交時,線程數量不能過多。 如果能有辦法實現(xiàn)批量更新那是最好
最后,再給大家推薦一個GitHub項目,該項目整理了上千本常用技術PDF,技術書籍都可以在這里找到。 GitHub地址:https://github.com/hello-go-maker/cs-books 電子書已經更新好了,拿走不謝,記得點一個star,持續(xù)更新中...

