<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          JUC 常用4大并發(fā)工具類萬字詳解,收藏起來!

          共 49118字,需瀏覽 99分鐘

           ·

          2023-08-01 12:24


          什么是 JUC?

          JUC 就是 java.util.concurrent 包,這個包俗稱 JUC,里面都是解決并發(fā)問題的一些東西,該包的位置位于 java 下面的 rt.jar 包下面

          JUC 中 4 大常用并發(fā)工具類

          • CountDownLatch
          • CyclicBarrier
          • Semaphore
          • ExChanger

          CountDownLatch

          CountDownLatch,俗稱閉鎖,作用是類似加強版的 Join,是讓一組線程等待其他的線程完成工作以后才執(zhí)行

          就比如在啟動框架服務(wù)的時候,我們主線程需要在環(huán)境線程初始化完成之后才能啟動,這時候我們就可以實現(xiàn)使用 CountDownLatch 來完成

          /**
           * Constructs a {@code CountDownLatch} initialized with the given count.
           *
           * @param count the number of times {@link #countDown} must be invoked
           *              before threads can pass through {@link #await}
           * @throws IllegalArgumentException if {@code count} is negative
           */

          public CountDownLatch(int count) {
              if (count < 0throw new IllegalArgumentException("count < 0");
              this.sync = new Sync(count);
          }

          在源碼中可以看到,創(chuàng)建 CountDownLatch 時,需要傳入一個 int 類型的參數(shù),將決定在執(zhí)行次扣減之后,等待的線程被喚醒微信搜索公眾號:Java后端編程,回復(fù):java 領(lǐng)取資料 。

          通過這個類圖就可以知道其實 CountDownLatch 并沒有多少東西

          方法介紹:

          • CountDownLatch:初始化方法
          • await:等待方法,同時帶參數(shù)的是超時重載方法
          • countDown:每執(zhí)行一次,計數(shù)器減一,就是初始化傳入的數(shù)字,也代表著一個線程完成了任務(wù)
          • getCount:獲取當(dāng)前值
          • toString:這個就不用說了

          里面的 Sync 是一個內(nèi)部類,外面的方法其實都是操作這個內(nèi)部類的,這個內(nèi)部類繼承了 AQS,實現(xiàn)的標準方法,AQS 將在后面的章節(jié)寫

          主線程中創(chuàng)建 CountDownLatch(3),然后主線程 await 阻塞,然后線程 A,B,C 各自完成了任務(wù),調(diào)用了 countDown,之后,每個線程調(diào)用一次計數(shù)器就會減一,初始是 3,然后 A 線程調(diào)用后變成 2,B 線程調(diào)用后變成 1,C 線程調(diào)用后,變成 0,這時就會喚醒正在 await 的主線程,然后主線程繼續(xù)執(zhí)行

          說一千道一萬,不如代碼寫幾行,上代碼:

          休眠工具類,之后的代碼都會用到

          package org.dance.tools;

          import java.util.concurrent.TimeUnit;

          /**
           * 類說明:線程休眠輔助工具類
           */

          public class SleepTools {

              /**
               * 按秒休眠
               *
               * @param seconds 秒數(shù)
               */

              public static final void second(int seconds) {
                  try {
                      TimeUnit.SECONDS.sleep(seconds);
                  } catch (InterruptedException e) {
                  }
              }

              /**
               * 按毫秒數(shù)休眠
               *
               * @param seconds 毫秒數(shù)
               */

              public static final void ms(int seconds) {
                  try {
                      TimeUnit.MILLISECONDS.sleep(seconds);
                  } catch (InterruptedException e) {
                  }
              }
          }
          package org.dance.day2.util;

          import org.dance.tools.SleepTools;

          import java.util.concurrent.CountDownLatch;

          /**
           * CountDownLatch的使用,有五個線程,6個扣除點
           * 扣除完成后主線程和業(yè)務(wù)線程,才能執(zhí)行工作
           *  扣除點一般都是大于等于需要初始化的線程的
           * @author ZYGisComputer
           */

          public class UseCountDownLatch {

              /**
               * 設(shè)置為6個扣除點
               */

              static CountDownLatch countDownLatch = new CountDownLatch(6);

              /**
               * 初始化線程
               */

              private static class InitThread implements Runnable {

                  @Override
                  public void run() {

                      System.out.println("thread_" + Thread.currentThread().getId() + " ready init work .....");

                      // 執(zhí)行扣減 扣減不代表結(jié)束
                      countDownLatch.countDown();

                      for (int i = 0; i < 2; i++) {
                          System.out.println("thread_" + Thread.currentThread().getId() + ".....continue do its work");
                      }

                  }
              }

              /**
               * 業(yè)務(wù)線程
               */

              private static class BusiThread implements Runnable {

                  @Override
                  public void run() {

                      // 業(yè)務(wù)線程需要在等初始化完畢后才能執(zhí)行
                      try {
                          countDownLatch.await();
                          for (int i = 0; i < 3; i++) {
                              System.out.println("BusiThread " + Thread.currentThread().getId() + " do business-----");
                          }
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              }

              public static void main(String[] args) {

                  // 創(chuàng)建單獨的初始化線程
                  new Thread(){
                      @Override
                      public void run() {
                          SleepTools.ms(1);
                          System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 1st.....");
                          // 扣減一次
                          countDownLatch.countDown();
                          System.out.println("begin stop 2nd.....");
                          SleepTools.ms(1);
                          System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 2nd.....");
                          // 扣減一次
                          countDownLatch.countDown();

                      }
                  }.start();
                  // 啟動業(yè)務(wù)線程
                  new Thread(new BusiThread()).start();
                  // 啟動初始化線程
                  for (int i = 0; i <= 3; i++) {
                      new Thread(new InitThread()).start();
                  }
                  // 主線程進入等待
                  try {
                      countDownLatch.await();
                      System.out.println("Main do ites work.....");
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }

              }

          }

          返回結(jié)果:

          thread_13 ready init work .....
          thread_13.....continue do its work
          thread_13.....continue do its work
          thread_14 ready init work .....
          thread_14.....continue do its work
          thread_14.....continue do its work
          thread_15 ready init work .....
          thread_15.....continue do its work
          thread_11 ready init work step 1st.....
          begin stop 2nd.....
          thread_16 ready init work .....
          thread_16.....continue do its work
          thread_16.....continue do its work
          thread_15.....continue do its work
          thread_11 ready init work step 2nd.....
          Main do ites work.....
          BusiThread 12 do business-----
          BusiThread 12 do business-----
          BusiThread 12 do business-----

          通過返回結(jié)果就可以很直接的看到業(yè)務(wù)線程是在初始化線程完全跑完之后,才開始執(zhí)行的

          CyclicBarrier

          CyclicBarrier,俗稱柵欄鎖,作用是讓一組線程到達某個屏障,被阻塞,一直到組內(nèi)的最后一個線程到達,然后屏障開放,接著,所有的線程繼續(xù)運行

          這個感覺和 CountDownLatch 有點相似,但是其實是不一樣的,所謂的差別,將在下面詳解

          CyclicBarrier 的構(gòu)造參數(shù)有兩個

          /**
           * Creates a new {@code CyclicBarrier} that will trip when the
           * given number of parties (threads) are waiting upon it, and
           * does not perform a predefined action when the barrier is tripped.
           *
           * @param parties the number of threads that must invoke {@link #await}
           *                before the barrier is tripped
           * @throws IllegalArgumentException if {@code parties} is less than 1
           */

          public CyclicBarrier(int parties) {
              this(parties, null);
          }
          package com.example.demo;

          /**
           * <b>description</b>: <br>
           * <b>time</b>:2021/10/25 18:28 <br>
           * <b>author</b>:ready [email protected]
           */

          public class Demo {
              /**
               * Creates a new {@code CyclicBarrier} that will trip when the
               * given number of parties (threads) are waiting upon it, and which
               * will execute the given barrier action when the barrier is tripped,
               * performed by the last thread entering the barrier.
               *
               * @param parties       the number of threads that must invoke {@link #await}
               *                      before the barrier is tripped
               * @param barrierAction the command to execute when the barrier is
               *                      tripped, or {@code null} if there is no action
               * @throws IllegalArgumentException if {@code parties} is less than 1
               */

              public CyclicBarrier(int parties, Runnable barrierAction) {
                  if (parties <= 0throw new IllegalArgumentException();
                  this.parties = parties;
                  this.count = parties;
                  this.barrierCommand = barrierAction;
              }
          }

          很明顯能感覺出來,上面的構(gòu)造參數(shù)調(diào)用了下面的構(gòu)造參數(shù),是一個構(gòu)造方法重載

          首先這個第一個參數(shù)也是 Int 類型的,傳入的是執(zhí)行線程的個數(shù),這個數(shù)量和 CountDownLatch 不一樣,這個數(shù)量是需要和線程數(shù)量吻合的,CountDownLatch 則不一樣,CountDownLatch 可以大于等于,而 CyclicBarrier 只能等于,然后是第二個參數(shù),第二個參數(shù)是 barrierAction,這個參數(shù)是當(dāng)屏障開放后,執(zhí)行的任務(wù)線程,如果當(dāng)屏障開放后需要執(zhí)行什么任務(wù),可以寫在這個線程中

          主線程創(chuàng)建 CyclicBarrier(3,barrierAction),然后由線程開始執(zhí)行,線程 A,B 執(zhí)行完成后都調(diào)用了 await,然后他們都在一個屏障前阻塞者,需要等待線程 C 也,執(zhí)行完成,調(diào)用 await 之后,然后三個線程都達到屏障后,屏障開放,然后線程繼續(xù)執(zhí)行,并且 barrierAction 在屏障開放的一瞬間也開始執(zhí)行

          上代碼:

          package org.dance.day2.util;

          import org.dance.tools.SleepTools;

          import java.util.Map;
          import java.util.Random;
          import java.util.concurrent.BrokenBarrierException;
          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.CyclicBarrier;

          /**
           * CyclicBarrier的使用
           *
           * @author ZYGisComputer
           */

          public class UseCyclicBarrier {

              /**
               * 存放子線程工作結(jié)果的安全容器
               */

              private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

              private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new CollectThread());

              /**
               * 結(jié)果打印線程
               * 用來演示CyclicBarrier的第二個參數(shù),barrierAction
               */

              private static class CollectThread implements Runnable {

                  @Override
                  public void run() {
                      StringBuffer result = new StringBuffer();
                      for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                          result.append("[" + workResult.getValue() + "]");
                      }
                      System.out.println("the result = " + result);
                      System.out.println("do other business.....");

                  }
              }

              /**
               * 工作子線程
               * 用于CyclicBarrier的一組線程
               */

              private static class SubThread implements Runnable {

                  @Override
                  public void run() {
                      // 獲取當(dāng)前線程的ID
                      long id = Thread.currentThread().getId();
                      // 放入統(tǒng)計容器中
                      resultMap.put(String.valueOf(id), id);
                      Random random = new Random();
                      try {
                          if (random.nextBoolean()) {
                              Thread.sleep(1000 + id);
                              System.out.println("Thread_"+id+"..... do something");
                          }
                          System.out.println(id+" is await");
                          cyclicBarrier.await();
                          Thread.sleep(1000+id);
                          System.out.println("Thread_"+id+".....do its business");
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (BrokenBarrierException e) {
                          e.printStackTrace();
                      }

                  }
              }

              public static void main(String[] args) {
                  for (int i = 0; i <= 4; i++) {
                      Thread thread = new Thread(new SubThread());
                      thread.start();
                  }
              }

          }

          返回結(jié)果:

          11 is await
          14 is await
          15 is await
          Thread_12..... do something
          12 is await
          Thread_13..... do something
          13 is await
          the result = [11][12][13][14][15]
          do other business.....
          Thread_11.....do its business
          Thread_12.....do its business
          Thread_13.....do its business
          Thread_14.....do its business
          Thread_15.....do its business

          通過返回結(jié)果可以看出前面的 11 14 15 三個線程沒有進入 if 語句塊,在執(zhí)行到 await 的時候進入了等待,而另外 12 13 兩個線程進入到了 if 語句塊當(dāng)中,多休眠了 1 秒多,然后當(dāng) 5 個線程同時到達 await 的時候,屏障開放,執(zhí)行了 barrierAction 線程,然后線程組繼續(xù)執(zhí)行

          解釋一下 CountDownLatch 和 CyclicBarrier 的區(qū)別吧!

          首先就是 CountDownLatch 的構(gòu)造參數(shù)傳入的數(shù)量一般都是大于等于線程,數(shù)量的,因為他是有第三方控制的,可以扣減多次,然后就是 CyclicBarrier 的構(gòu)造參數(shù)第一個參數(shù)傳入的數(shù)量一定是等于線程的個數(shù)的,因為他是由一組線程自身控制的。

          區(qū)別 CountDownLatch CyclicBarrier
          控制 第三方控制 自身控制
          傳入數(shù)量 大于等于線程數(shù)量 等于線程數(shù)量

          Semaphore

          Semaphore,俗稱信號量,作用于控制同時訪問某個特定資源的線程數(shù)量,用在流量控制

          一說特定資源控制,那么第一時間就想到了數(shù)據(jù)庫連接..

          之前用等待超時模式寫了一個數(shù)據(jù)庫連接池,打算用這個 Semaphone 也寫一個

          /**
           * Creates a {@code Semaphore} with the given number of
           * permits and nonfair fairness setting.
           *
           * @param permits the initial number of permits available.
           *                This value may be negative, in which case releases
           *                must occur before any acquires will be granted.
           */

          public Semaphore(int permits) {
              sync = new NonfairSync(permits);
          }

          在源碼中可以看到在構(gòu)建 Semaphore 信號量的時候,需要傳入許可證的數(shù)量,這個數(shù)量就是資源的最大允許的訪問的線程數(shù)

          接下里用信號量實現(xiàn)一個數(shù)據(jù)庫連接池

          連接對象

          package org.dance.day2.util.pool;

          import org.dance.tools.SleepTools;

          import java.sql.*;
          import java.util.Map;
          import java.util.Properties;
          import java.util.concurrent.Executor;

          /**
           * 數(shù)據(jù)庫連接
           * @author ZYGisComputer
           */

          public class SqlConnection implements Connection {

              /**
               * 獲取數(shù)據(jù)庫連接
               * @return
               */

              public static final Connection fetchConnection(){
                  return new SqlConnection();
              }

              @Override
              public void commit() throws SQLException {
                  SleepTools.ms(70);
              }

              @Override
              public Statement createStatement() throws SQLException {
                  SleepTools.ms(1);
                  return null;
              }

              @Override
              public PreparedStatement prepareStatement(String sql) throws SQLException {
                  return null;
              }

              @Override
              public CallableStatement prepareCall(String sql) throws SQLException {
                  return null;
              }

              @Override
              public String nativeSQL(String sql) throws SQLException {
                  return null;
              }

              @Override
              public void setAutoCommit(boolean autoCommit) throws SQLException {

              }

              @Override
              public boolean getAutoCommit() throws SQLException {
                  return false;
              }

              @Override
              public void rollback() throws SQLException {

              }

              @Override
              public void close() throws SQLException {

              }

              @Override
              public boolean isClosed() throws SQLException {
                  return false;
              }

              @Override
              public DatabaseMetaData getMetaData() throws SQLException {
                  return null;
              }

              @Override
              public void setReadOnly(boolean readOnly) throws SQLException {

              }

              @Override
              public boolean isReadOnly() throws SQLException {
                  return false;
              }

              @Override
              public void setCatalog(String catalog) throws SQLException {

              }

              @Override
              public String getCatalog() throws SQLException {
                  return null;
              }

              @Override
              public void setTransactionIsolation(int level) throws SQLException {

              }

              @Override
              public int getTransactionIsolation() throws SQLException {
                  return 0;
              }

              @Override
              public SQLWarning getWarnings() throws SQLException {
                  return null;
              }

              @Override
              public void clearWarnings() throws SQLException {

              }

              @Override
              public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
                  return null;
              }

              @Override
              public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
                  return null;
              }

              @Override
              public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
                  return null;
              }

              @Override
              public Map<String, Class<?>> getTypeMap() throws SQLException {
                  return null;
              }

              @Override
              public void setTypeMap(Map<String, Class<?>> map) throws SQLException {

              }

              @Override
              public void setHoldability(int holdability) throws SQLException {

              }

              @Override
              public int getHoldability() throws SQLException {
                  return 0;
              }

              @Override
              public Savepoint setSavepoint() throws SQLException {
                  return null;
              }

              @Override
              public Savepoint setSavepoint(String name) throws SQLException {
                  return null;
              }

              @Override
              public void rollback(Savepoint savepoint) throws SQLException {

              }

              @Override
              public void releaseSavepoint(Savepoint savepoint) throws SQLException {

              }

              @Override
              public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
                  return null;
              }

              @Override
              public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
                  return null;
              }

              @Override
              public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
                  return null;
              }

              @Override
              public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
                  return null;
              }

              @Override
              public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
                  return null;
              }

              @Override
              public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
                  return null;
              }

              @Override
              public Clob createClob() throws SQLException {
                  return null;
              }

              @Override
              public Blob createBlob() throws SQLException {
                  return null;
              }

              @Override
              public NClob createNClob() throws SQLException {
                  return null;
              }

              @Override
              public SQLXML createSQLXML() throws SQLException {
                  return null;
              }

              @Override
              public boolean isValid(int timeout) throws SQLException {
                  return false;
              }

              @Override
              public void setClientInfo(String name, String value) throws SQLClientInfoException {

              }

              @Override
              public void setClientInfo(Properties properties) throws SQLClientInfoException {

              }

              @Override
              public String getClientInfo(String name) throws SQLException {
                  return null;
              }

              @Override
              public Properties getClientInfo() throws SQLException {
                  return null;
              }

              @Override
              public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
                  return null;
              }

              @Override
              public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
                  return null;
              }

              @Override
              public void setSchema(String schema) throws SQLException {

              }

              @Override
              public String getSchema() throws SQLException {
                  return null;
              }

              @Override
              public void abort(Executor executor) throws SQLException {

              }

              @Override
              public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {

              }

              @Override
              public int getNetworkTimeout() throws SQLException {
                  return 0;
              }

              @Override
              public <T> unwrap(Class<T> iface) throws SQLException {
                  return null;
              }

              @Override
              public boolean isWrapperFor(Class<?> iface) throws SQLException {
                  return false;
              }
          }

          連接池對象

          package org.dance.day2.util.pool;

          import java.sql.Connection;
          import java.util.ArrayList;
          import java.util.HashSet;
          import java.util.Iterator;
          import java.util.LinkedList;
          import java.util.concurrent.Semaphore;

          /**
           * 使用信號量控制數(shù)據(jù)庫的鏈接和釋放
           *
           * @author ZYGisComputer
           */

          public class DBPoolSemaphore {

              /**
               * 池容量
               */

              private final static int POOL_SIZE = 10;

              /**
               * useful 代表可用連接
               * useless 代表已用連接
               *  為什么要使用兩個Semaphore呢?是因為,在連接池中不只有連接本身是資源,空位也是資源,也需要記錄
               */

              private final Semaphore useful, useless;

              /**
               * 連接池
               */

              private final static LinkedList<Connection> POOL = new LinkedList<>();

              /**
               * 使用靜態(tài)塊初始化池
               */

              static {
                  for (int i = 0; i < POOL_SIZE; i++) {
                      POOL.addLast(SqlConnection.fetchConnection());
                  }
              }

              public DBPoolSemaphore() {
                  // 初始可用的許可證等于池容量
                  useful = new Semaphore(POOL_SIZE);
                  // 初始不可用的許可證容量為0
                  useless = new Semaphore(0);
              }

              /**
               * 獲取數(shù)據(jù)庫連接
               *
               * @return 連接對象
               */

              public Connection takeConnection() throws InterruptedException {
                  // 可用許可證減一
                  useful.acquire();
                  Connection connection;
                  synchronized (POOL) {
                      connection = POOL.removeFirst();
                  }
                  // 不可用許可證數(shù)量加一
                  useless.release();
                  return connection;
              }

              /**
               * 釋放鏈接
               *
               * @param connection 連接對象
               */

              public void returnConnection(Connection connection) throws InterruptedException {
                  if(null!=connection){
                      // 打印日志
                      System.out.println("當(dāng)前有"+useful.getQueueLength()+"個線程等待獲取連接,,"
                              +"可用連接有"+useful.availablePermits()+"個");
                      // 不可用許可證減一
                      useless.acquire();
                      synchronized (POOL){
                          POOL.addLast(connection);
                      }
                      // 可用許可證加一
                      useful.release();
                  }
              }

          }

          測試類:

          package org.dance.day2.util.pool;

          import org.dance.tools.SleepTools;

          import java.sql.Connection;
          import java.util.Random;

          /**
           * 測試Semaphore
           * @author ZYGisComputer
           */

          public class UseSemaphore {

              /**
               * 連接池
               */

              public static final DBPoolSemaphore pool = new DBPoolSemaphore();

              private static class BusiThread extends Thread{
                  @Override
                  public void run() {
                      // 隨機數(shù)工具類 為了讓每個線程持有連接的時間不一樣
                      Random random = new Random();
                      long start = System.currentTimeMillis();
                      try {
                          Connection connection = pool.takeConnection();
                          System.out.println("Thread_"+Thread.currentThread().getId()+
                                  "_獲取數(shù)據(jù)庫連接耗時["+(System.currentTimeMillis()-start)+"]ms.");
                          // 模擬使用連接查詢數(shù)據(jù)
                          SleepTools.ms(100+random.nextInt(100));
                          System.out.println("查詢數(shù)據(jù)完成歸還連接");
                          pool.returnConnection(connection);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              }

              public static void main(String[] args) {
                  for (int i = 0; i < 50; i++) {
                      BusiThread busiThread = new BusiThread();
                      busiThread.start();
                  }
              }

          }

          測試返回結(jié)果:

          Thread_11_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_12_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_13_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_14_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_15_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_16_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_17_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_18_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_19_獲取數(shù)據(jù)庫連接耗時[0]ms.
          Thread_20_獲取數(shù)據(jù)庫連接耗時[0]ms.
          查詢數(shù)據(jù)完成歸還連接
          當(dāng)前有40個線程等待獲取連接,,可用連接有0
          Thread_21_獲取數(shù)據(jù)庫連接耗時[112]ms.
          查詢數(shù)據(jù)完成歸還連接
          ...................查詢數(shù)據(jù)完成歸還連接
          當(dāng)前有2個線程等待獲取連接,,可用連接有0
          Thread_59_獲取數(shù)據(jù)庫連接耗時[637]ms.
          查詢數(shù)據(jù)完成歸還連接
          當(dāng)前有1個線程等待獲取連接,,可用連接有0
          Thread_60_獲取數(shù)據(jù)庫連接耗時[660]ms.
          查詢數(shù)據(jù)完成歸還連接
          當(dāng)前有0個線程等待獲取連接,,可用連接有0
          查詢數(shù)據(jù)完成歸還連接...................
          當(dāng)前有0個線程等待獲取連接,,可用連接有8
          查詢數(shù)據(jù)完成歸還連接
          當(dāng)前有0個線程等待獲取連接,,可用連接有9

          通過執(zhí)行結(jié)果可以很明確的看到,一上來就有 10 個線程獲取到了連接,,然后后面的 40 個線程進入阻塞,然后只有釋放鏈接之后,等待的線程就會有一個拿到,然后越后面的線程等待的時間就越長,然后一直到所有的線程執(zhí)行完畢。

          最后打印的可用連接有九個不是因為少了一個是因為在釋放之前打印的,不是錯誤。

          從結(jié)果中可以看到,我們對連接池中的資源的到了控制,這就是信號量的流量控制。

          Exchanger

          Exchanger,俗稱交換器,用于在線程之間交換數(shù)據(jù),但是比較受限,因為只能兩個線程之間交換數(shù)據(jù)

          /**
           * Creates a new Exchanger.
           */

          public Exchanger() {
              participant = new Participant();
          }

          這個構(gòu)造函數(shù)沒有什么好說的,也沒有入?yún)?只有在創(chuàng)建的時候指定一下需要交換的數(shù)據(jù)的泛型即可,下面看代碼

          package org.dance.day2.util;

          import java.util.HashSet;
          import java.util.Set;
          import java.util.concurrent.Exchanger;

          /**
           * 線程之間交換數(shù)據(jù)
           * @author ZYGisComputer
           */

          public class UseExchange {

              private static final Exchanger<Set<String>> exchanger = new Exchanger<>();

              public static void main(String[] args) {

                  new Thread(){
                      @Override
                      public void run() {
                          Set<String> aSet = new HashSet<>();
                          aSet.add("A");
                          aSet.add("B");
                          aSet.add("C");
                          try {
                              Set<String> exchange = exchanger.exchange(aSet);
                              for (String s : exchange) {
                                  System.out.println("aSet"+s);
                              }
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }.start();

                  new Thread(){
                      @Override
                      public void run() {
                          Set<String> bSet = new HashSet<>();
                          bSet.add("1");
                          bSet.add("2");
                          bSet.add("3");
                          try {
                              Set<String> exchange = exchanger.exchange(bSet);
                              for (String s : exchange) {
                                  System.out.println("bSet"+s);
                              }
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  }.start();

              }

          }

          執(zhí)行結(jié)果:

          bSetA
          bSetB
          bSetC
          aSet1
          aSet2
          aSet3

          通過執(zhí)行結(jié)果可以清晰的看到,兩個線程中的數(shù)據(jù)發(fā)生了交換,這就是 Exchanger 的線程數(shù)據(jù)交換了。

          以上就是 JUC 的 4 大常用并發(fā)工具類了。

          來源:cnblogs.com/flower-dance/p/13714006.html

          ·················END·················

             
                

          資料鏈接


               
          清華學(xué)姐自學(xué)的Linux筆記,天花板級別!
          新版鳥哥Linux私房菜資料
          阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!
          Alibaba官方上線!SpringBoot+SpringCloud全彩指南
          國內(nèi)最強的SpringBoot+Vue全棧項目天花板,不接受反駁!

          瀏覽 799
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一级A片免费看 | 草逼wwwwww. | 中文字幕一区二区三区日本在线 | 深夜福利小视频 | 可以看的三级网站 |