<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          炸鍋了,Java多線程批量操作,居然有人不做事務控制?

          共 13185字,需瀏覽 27分鐘

           ·

          2022-05-12 20:01

          今日推薦
          答應我, 不要再用 if (obj != null) 判空了
          20個示例!詳解 Java8 Stream 用法,從此告別shi山(垃圾代碼)
          利用Java8新特征,重構傳統(tǒng)設計模式,你學會了嗎?
          竟然有一半的人不知道 for 與 foreach 的區(qū)別???
          利用多線程批量拆分 List 導入數據庫,效率杠杠的!

          來源:blog.csdn.net/qq273766764/article/

          details/119972911

          項目代碼基于:MySql 數據,開發(fā)框架為:SpringBoot、Mybatis

          開發(fā)語言為:Java8

          項目代碼:https://gitee.com/john273766764/springboot-mybatis-threads

          文章目錄

          • 前言
          • 循環(huán)操作的代碼
          • 使用手動事務的操作代碼
          • 嘗試多線程進行數據修改
          • 基于兩個CountDownLatch控制多線程事務提交
          • 基于TransactionStatus集合來控制多線程事務提交
          • 使用union連接多個select實現(xiàn)批量update
          • 總結

          前言

          公司業(yè)務中遇到一個需求,需要同時修改最多約5萬條數據,而且還不支持批量或異步修改操作。于是只能寫個for循環(huán)操作,但操作耗時太長,只能一步一步尋找其他解決方案。

          具體操作如下:

          一、循環(huán)操作的代碼

          先寫一個最簡單的for循環(huán)代碼,看看耗時情況怎么樣。

          /***
          ?*?一條一條依次對50000條數據進行更新操作
          ?*?耗時:2m27s,1m54s
          ?*/

          @Test
          void?updateStudent()?{
          ????List?allStudents?=?studentMapper.getAll();
          ????allStudents.forEach(s?->?{
          ????????//更新教師信息
          ????????String?teacher?=?s.getTeacher();
          ????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
          ????????s.setTeacher(newTeacher);
          ????????studentMapper.update(s);
          ????});
          }

          循環(huán)修改整體耗時約 1分54秒,且代碼中沒有手動事務控制應該是自動事務提交,所以每次操作事務都會提交所以操作比較慢,我們先對代碼中添加手動事務控制,看查詢效率怎樣。

          二、使用手動事務的操作代碼

          修改后的代碼如下:

          @Autowired
          private?DataSourceTransactionManager?dataSourceTransactionManager;

          @Autowired
          private?TransactionDefinition?transactionDefinition;

          /**
          ?*?由于希望更新操作?一次性完成,需要手動控制添加事務
          ?*?耗時:24s
          ?*?從測試結果可以看出,添加事務后插入數據的效率有明顯的提升
          ?*/

          @Test
          void?updateStudentWithTrans()?{
          ????List?allStudents?=?studentMapper.getAll();
          ????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
          ????try?{
          ????????allStudents.forEach(s?->?{
          ????????????//更新教師信息
          ????????????String?teacher?=?s.getTeacher();
          ????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
          ????????????s.setTeacher(newTeacher);
          ????????????studentMapper.update(s);
          ????????});
          ????????dataSourceTransactionManager.commit(transactionStatus);
          ????}?catch?(Throwable?e)?{
          ????????dataSourceTransactionManager.rollback(transactionStatus);
          ????????throw?e;
          ????}
          }

          添加手動事務操控制后,整體耗時約 24秒,這相對于自動事務提交的代碼,快了約5倍,對于大量循環(huán)數據庫提交操作,添加手動事務可以有效提高操作效率。

          三、嘗試多線程進行數據修改

          添加數據庫手動事務后操作效率有明細提高,但還是比較長,接下來嘗試多線程提交看是不是能夠再快一些。

          先添加一個Service將批量修改操作整合一下,具體代碼如下:

          StudentServiceImpl.java
          @Service
          public?class?StudentServiceImpl?implements?StudentService?{
          ????@Autowired
          ????private?StudentMapper?studentMapper;
          ?
          ????@Autowired
          ????private?DataSourceTransactionManager?dataSourceTransactionManager;
          ?
          ????@Autowired
          ????private?TransactionDefinition?transactionDefinition;
          ?
          ????@Override
          ????public?void?updateStudents(List?students,?CountDownLatch?threadLatch)?{
          ????????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
          ????????System.out.println("子線程:"?+?Thread.currentThread().getName());
          ????????try?{
          ????????????students.forEach(s?->?{
          ????????????????//?更新教師信息
          ????????????????//?String?teacher?=?s.getTeacher();
          ????????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
          ????????????????s.setTeacher(newTeacher);
          ????????????????studentMapper.update(s);
          ????????????});
          ????????????dataSourceTransactionManager.commit(transactionStatus);
          ????????????threadLatch.countDown();
          ????????}?catch?(Throwable?e)?{
          ????????????e.printStackTrace();
          ????????????dataSourceTransactionManager.rollback(transactionStatus);
          ????????}
          ????}
          }

          批量測試代碼,我們采用了多線程進行提交,修改后測試代碼如下:

          @Autowired
          private?DataSourceTransactionManager?dataSourceTransactionManager;

          @Autowired
          private?TransactionDefinition?transactionDefinition;

          @Autowired
          private?StudentService?studentService;

          /**
          ?*?對用戶而言,27s?任是一個較長的時間,我們嘗試用多線程的方式來經行修改操作看能否加快處理速度
          ?*?預計創(chuàng)建10個線程,每個線程進行5000條數據修改操作
          ?*?耗時統(tǒng)計
          ?* 1 線程數:1 ?????耗時:25s
          ?* 2 線程數:2 ?????耗時:14s
          ?* 3 線程數:5 ?????耗時:15s
          ?* 4 線程數:10?????耗時:15s
          ?* 5 線程數:100????耗時:15s
          ?* 6 線程數:200????耗時:15s
          ?* 7 線程數:500????耗時:17s
          ?* 8 線程數:1000????耗時:19s
          ?* 8 線程數:2000????耗時:23s
          ?* 8 線程數:5000????耗時:29s
          ?*/

          @Test
          void?updateStudentWithThreads()?{
          ????//查詢總數據
          ????List?allStudents?=?studentMapper.getAll();
          ????//?線程數量
          ????final?Integer?threadCount?=?100;

          ????//每個線程處理的數據量
          ????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;

          ????//?創(chuàng)建多線程處理任務
          ????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
          ????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);

          ????for?(int?i?=?0;?i?????????//?每個線程處理的數據
          ????????List?threadDatas?=?allStudents.stream()
          ????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
          ????????studentThreadPool.execute(()?->?{
          ????????????studentService.updateStudents(threadDatas,?threadLatchs);
          ????????});
          ????}
          ????try?{
          ????????//?倒計時鎖設置超時時間?30s
          ????????threadLatchs.await(30,?TimeUnit.SECONDS);
          ????}?catch?(Throwable?e)?{
          ????????e.printStackTrace();
          ????}

          ????System.out.println("主線程完成");
          }

          多線程提交修改時,我們嘗試了不同線程數對提交速度的影響,具體可以看下面表格,

          多線程修改50000條數據時 不同線程數耗時對比(秒)

          根據表格,我們線程數增大提交速度并非一直增大,在當前情況下約在2-5個線程數時,提交速度最快(實際線程數還是需要根據服務器配置實際測試)。

          四、基于兩個CountDownLatch控制多線程事務提交

          由于多線程提交時,每個線程事務時單獨的,無法保證一致性,我們嘗試給多線程添加事務控制,來保證每個線程都是在插入數據完成后在提交事務,

          這里我們使用兩個 CountDownLatch 來控制主線程與子線程事務提交,并設置了超時時間為 30 秒。我們對代碼進行了一點修改:

          @Override
          public?void?updateStudentsThread(List?students,?CountDownLatch?threadLatch,?CountDownLatch?mainLatch,?StudentTaskError?taskStatus)?{
          ????TransactionStatus?transactionStatus?=?dataSourceTransactionManager.getTransaction(transactionDefinition);
          ????System.out.println("子線程:"?+?Thread.currentThread().getName());
          ????try?{
          ????????students.forEach(s?->?{
          ????????????//?更新教師信息
          ????????????//?String?teacher?=?s.getTeacher();
          ????????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
          ????????????s.setTeacher(newTeacher);
          ????????????studentMapper.update(s);
          ????????});
          ????}?catch?(Throwable?e)?{
          ????????taskStatus.setIsError();
          ????}?finally?{
          ????????threadLatch.countDown();?//?切換到主線程執(zhí)行
          ????}
          ????try?{
          ????????mainLatch.await();??//等待主線程執(zhí)行
          ????}?catch?(Throwable?e)?{
          ????????taskStatus.setIsError();
          ????}
          ????//?判斷是否有錯誤,如有錯誤?就回滾事務
          ????if?(taskStatus.getIsError())?{
          ????????dataSourceTransactionManager.rollback(transactionStatus);
          ????}?else?{
          ????????dataSourceTransactionManager.commit(transactionStatus);
          ????}
          }
          /**
          ?*?由于每個線程都是單獨的事務,需要添加對線程事務的統(tǒng)一控制
          ?*?我們這邊使用兩個?CountDownLatch?對子線程的事務進行控制
          ?*/

          @Test
          void?updateStudentWithThreadsAndTrans()?{
          ????//查詢總數據
          ????List?allStudents?=?studentMapper.getAll();
          ????//?線程數量
          ????final?Integer?threadCount?=?4;

          ????//每個線程處理的數據量
          ????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;

          ????//?創(chuàng)建多線程處理任務
          ????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
          ????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);?//?用于計算子線程提交數量
          ????CountDownLatch?mainLatch?=?new?CountDownLatch(1);?//?用于判斷主線程是否提交
          ????StudentTaskError?taskStatus?=?new?StudentTaskError();?//?用于判斷子線程任務是否有錯誤

          ????for?(int?i?=?0;?i?????????//?每個線程處理的數據
          ????????List?threadDatas?=?allStudents.stream()
          ????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength)
          ????????????????.collect(Collectors.toList());
          ????????studentThreadPool.execute(()?->?{
          ????????????studentService.updateStudentsThread(threadDatas,?threadLatchs,?mainLatch,?taskStatus);
          ????????});
          ????}
          ????try?{
          ????????//?倒計時鎖設置超時時間?30s
          ????????boolean?await?=?threadLatchs.await(30,?TimeUnit.SECONDS);
          ????????if?(!await)?{?//?等待超時,事務回滾
          ????????????taskStatus.setIsError();
          ????????}
          ????}?catch?(Throwable?e)?{
          ????????e.printStackTrace();
          ????????taskStatus.setIsError();
          ????}
          ????mainLatch.countDown();?//?切換到子線程執(zhí)行
          ????studentThreadPool.shutdown();?//關閉線程池

          ????System.out.println("主線程完成");
          }

          本想再次測試一下不同線程數對執(zhí)行效率的影響時,發(fā)現(xiàn)當線程數超過10個時,執(zhí)行時就報錯。具體錯誤內容如下:

          Exception?in?thread?"pool-1-thread-2"?org.springframework.transaction.CannotCreateTransactionException:?Could?not?open?JDBC?Connection?for?transaction;?nested?exception?is?java.sql.SQLTransientConnectionException:?HikariPool-1?-?Connection?is?not?available,?request?timed?out?after?30055ms.
          ?at?org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
          ?at?org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
          ?at?org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
          ?at?com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
          ?at?com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
          ?at?java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          ?at?java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          ?at?java.lang.Thread.run(Thread.java:748)
          Caused?by:?java.sql.SQLTransientConnectionException:?HikariPool-1?-?Connection?is?not?available,?request?timed?out?after?30055ms.
          ?at?com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
          ?at?com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
          ?at?com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
          ?at?com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
          ?at?org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
          ?...?7?more

          錯誤的大致意思時,不能為數據庫事務打開 jdbc Connection,連接在30s的時候超時了。由于前面啟動的十個線程需要等待主線程完成后才能提交,所以一直占用連接未釋放,造成后面的進程創(chuàng)建連接超時。

          看錯誤日志中錯誤的來源是 HikariPool ,我們來重新配置一下這個連接池的參數,將最大連接數修改為100,具體配置如下:

          #?連接池中允許的最小連接數。缺省值:10
          spring.datasource.hikari.minimum-idle=10
          #?連接池中允許的最大連接數。缺省值:10
          spring.datasource.hikari.maximum-pool-size=100
          #?自動提交
          spring.datasource.hikari.auto-commit=true
          #?一個連接idle狀態(tài)的最大時長(毫秒),超時則被釋放(retired),缺省:10分鐘
          spring.datasource.hikari.idle-timeout=30000
          #?一個連接的生命時長(毫秒),超時而且沒被使用則被釋放(retired),缺省:30分鐘,建議設置比數據庫超時時長少30秒
          spring.datasource.hikari.max-lifetime=1800000
          #?等待連接池分配連接的最大時長(毫秒),超過這個時長還沒可用的連接則發(fā)生SQLException,?缺省:30秒

          再次執(zhí)行測試發(fā)現(xiàn)沒有報錯,修改線程數為20又執(zhí)行了一下,同樣執(zhí)行成功了。

          五、基于TransactionStatus集合來控制多線程事務提交

          在同事推薦下我們使用事務集合來進行多線程事務控制,主要代碼如下

          @Service
          public?class?StudentsTransactionThread?{
          ?
          ????@Autowired
          ????private?StudentMapper?studentMapper;
          ????@Autowired
          ????private?StudentService?studentService;
          ????@Autowired
          ????private?PlatformTransactionManager?transactionManager;
          ?
          ????List?transactionStatuses?=?Collections.synchronizedList(new?ArrayList());
          ?
          ????@Transactional(propagation?=?Propagation.REQUIRED,?rollbackFor?=?{Exception.class})
          ????public?void?updateStudentWithThreadsAndTrans()?throws?InterruptedException?
          {
          ?
          ????????//查詢總數據
          ????????List?allStudents?=?studentMapper.getAll();
          ?
          ????????//?線程數量
          ????????final?Integer?threadCount?=?2;
          ?
          ????????//每個線程處理的數據量
          ????????final?Integer?dataPartionLength?=?(allStudents.size()?+?threadCount?-?1)?/?threadCount;
          ?
          ????????//?創(chuàng)建多線程處理任務
          ????????ExecutorService?studentThreadPool?=?Executors.newFixedThreadPool(threadCount);
          ????????CountDownLatch?threadLatchs?=?new?CountDownLatch(threadCount);
          ????????AtomicBoolean?isError?=?new?AtomicBoolean(false);
          ????????try?{
          ????????????for?(int?i?=?0;?i?????????????????//?每個線程處理的數據
          ????????????????List?threadDatas?=?allStudents.stream()
          ????????????????????????.skip(i?*?dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
          ????????????????studentThreadPool.execute(()?->?{
          ????????????????????try?{
          ????????????????????????try?{
          ????????????????????????????studentService.updateStudentsTransaction(transactionManager,?transactionStatuses,?threadDatas);
          ????????????????????????}?catch?(Throwable?e)?{
          ????????????????????????????e.printStackTrace();
          ????????????????????????????isError.set(true);
          ????????????????????????}finally?{
          ????????????????????????????threadLatchs.countDown();
          ????????????????????????}
          ????????????????????}?catch?(Exception?e)?{
          ????????????????????????e.printStackTrace();
          ????????????????????????isError.set(true);
          ????????????????????}
          ????????????????});
          ????????????}
          ?
          ????????????//?倒計時鎖設置超時時間?30s
          ????????????boolean?await?=?threadLatchs.await(30,?TimeUnit.SECONDS);
          ????????????//?判斷是否超時
          ????????????if?(!await)?{
          ????????????????isError.set(true);
          ????????????}
          ????????}?catch?(Throwable?e)?{
          ????????????e.printStackTrace();
          ????????????isError.set(true);
          ????????}
          ?
          ????????if?(!transactionStatuses.isEmpty())?{
          ????????????if?(isError.get())?{
          ????????????????transactionStatuses.forEach(s?->?transactionManager.rollback(s));
          ????????????}?else?{
          ????????????????transactionStatuses.forEach(s?->?transactionManager.commit(s));
          ????????????}
          ????????}
          ?
          ????????System.out.println("主線程完成");
          ????}
          }
          @Override
          @Transactional(propagation?=?Propagation.REQUIRED,?rollbackFor?=?{Exception.class})
          public?void?updateStudentsTransaction(PlatformTransactionManager?transactionManager,?List<TransactionStatus>?transactionStatuses,?List<Student>?students)?
          {
          ????//?使用這種方式將事務狀態(tài)都放在同一個事務里面
          ????DefaultTransactionDefinition?def?=?new?DefaultTransactionDefinition();
          ????def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);?//?事物隔離級別,開啟新事務,這樣會比較安全些。
          ????TransactionStatus?status?=?transactionManager.getTransaction(def);?//?獲得事務狀態(tài)
          ????transactionStatuses.add(status);

          ????students.forEach(s?->?{
          ????????//?更新教師信息
          ????????//?String?teacher?=?s.getTeacher();
          ????????String?newTeacher?=?"TNO_"?+?new?Random().nextInt(100);
          ????????s.setTeacher(newTeacher);
          ????????studentMapper.update(s);
          ????});
          ????System.out.println("子線程:"?+?Thread.currentThread().getName());
          }

          由于這個中方式去前面方式相同,需要等待線程執(zhí)行完成后才會提交事務,所有任會占用Jdbc連接池,如果線程數量超過連接池最大數量會產生連接超時。所以在使用過程中任要控制線程數量,

          六、使用union連接多個select實現(xiàn)批量update

          有些情況寫不支持,批量update,但支持insert 多條數據,這個時候可嘗試將需要更新的數據拼接成多條select 語句,然后使用union 連接起來,再使用update 關聯(lián)這個數據進行update,具體代碼演示如下:

          update?student,(
          ?(select??1?as?id,'teacher_A'?as?teacher)?union
          ?(select??2?as?id,'teacher_A'?as?teacher)?union
          ?(select??3?as?id,'teacher_A'?as?teacher)?union
          ?(select??4?as?id,'teacher_A'?as?teacher)
          ????/*?....more?data?...?*/
          ????)?as?new_teacher
          set
          ?student.teacher=new_teacher.teacher
          where
          ?student.id=new_teacher.id

          這種方式在Mysql 數據庫沒有配置 allowMultiQueries=true 也可以實現(xiàn)批量更新。

          總結

          • 對于大批量數據庫操作,使用手動事務提交可以很多程度上提高操作效率
          • 多線程對數據庫進行操作時,并非線程數越多操作時間越快,按上述示例大約在2-5個線程時操作時間最快。
          • 對于多線程阻塞事務提交時,線程數量不能過多。
          • 如果能有辦法實現(xiàn)批量更新那是最好
          最后,再給大家推薦一個GitHub項目,該項目整理了上千本常用技術PDF,技術書籍都可以在這里找到。
          GitHub地址:https://github.com/hello-go-maker/cs-books
          電子書已經更新好了,拿走不謝,記得點一個star,持續(xù)更新中...

          瀏覽 29
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  激情五月天网址 | 秋霞午夜电影 | 日本一级黄色 | 亚洲AV无码乱码国产精品黑人 | 亚州又视频|