<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:Java 多線程怎么做事務(wù)控制?一半人答不上來。。

          共 26328字,需瀏覽 53分鐘

           ·

          2022-07-26 02:27

          點擊關(guān)注公眾號,Java干貨及時送達

          推薦閱讀:Spring Cloud Alibaba 殺瘋了。。

          項目代碼基于:MySql 數(shù)據(jù),開發(fā)框架為:SpringBoot、Mybatis

          開發(fā)語言為:Java8

          前言

          公司業(yè)務(wù)中遇到一個需求,需要同時修改最多約5萬條數(shù)據(jù),而且還不支持批量或異步修改操作。于是只能寫個for循環(huán)操作,但操作耗時太長,只能一步一步尋找其他解決方案。

          具體操作如下:

          一、循環(huán)操作的代碼

          先寫一個最簡單的for循環(huán)代碼,看看耗時情況怎么樣。

          /***
           * 一條一條依次對50000條數(shù)據(jù)進行更新操作
           * 耗時:2m27s,1m54s
           */
          @Test
          void updateStudent() {
              List<Student> allStudents = studentMapper.getAll();
              allStudents.forEach(s -> {
                  //更新教師信息
                  String teacher = s.getTeacher();
                  String newTeacher = "TNO_" + new Random().nextInt(100);
                  s.setTeacher(newTeacher);
                  studentMapper.update(s);
              });
          }

          循環(huán)修改整體耗時約 1分54秒,且代碼中沒有手動事務(wù)控制應(yīng)該是自動事務(wù)提交,所以每次操作事務(wù)都會提交所以操作比較慢,我們先對代碼中添加手動事務(wù)控制,看查詢效率怎樣。

          最新面試題整理:https://www.javastack.cn/mst/

          二、使用手動事務(wù)的操作代碼

          修改后的代碼如下:

          @Autowired
          private DataSourceTransactionManager dataSourceTransactionManager;

          @Autowired
          private TransactionDefinition transactionDefinition;

          /**
           * 由于希望更新操作 一次性完成,需要手動控制添加事務(wù)
           * 耗時:24s
           * 從測試結(jié)果可以看出,添加事務(wù)后插入數(shù)據(jù)的效率有明顯的提升
           */
          @Test
          void updateStudentWithTrans() {
              List<Student> allStudents = studentMapper.getAll();
              TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
              try {
                  allStudents.forEach(s -> {
                      //更新教師信息
                      String teacher = s.getTeacher();
                      String newTeacher = "TNO_" + new Random().nextInt(100);
                      s.setTeacher(newTeacher);
                      studentMapper.update(s);
                  });
                  dataSourceTransactionManager.commit(transactionStatus);
              } catch (Throwable e) {
                  dataSourceTransactionManager.rollback(transactionStatus);
                  throw e;
              }
          }

          添加手動事務(wù)操控制后,整體耗時約 24秒,這相對于自動事務(wù)提交的代碼,快了約5倍,對于大量循環(huán)數(shù)據(jù)庫提交操作,添加手動事務(wù)可以有效提高操作效率。最新面試題整理好了,大家可以在Java面試庫小程序在線刷題。

          三、嘗試多線程進行數(shù)據(jù)修改

          添加數(shù)據(jù)庫手動事務(wù)后操作效率有明細提高,但還是比較長,接下來嘗試多線程提交看是不是能夠再快一些。

          先添加一個Service將批量修改操作整合一下,具體代碼如下:

          StudentServiceImpl.java
          @Service
          public class StudentServiceImpl implements StudentService {
              @Autowired
              private StudentMapper studentMapper;

              @Autowired
              private DataSourceTransactionManager dataSourceTransactionManager;

              @Autowired
              private TransactionDefinition transactionDefinition;

              @Override
              public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
                  TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
                  System.out.println("子線程:" + Thread.currentThread().getName());
                  try {
                      students.forEach(s -> {
                          // 更新教師信息
                          // String teacher = s.getTeacher();
                          String newTeacher = "TNO_" + new Random().nextInt(100);
                          s.setTeacher(newTeacher);
                          studentMapper.update(s);
                      });
                      dataSourceTransactionManager.commit(transactionStatus);
                      threadLatch.countDown();
                  } catch (Throwable e) {
                      e.printStackTrace();
                      dataSourceTransactionManager.rollback(transactionStatus);
                  }
              }
          }

          批量測試代碼,我們采用了多線程進行提交,修改后測試代碼如下:

          @Autowired
          private DataSourceTransactionManager dataSourceTransactionManager;

          @Autowired
          private TransactionDefinition transactionDefinition;

          @Autowired
          private StudentService studentService;

          /**
           * 對用戶而言,27s 任是一個較長的時間,我們嘗試用多線程的方式來經(jīng)行修改操作看能否加快處理速度
           * 預(yù)計創(chuàng)建10個線程,每個線程進行5000條數(shù)據(jù)修改操作
           * 耗時統(tǒng)計
           * 1 線程數(shù):1      耗時:25s
           * 2 線程數(shù):2      耗時:14s
           * 3 線程數(shù):5      耗時:15s
           * 4 線程數(shù):10     耗時:15s
           * 5 線程數(shù):100    耗時:15s
           * 6 線程數(shù):200    耗時:15s
           * 7 線程數(shù):500    耗時:17s
           * 8 線程數(shù):1000    耗時:19s
           * 8 線程數(shù):2000    耗時:23s
           * 8 線程數(shù):5000    耗時:29s
           */
          @Test
          void updateStudentWithThreads() {
              //查詢總數(shù)據(jù)
              List<Student> allStudents = studentMapper.getAll();
              // 線程數(shù)量
              final Integer threadCount = 100;

              //每個線程處理的數(shù)據(jù)量
              final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

              // 創(chuàng)建多線程處理任務(wù)
              ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
              CountDownLatch threadLatchs = new CountDownLatch(threadCount);

              for (int i = 0; i < threadCount; i++) {
                  // 每個線程處理的數(shù)據(jù)
                  List<Student> threadDatas = allStudents.stream()
                          .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                  studentThreadPool.execute(() -> {
                      studentService.updateStudents(threadDatas, threadLatchs);
                  });
              }
              try {
                  // 倒計時鎖設(shè)置超時時間 30s
                  threadLatchs.await(30, TimeUnit.SECONDS);
              } catch (Throwable e) {
                  e.printStackTrace();
              }

              System.out.println("主線程完成");
          }

          Spring Boot 基礎(chǔ)就不介紹了,推薦下這個實戰(zhàn)教程:https://github.com/javastacks/spring-boot-best-practice

          多線程提交修改時,我們嘗試了不同線程數(shù)對提交速度的影響,具體可以看下面表格,

          多線程修改50000條數(shù)據(jù)時 不同線程數(shù)耗時對比(秒)

          根據(jù)表格,我們線程數(shù)增大提交速度并非一直增大,在當前情況下約在2-5個線程數(shù)時,提交速度最快(實際線程數(shù)還是需要根據(jù)服務(wù)器配置實際測試)。

          另外,MySQL 系列面試題和答案全部整理好了,微信搜索Java技術(shù)棧,在后臺發(fā)送:面試,可以在線閱讀。

          四、基于兩個CountDownLatch控制多線程事務(wù)提交

          由于多線程提交時,每個線程事務(wù)時單獨的,無法保證一致性,我們嘗試給多線程添加事務(wù)控制,來保證每個線程都是在插入數(shù)據(jù)完成后在提交事務(wù),

          這里我們使用兩個 CountDownLatch 來控制主線程與子線程事務(wù)提交,并設(shè)置了超時時間為 30 秒。我們對代碼進行了一點修改:

          @Override
          public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
              TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
              System.out.println("子線程:" + Thread.currentThread().getName());
              try {
                  students.forEach(s -> {
                      // 更新教師信息
                      // String teacher = s.getTeacher();
                      String newTeacher = "TNO_" + new Random().nextInt(100);
                      s.setTeacher(newTeacher);
                      studentMapper.update(s);
                  });
              } catch (Throwable e) {
                  taskStatus.setIsError();
              } finally {
                  threadLatch.countDown(); // 切換到主線程執(zhí)行
              }
              try {
                  mainLatch.await();  //等待主線程執(zhí)行
              } catch (Throwable e) {
                  taskStatus.setIsError();
              }
              // 判斷是否有錯誤,如有錯誤 就回滾事務(wù)
              if (taskStatus.getIsError()) {
                  dataSourceTransactionManager.rollback(transactionStatus);
              } else {
                  dataSourceTransactionManager.commit(transactionStatus);
              }
          }
          /**
           * 由于每個線程都是單獨的事務(wù),需要添加對線程事務(wù)的統(tǒng)一控制
           * 我們這邊使用兩個 CountDownLatch 對子線程的事務(wù)進行控制
           */
          @Test
          void updateStudentWithThreadsAndTrans() {
              //查詢總數(shù)據(jù)
              List<Student> allStudents = studentMapper.getAll();
              // 線程數(shù)量
              final Integer threadCount = 4;

              //每個線程處理的數(shù)據(jù)量
              final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

              // 創(chuàng)建多線程處理任務(wù)
              ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
              CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于計算子線程提交數(shù)量
              CountDownLatch mainLatch = new CountDownLatch(1); // 用于判斷主線程是否提交
              StudentTaskError taskStatus = new StudentTaskError(); // 用于判斷子線程任務(wù)是否有錯誤

              for (int i = 0; i < threadCount; i++) {
                  // 每個線程處理的數(shù)據(jù)
                  List<Student> threadDatas = allStudents.stream()
                          .skip(i * dataPartionLength).limit(dataPartionLength)
                          .collect(Collectors.toList());
                  studentThreadPool.execute(() -> {
                      studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
                  });
              }
              try {
                  // 倒計時鎖設(shè)置超時時間 30s
                  boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
                  if (!await) { // 等待超時,事務(wù)回滾
                      taskStatus.setIsError();
                  }
              } catch (Throwable e) {
                  e.printStackTrace();
                  taskStatus.setIsError();
              }
              mainLatch.countDown(); // 切換到子線程執(zhí)行
              studentThreadPool.shutdown(); //關(guān)閉線程池

              System.out.println("主線程完成");
          }

          本想再次測試一下不同線程數(shù)對執(zhí)行效率的影響時,發(fā)現(xiàn)當線程數(shù)超過10個時,執(zhí)行時就報錯。具體錯誤內(nèi)容如下:

          Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
           at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
           at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
           at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
           at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
           at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
          Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
           at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
           at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
           at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
           at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
           at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
           ... 7 more

          錯誤的大致意思時,不能為數(shù)據(jù)庫事務(wù)打開 jdbc Connection,連接在30s的時候超時了。由于前面啟動的十個線程需要等待主線程完成后才能提交,所以一直占用連接未釋放,造成后面的進程創(chuàng)建連接超時。

          看錯誤日志中錯誤的來源是 HikariPool ,我們來重新配置一下這個連接池的參數(shù),將最大連接數(shù)修改為100,具體配置如下:

          # 連接池中允許的最小連接數(shù)。缺省值:10
          spring.datasource.hikari.minimum-idle=10
          # 連接池中允許的最大連接數(shù)。缺省值:10
          spring.datasource.hikari.maximum-pool-size=100
          # 自動提交
          spring.datasource.hikari.auto-commit=true
          # 一個連接idle狀態(tài)的最大時長(毫秒),超時則被釋放(retired),缺省:10分鐘
          spring.datasource.hikari.idle-timeout=30000
          # 一個連接的生命時長(毫秒),超時而且沒被使用則被釋放(retired),缺省:30分鐘,建議設(shè)置比數(shù)據(jù)庫超時時長少30秒
          spring.datasource.hikari.max-lifetime=1800000
          # 等待連接池分配連接的最大時長(毫秒),超過這個時長還沒可用的連接則發(fā)生SQLException, 缺省:30秒

          再次執(zhí)行測試發(fā)現(xiàn)沒有報錯,修改線程數(shù)為20又執(zhí)行了一下,同樣執(zhí)行成功了。最新 MySQL 面試題整理好了,大家可以在Java面試庫小程序在線刷題。

          五、基于TransactionStatus集合來控制多線程事務(wù)提交

          在同事推薦下我們使用事務(wù)集合來進行多線程事務(wù)控制,主要代碼如下

          @Service
          public class StudentsTransactionThread {

              @Autowired
              private StudentMapper studentMapper;
              @Autowired
              private StudentService studentService;
              @Autowired
              private PlatformTransactionManager transactionManager;

              List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());

              @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
              public void updateStudentWithThreadsAndTrans() throws InterruptedException {

                  //查詢總數(shù)據(jù)
                  List<Student> allStudents = studentMapper.getAll();

                  // 線程數(shù)量
                  final Integer threadCount = 2;

                  //每個線程處理的數(shù)據(jù)量
                  final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

                  // 創(chuàng)建多線程處理任務(wù)
                  ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
                  CountDownLatch threadLatchs = new CountDownLatch(threadCount);
                  AtomicBoolean isError = new AtomicBoolean(false);
                  try {
                      for (int i = 0; i < threadCount; i++) {
                          // 每個線程處理的數(shù)據(jù)
                          List<Student> threadDatas = allStudents.stream()
                                  .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                          studentThreadPool.execute(() -> {
                              try {
                                  try {
                                      studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                                  } catch (Throwable e) {
                                      e.printStackTrace();
                                      isError.set(true);
                                  }finally {
                                      threadLatchs.countDown();
                                  }
                              } catch (Exception e) {
                                  e.printStackTrace();
                                  isError.set(true);
                              }
                          });
                      }

                      // 倒計時鎖設(shè)置超時時間 30s
                      boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
                      // 判斷是否超時
                      if (!await) {
                          isError.set(true);
                      }
                  } catch (Throwable e) {
                      e.printStackTrace();
                      isError.set(true);
                  }

                  if (!transactionStatuses.isEmpty()) {
                      if (isError.get()) {
                          transactionStatuses.forEach(s -> transactionManager.rollback(s));
                      } else {
                          transactionStatuses.forEach(s -> transactionManager.commit(s));
                      }
                  }

                  System.out.println("主線程完成");
              }
          }
          @Override
          @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
          public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
              // 使用這種方式將事務(wù)狀態(tài)都放在同一個事務(wù)里面
              DefaultTransactionDefinition def = new DefaultTransactionDefinition();
              def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
              TransactionStatus status = transactionManager.getTransaction(def); // 獲得事務(wù)狀態(tài)
              transactionStatuses.add(status);

              students.forEach(s -> {
                  // 更新教師信息
                  // String teacher = s.getTeacher();
                  String newTeacher = "TNO_" + new Random().nextInt(100);
                  s.setTeacher(newTeacher);
                  studentMapper.update(s);
              });
              System.out.println("子線程:" + Thread.currentThread().getName());
          }

          由于這個中方式去前面方式相同,需要等待線程執(zhí)行完成后才會提交事務(wù),所有任會占用Jdbc連接池,如果線程數(shù)量超過連接池最大數(shù)量會產(chǎn)生連接超時。所以在使用過程中任要控制線程數(shù)量,

          六、使用union連接多個select實現(xiàn)批量update

          有些情況寫不支持,批量update,但支持insert 多條數(shù)據(jù),這個時候可嘗試將需要更新的數(shù)據(jù)拼接成多條select 語句,然后使用union 連接起來,再使用update 關(guān)聯(lián)這個數(shù)據(jù)進行update,具體代碼演示如下:

          update student,(
           (select  1 as id,'teacher_A' as teacher) union
           (select  2 as id,'teacher_A' as teacher) union
           (select  3 as id,'teacher_A' as teacher) union
           (select  4 as id,'teacher_A' as teacher)
              /* ....more data ... */
              ) as new_teacher
          set
           student.teacher=new_teacher.teacher
          where
           student.id=new_teacher.id

          這種方式在Mysql 數(shù)據(jù)庫沒有配置 allowMultiQueries=true 也可以實現(xiàn)批量更新。

          總結(jié)

          • 對于大批量數(shù)據(jù)庫操作,使用手動事務(wù)提交可以很多程度上提高操作效率
          • 多線程對數(shù)據(jù)庫進行操作時,并非線程數(shù)越多操作時間越快,按上述示例大約在2-5個線程時操作時間最快。
          • 對于多線程阻塞事務(wù)提交時,線程數(shù)量不能過多。
          • 如果能有辦法實現(xiàn)批量更新那是最好

          版權(quán)聲明:本文為CSDN博主「圣心」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。原文鏈接:https://blog.csdn.net/qq273766764/article/details/119972911








          Spring Cloud Alibaba 殺瘋了。。
          Spring Boot 定時任務(wù)開啟后,怎么自動停止?
          23 種設(shè)計模式實戰(zhàn)(很全)
          Spring Boot 保護敏感配置的 4 種方法!
          面了個 5 年 Java,兩個線程數(shù)據(jù)交換都不會!
          阿里為什么推薦使用 LongAdder?
          新來一個技術(shù)總監(jiān):禁止戴耳機寫代碼。。
          別用 System... 計時了,StopWatch 好用到爆!
          Java 8 排序的 10 個姿勢,太秀了吧!
          Spring Boot Admin 橫空出世!
          Spring Boot 學(xué)習(xí)筆記,這個太全了!



          關(guān)注Java技術(shù)??锤喔韶?/strong>



          Spring Cloud Alibaba 最新實戰(zhàn)!
          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美性极品少妇精品网站 | 日比性免费在线 | 76人少妇精品导航 | 欧美在线中文字幕 | www,俺也去婷婷官网 |