面試官:Java 多線程怎么做事務(wù)控制?一半人答不上來。。
點擊關(guān)注公眾號,Java干貨及時送達
推薦閱讀:Spring Cloud Alibaba 殺瘋了。。
項目代碼基于:MySql 數(shù)據(jù),開發(fā)框架為:SpringBoot、Mybatis
開發(fā)語言為:Java8
前言
公司業(yè)務(wù)中遇到一個需求,需要同時修改最多約5萬條數(shù)據(jù),而且還不支持批量或異步修改操作。于是只能寫個for循環(huán)操作,但操作耗時太長,只能一步一步尋找其他解決方案。
具體操作如下:
一、循環(huán)操作的代碼
先寫一個最簡單的for循環(huán)代碼,看看耗時情況怎么樣。
/***
* 一條一條依次對50000條數(shù)據(jù)進行更新操作
* 耗時:2m27s,1m54s
*/
@Test
void updateStudent() {
List<Student> allStudents = studentMapper.getAll();
allStudents.forEach(s -> {
//更新教師信息
String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
}
循環(huán)修改整體耗時約 1分54秒,且代碼中沒有手動事務(wù)控制應(yīng)該是自動事務(wù)提交,所以每次操作事務(wù)都會提交所以操作比較慢,我們先對代碼中添加手動事務(wù)控制,看查詢效率怎樣。
最新面試題整理:https://www.javastack.cn/mst/
二、使用手動事務(wù)的操作代碼
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
* 由于希望更新操作 一次性完成,需要手動控制添加事務(wù)
* 耗時:24s
* 從測試結(jié)果可以看出,添加事務(wù)后插入數(shù)據(jù)的效率有明顯的提升
*/
@Test
void updateStudentWithTrans() {
List<Student> allStudents = studentMapper.getAll();
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
allStudents.forEach(s -> {
//更新教師信息
String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
dataSourceTransactionManager.commit(transactionStatus);
} catch (Throwable e) {
dataSourceTransactionManager.rollback(transactionStatus);
throw e;
}
}
添加手動事務(wù)操控制后,整體耗時約 24秒,這相對于自動事務(wù)提交的代碼,快了約5倍,對于大量循環(huán)數(shù)據(jù)庫提交操作,添加手動事務(wù)可以有效提高操作效率。最新面試題整理好了,大家可以在Java面試庫小程序在線刷題。
三、嘗試多線程進行數(shù)據(jù)修改
添加數(shù)據(jù)庫手動事務(wù)后操作效率有明細提高,但還是比較長,接下來嘗試多線程提交看是不是能夠再快一些。
先添加一個Service將批量修改操作整合一下,具體代碼如下:
StudentServiceImpl.java
@Service
public class StudentServiceImpl implements StudentService {
@Autowired
private StudentMapper studentMapper;
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Override
public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
System.out.println("子線程:" + Thread.currentThread().getName());
try {
students.forEach(s -> {
// 更新教師信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
dataSourceTransactionManager.commit(transactionStatus);
threadLatch.countDown();
} catch (Throwable e) {
e.printStackTrace();
dataSourceTransactionManager.rollback(transactionStatus);
}
}
}
批量測試代碼,我們采用了多線程進行提交,修改后測試代碼如下:
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Autowired
private StudentService studentService;
/**
* 對用戶而言,27s 任是一個較長的時間,我們嘗試用多線程的方式來經(jīng)行修改操作看能否加快處理速度
* 預(yù)計創(chuàng)建10個線程,每個線程進行5000條數(shù)據(jù)修改操作
* 耗時統(tǒng)計
* 1 線程數(shù):1 耗時:25s
* 2 線程數(shù):2 耗時:14s
* 3 線程數(shù):5 耗時:15s
* 4 線程數(shù):10 耗時:15s
* 5 線程數(shù):100 耗時:15s
* 6 線程數(shù):200 耗時:15s
* 7 線程數(shù):500 耗時:17s
* 8 線程數(shù):1000 耗時:19s
* 8 線程數(shù):2000 耗時:23s
* 8 線程數(shù):5000 耗時:29s
*/
@Test
void updateStudentWithThreads() {
//查詢總數(shù)據(jù)
List<Student> allStudents = studentMapper.getAll();
// 線程數(shù)量
final Integer threadCount = 100;
//每個線程處理的數(shù)據(jù)量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 創(chuàng)建多線程處理任務(wù)
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
// 每個線程處理的數(shù)據(jù)
List<Student> threadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {
studentService.updateStudents(threadDatas, threadLatchs);
});
}
try {
// 倒計時鎖設(shè)置超時時間 30s
threadLatchs.await(30, TimeUnit.SECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("主線程完成");
}
Spring Boot 基礎(chǔ)就不介紹了,推薦下這個實戰(zhàn)教程:https://github.com/javastacks/spring-boot-best-practice
多線程提交修改時,我們嘗試了不同線程數(shù)對提交速度的影響,具體可以看下面表格,
多線程修改50000條數(shù)據(jù)時 不同線程數(shù)耗時對比(秒)
根據(jù)表格,我們線程數(shù)增大提交速度并非一直增大,在當前情況下約在2-5個線程數(shù)時,提交速度最快(實際線程數(shù)還是需要根據(jù)服務(wù)器配置實際測試)。
另外,MySQL 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺發(fā)送:面試,可以在線閱讀。
四、基于兩個CountDownLatch控制多線程事務(wù)提交
由于多線程提交時,每個線程事務(wù)時單獨的,無法保證一致性,我們嘗試給多線程添加事務(wù)控制,來保證每個線程都是在插入數(shù)據(jù)完成后在提交事務(wù),
這里我們使用兩個 CountDownLatch 來控制主線程與子線程事務(wù)提交,并設(shè)置了超時時間為 30 秒。我們對代碼進行了一點修改:
@Override
public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
System.out.println("子線程:" + Thread.currentThread().getName());
try {
students.forEach(s -> {
// 更新教師信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
} catch (Throwable e) {
taskStatus.setIsError();
} finally {
threadLatch.countDown(); // 切換到主線程執(zhí)行
}
try {
mainLatch.await(); //等待主線程執(zhí)行
} catch (Throwable e) {
taskStatus.setIsError();
}
// 判斷是否有錯誤,如有錯誤 就回滾事務(wù)
if (taskStatus.getIsError()) {
dataSourceTransactionManager.rollback(transactionStatus);
} else {
dataSourceTransactionManager.commit(transactionStatus);
}
}
/**
* 由于每個線程都是單獨的事務(wù),需要添加對線程事務(wù)的統(tǒng)一控制
* 我們這邊使用兩個 CountDownLatch 對子線程的事務(wù)進行控制
*/
@Test
void updateStudentWithThreadsAndTrans() {
//查詢總數(shù)據(jù)
List<Student> allStudents = studentMapper.getAll();
// 線程數(shù)量
final Integer threadCount = 4;
//每個線程處理的數(shù)據(jù)量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 創(chuàng)建多線程處理任務(wù)
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于計算子線程提交數(shù)量
CountDownLatch mainLatch = new CountDownLatch(1); // 用于判斷主線程是否提交
StudentTaskError taskStatus = new StudentTaskError(); // 用于判斷子線程任務(wù)是否有錯誤
for (int i = 0; i < threadCount; i++) {
// 每個線程處理的數(shù)據(jù)
List<Student> threadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength)
.collect(Collectors.toList());
studentThreadPool.execute(() -> {
studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
});
}
try {
// 倒計時鎖設(shè)置超時時間 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
if (!await) { // 等待超時,事務(wù)回滾
taskStatus.setIsError();
}
} catch (Throwable e) {
e.printStackTrace();
taskStatus.setIsError();
}
mainLatch.countDown(); // 切換到子線程執(zhí)行
studentThreadPool.shutdown(); //關(guān)閉線程池
System.out.println("主線程完成");
}
本想再次測試一下不同線程數(shù)對執(zhí)行效率的影響時,發(fā)現(xiàn)當線程數(shù)超過10個時,執(zhí)行時就報錯。具體錯誤內(nèi)容如下:
Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
... 7 more
錯誤的大致意思時,不能為數(shù)據(jù)庫事務(wù)打開 jdbc Connection,連接在30s的時候超時了。由于前面啟動的十個線程需要等待主線程完成后才能提交,所以一直占用連接未釋放,造成后面的進程創(chuàng)建連接超時。
看錯誤日志中錯誤的來源是 HikariPool ,我們來重新配置一下這個連接池的參數(shù),將最大連接數(shù)修改為100,具體配置如下:
# 連接池中允許的最小連接數(shù)。缺省值:10
spring.datasource.hikari.minimum-idle=10
# 連接池中允許的最大連接數(shù)。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
# 自動提交
spring.datasource.hikari.auto-commit=true
# 一個連接idle狀態(tài)的最大時長(毫秒),超時則被釋放(retired),缺省:10分鐘
spring.datasource.hikari.idle-timeout=30000
# 一個連接的生命時長(毫秒),超時而且沒被使用則被釋放(retired),缺省:30分鐘,建議設(shè)置比數(shù)據(jù)庫超時時長少30秒
spring.datasource.hikari.max-lifetime=1800000
# 等待連接池分配連接的最大時長(毫秒),超過這個時長還沒可用的連接則發(fā)生SQLException, 缺省:30秒
再次執(zhí)行測試發(fā)現(xiàn)沒有報錯,修改線程數(shù)為20又執(zhí)行了一下,同樣執(zhí)行成功了。最新 MySQL 面試題整理好了,大家可以在Java面試庫小程序在線刷題。
五、基于TransactionStatus集合來控制多線程事務(wù)提交
在同事推薦下我們使用事務(wù)集合來進行多線程事務(wù)控制,主要代碼如下
@Service
public class StudentsTransactionThread {
@Autowired
private StudentMapper studentMapper;
@Autowired
private StudentService studentService;
@Autowired
private PlatformTransactionManager transactionManager;
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentWithThreadsAndTrans() throws InterruptedException {
//查詢總數(shù)據(jù)
List<Student> allStudents = studentMapper.getAll();
// 線程數(shù)量
final Integer threadCount = 2;
//每個線程處理的數(shù)據(jù)量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 創(chuàng)建多線程處理任務(wù)
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
AtomicBoolean isError = new AtomicBoolean(false);
try {
for (int i = 0; i < threadCount; i++) {
// 每個線程處理的數(shù)據(jù)
List<Student> threadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {
try {
try {
studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
}finally {
threadLatchs.countDown();
}
} catch (Exception e) {
e.printStackTrace();
isError.set(true);
}
});
}
// 倒計時鎖設(shè)置超時時間 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
// 判斷是否超時
if (!await) {
isError.set(true);
}
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
}
if (!transactionStatuses.isEmpty()) {
if (isError.get()) {
transactionStatuses.forEach(s -> transactionManager.rollback(s));
} else {
transactionStatuses.forEach(s -> transactionManager.commit(s));
}
}
System.out.println("主線程完成");
}
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
// 使用這種方式將事務(wù)狀態(tài)都放在同一個事務(wù)里面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
TransactionStatus status = transactionManager.getTransaction(def); // 獲得事務(wù)狀態(tài)
transactionStatuses.add(status);
students.forEach(s -> {
// 更新教師信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
System.out.println("子線程:" + Thread.currentThread().getName());
}
由于這個中方式去前面方式相同,需要等待線程執(zhí)行完成后才會提交事務(wù),所有任會占用Jdbc連接池,如果線程數(shù)量超過連接池最大數(shù)量會產(chǎn)生連接超時。所以在使用過程中任要控制線程數(shù)量,
六、使用union連接多個select實現(xiàn)批量update
有些情況寫不支持,批量update,但支持insert 多條數(shù)據(jù),這個時候可嘗試將需要更新的數(shù)據(jù)拼接成多條select 語句,然后使用union 連接起來,再使用update 關(guān)聯(lián)這個數(shù)據(jù)進行update,具體代碼演示如下:
update student,(
(select 1 as id,'teacher_A' as teacher) union
(select 2 as id,'teacher_A' as teacher) union
(select 3 as id,'teacher_A' as teacher) union
(select 4 as id,'teacher_A' as teacher)
/* ....more data ... */
) as new_teacher
set
student.teacher=new_teacher.teacher
where
student.id=new_teacher.id
這種方式在Mysql 數(shù)據(jù)庫沒有配置 allowMultiQueries=true 也可以實現(xiàn)批量更新。
總結(jié)
對于大批量數(shù)據(jù)庫操作,使用手動事務(wù)提交可以很多程度上提高操作效率 多線程對數(shù)據(jù)庫進行操作時,并非線程數(shù)越多操作時間越快,按上述示例大約在2-5個線程時操作時間最快。 對于多線程阻塞事務(wù)提交時,線程數(shù)量不能過多。 如果能有辦法實現(xiàn)批量更新那是最好
版權(quán)聲明:本文為CSDN博主「圣心」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。原文鏈接:https://blog.csdn.net/qq273766764/article/details/119972911

關(guān)注Java技術(shù)??锤喔韶?/strong>


