<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java并發(fā)控制機(jī)制詳解

          共 26739字,需瀏覽 54分鐘

           ·

          2021-05-28 10:59

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

            作者 |  ypp91zr

          來源 |  urlify.cn/nqe2ei

          在一般性開發(fā)中,筆者經(jīng)??吹胶芏嗤瑢W(xué)在對(duì)待java并發(fā)開發(fā)模型中只會(huì)使用一些基礎(chǔ)的方法。比如Volatile,synchronized。像Lock和atomic這類高級(jí)并發(fā)包很多人并不經(jīng)常使用。我想大部分原因都是來之于對(duì)原理的不屬性導(dǎo)致的。在繁忙的開發(fā)工作中,又有誰會(huì)很準(zhǔn)確的把握和使用正確的并發(fā)模型呢?

          所以最近基于這個(gè)思想,本人打算把并發(fā)控制機(jī)制這部分整理成一篇文章。既是對(duì)自己掌握知識(shí)的一個(gè)回憶,也是希望這篇講到的類容能幫助到大部分開發(fā)者。

          并行程序開發(fā)不可避免地要涉及多線程、多任務(wù)的協(xié)作和數(shù)據(jù)共享等問題。在JDK中,提供了多種途徑實(shí)現(xiàn)多線程間的并發(fā)控制。比如常用的:內(nèi)部鎖、重入鎖、讀寫鎖和信號(hào)量。

          Java內(nèi)存模型

          在java中,每一個(gè)線程有一塊工作內(nèi)存區(qū),其中存放著被所有線程共享的主內(nèi)存中的變量的值的拷貝。當(dāng)線程執(zhí)行時(shí),它在自己的工作內(nèi)存中操作這些變量。

          為了存取一個(gè)共享的變量,一個(gè)線程通常先獲取鎖定并且清除它的工作內(nèi)存區(qū),這保證該共享變量從所有線程的共享內(nèi)存區(qū)正確地裝入到線程的工作內(nèi)存區(qū),當(dāng)線程解鎖時(shí)保證該工作內(nèi)存區(qū)中變量的值協(xié)會(huì)到共享內(nèi)存中。

          當(dāng)一個(gè)線程使用某一個(gè)變量時(shí),不論程序是否正確地使用線程同步操作,它獲取的值一定是由它本身或者其他線程存儲(chǔ)到變量中的值。例如,如果兩個(gè)線程把不同的值或者對(duì)象引用存儲(chǔ)到同一個(gè)共享變量中,那么該變量的值要么是這個(gè)線程的,要么是那個(gè)線程的,共享變量的值不會(huì)是由兩個(gè)線程的引用值組合而成。

          一個(gè)變量時(shí)Java程序可以存取的一個(gè)地址,它不僅包括基本類型變量、引用類型變量,而且還包括數(shù)組類型變量。保存在主內(nèi)存區(qū)的變量可以被所有線程共享,但是一個(gè)線程存取另一個(gè)線程的參數(shù)或者局部變量時(shí)不可能的,所以開發(fā)人員不必?fù)?dān)心局部變量的線程安全問題。

          volatile變量–多線程間可見

          由于每個(gè)線程都有自己的工作內(nèi)存區(qū),因此當(dāng)一個(gè)線程改變自己的工作內(nèi)存中的數(shù)據(jù)時(shí),對(duì)其他線程來說,可能是不可見的。為此,可以使用volatile關(guān)鍵字破事所有線程軍讀寫內(nèi)存中的變量,從而使得volatile變量在多線程間可見。

          聲明為volatile的變量可以做到如下保證:

          1、其他線程對(duì)變量的修改,可以及時(shí)反應(yīng)在當(dāng)前線程中;
          2、確保當(dāng)前線程對(duì)volatile變量的修改,能及時(shí)寫回到共享內(nèi)存中,并被其他線程所見;
          3、使用volatile聲明的變量,編譯器會(huì)保證其有序性。

          同步關(guān)鍵字synchronized

          同步關(guān)鍵字synchronized是Java語言中最為常用的同步方法之一。在JDK早期版本中,synchronized的性能并不是太好,值適合于鎖競(jìng)爭(zhēng)不是特別激烈的場(chǎng)合。在JDK6中,synchronized和非公平鎖的差距已經(jīng)縮小。更為重要的是,synchronized更為簡(jiǎn)潔明了,代碼可讀性和維護(hù)性比較好。

          鎖定一個(gè)對(duì)象的方法:

          public synchronized void method(){}

          當(dāng)method()方法被調(diào)用時(shí),調(diào)用線程首先必須獲得當(dāng)前對(duì)象所,若當(dāng)前對(duì)象鎖被其他線程持有,這調(diào)用線程會(huì)等待,犯法結(jié)束后,對(duì)象鎖會(huì)被釋放,以上方法等價(jià)于下面的寫法:

          public void method(){
          synchronized(this){
          // do something …
          }
          }

          其次,使用synchronized還可以構(gòu)造同步塊,與同步方法相比,同步塊可以更為精確控制同步代碼范圍。一個(gè)小的同步代碼非常有離與鎖的快進(jìn)快出,從而使系統(tǒng)擁有更高的吞吐量。

          public void method(Object o){
          // before
          synchronized(o){
          // do something ...
          }
          // after
          }

          synchronized也可以用于static函數(shù):

          public synchronized static void method(){}

          這個(gè)地方一定要注意,synchronized的鎖是加在當(dāng)前Class對(duì)象上,因此,所有對(duì)該方法的調(diào)用,都必須獲得Class對(duì)象的鎖。

          雖然synchronized可以保證對(duì)象或者代碼段的線程安全,但是僅使用synchronized還是不足以控制擁有復(fù)雜邏輯的線程交互。為了實(shí)現(xiàn)多線程間的交互,還需要使用Object對(duì)象的wait()和notify()方法。

          典型用法:

          synchronized(obj){
              while(<?>){
                  obj.wait();
                  // 收到通知后,繼續(xù)執(zhí)行。
              }
          }

          在使用wait()方法前,需要獲得對(duì)象鎖。在wait()方法執(zhí)行時(shí),當(dāng)前線程或釋放obj的獨(dú)占鎖,供其他線程使用。

          當(dāng)?shù)却趏bj上線程收到obj.notify()時(shí),它就能重新獲得obj的獨(dú)占鎖,并繼續(xù)運(yùn)行。注意了,notify()方法是隨機(jī)喚起等待在當(dāng)前對(duì)象的某一個(gè)線程。

          下面是一個(gè)阻塞隊(duì)列的實(shí)現(xiàn):

          public class BlockQueue{
           private List list = new ArrayList();
           
           public synchronized Object pop() throws InterruptedException{
           while (list.size()==0){
           this.wait();
           }
           if (list.size()>0){
           return list.remove(0);
           } else{
           return null;
           }
           }
           
           public synchronized Object put(Object obj){
           list.add(obj);
           this.notify();
           }
           
          }

          synchronized配合wait()、notify()應(yīng)該是Java開發(fā)者必須掌握的基本技能。

          Reentrantlock重入鎖

          Reentrantlock稱為重入鎖。它比synchronized擁有更加強(qiáng)大的功能,它可以中斷、可定時(shí)。在高并發(fā)的情況下,它比synchronized有明顯的性能優(yōu)勢(shì)。

          Reentrantlock提供了公平和非公平兩種鎖。公平鎖是對(duì)鎖的獲取是先進(jìn)先出,而非公平鎖是可以插隊(duì)的。當(dāng)然從性能上分析,非公平鎖的性能要好得多。因此,在無特殊需要,應(yīng)該優(yōu)選非公平鎖,但是synchronized提供鎖業(yè)不是絕對(duì)公平的。Reentrantlock在構(gòu)造的時(shí)候可以指定鎖是否公平。

          在使用重入鎖時(shí),一定要在程序最后釋放鎖。一般釋放鎖的代碼要寫在finally里。否則,如果程序出現(xiàn)異常,Loack就永遠(yuǎn)無法釋放了。synchronized的鎖是JVM最后自動(dòng)釋放的。

          經(jīng)典使用方式如下:

          try {
           if (lock.tryLock(5, TimeUnit.SECONDS)) { //如果已經(jīng)被lock,嘗試等待5s,看是否可以獲得鎖,如果5s后仍然無法獲得鎖則返回false繼續(xù)執(zhí)行
           // lock.lockInterruptibly();可以響應(yīng)中斷事件
           try { 
           //操作
           } finally {
           lock.unlock();
           }
           }
          } catch (InterruptedException e) {
           e.printStackTrace(); //當(dāng)前線程被中斷時(shí)(interrupt),會(huì)拋InterruptedException 
          }

          Reentrantlock提供了非常豐富的鎖控制功能,靈活應(yīng)用這些控制方法,可以提高應(yīng)用程序的性能。不過這里并非是極力推薦使用Reentrantlock。重入鎖算是JDK中提供的高級(jí)開發(fā)工具。

          ReadWriteLock讀寫鎖

          讀寫分離是一種非常常見的數(shù)據(jù)處理思想。在sql中應(yīng)該算是必須用到的技術(shù)。ReadWriteLock是在JDK5中提供的讀寫分離鎖。讀寫分離鎖可以有效地幫助減少鎖競(jìng)爭(zhēng),以提升系統(tǒng)性能。讀寫分離使用場(chǎng)景主要是如果在系統(tǒng)中,讀操作次數(shù)遠(yuǎn)遠(yuǎn)大于寫操作。使用方式如下:

          private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
          private Lock readLock = readWriteLock.readLock();
          private Lock writeLock = readWriteLock.writeLock();
          public Object handleRead() throws InterruptedException {
              try {
                  readLock.lock();
                  Thread.sleep(1000);
                  return value;
              }finally{
                  readLock.unlock();
              }
          }
          public Object handleRead() throws InterruptedException {
              try {
                  writeLock.lock();
                  Thread.sleep(1000);
                  return value;
              }finally{
                  writeLock.unlock();
              }
          }

          Condition對(duì)象

          Conditiond對(duì)象用于協(xié)調(diào)多線程間的復(fù)雜協(xié)作。主要與鎖相關(guān)聯(lián)。通過Lock接口中的newCondition()方法可以生成一個(gè)與Lock綁定的Condition實(shí)例。Condition對(duì)象和鎖的關(guān)系就如用Object.wait()、Object.notify()兩個(gè)函數(shù)以及synchronized關(guān)鍵字一樣。

          這里可以把ArrayBlockingQueue的源碼摘出來看一下:

          public class ArrayBlockingQueue extends AbstractQueue
          implements BlockingQueue, java.io.Serializable {
          /** Main lock guarding all access */
          final ReentrantLock lock;
          /** Condition for waiting takes */
          private final Condition notEmpty;
          /** Condition for waiting puts */
          private final Condition notFull;
           
          public ArrayBlockingQueue(int capacity, boolean fair) {
              if (capacity <= 0)
                  throw new IllegalArgumentException();
              this.items = new Object[capacity];
              lock = new ReentrantLock(fair); 
              notEmpty = lock.newCondition(); // 生成與Lock綁定的Condition
              notFull =  lock.newCondition();
          }
           
          public void put(E e) throws InterruptedException {
              checkNotNull(e);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  while (count == items.length)
                      notFull.await();
                  insert(e);
              } finally {
                  lock.unlock();
              }
          }
           
          private void insert(E x) {
              items[putIndex] = x;
              putIndex = inc(putIndex);
              ++count;
              notEmpty.signal(); // 通知
          }
           
          public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  while (count == 0) // 如果隊(duì)列為空
                      notEmpty.await();  // 則消費(fèi)者隊(duì)列要等待一個(gè)非空的信號(hào)
                  return extract();
              } finally {
                  lock.unlock();
              }
          }
           
          private E extract() {
              final Object[] items = this.items;
              E x = this.<E>cast(items[takeIndex]);
              items[takeIndex] = null;
              takeIndex = inc(takeIndex);
              --count;
              notFull.signal(); // 通知put() 線程隊(duì)列已有空閑空間
              return x;
          }
           
          // other code
          }

          Semaphore信號(hào)量

          信號(hào)量為多線程協(xié)作提供了更為強(qiáng)大的控制方法。信號(hào)量是對(duì)鎖的擴(kuò)展。無論是內(nèi)部鎖synchronized還是重入鎖ReentrantLock,一次都允許一個(gè)線程訪問一個(gè)資源,而信號(hào)量卻可以指定多個(gè)線程同時(shí)訪問某一個(gè)資源。從構(gòu)造函數(shù)可以看出:

          public Semaphore(int permits) {}
          public Semaphore(int permits, boolean fair){} // 可以指定是否公平

          permits指定了信號(hào)量的準(zhǔn)入書,也就是同時(shí)能申請(qǐng)多少個(gè)許可。當(dāng)每個(gè)線程每次只申請(qǐng)一個(gè)許可時(shí),這就相當(dāng)于指定了同時(shí)有多少個(gè)線程可以訪問某一個(gè)資源。這里羅列一下主要方法的使用:

          •  public void acquire() throws InterruptedException {} //嘗試獲得一個(gè)準(zhǔn)入的許可。若無法獲得,則線程會(huì)等待,知道有線程釋放一個(gè)許可或者當(dāng)前線程被中斷。

          • public void acquireUninterruptibly(){} // 類似于acquire(),但是不會(huì)響應(yīng)中斷。

          • public boolean tryAcquire(){} // 嘗試獲取,如果成功則為true,否則false。這個(gè)方法不會(huì)等待,立即返回。

          • public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {} // 嘗試等待多長(zhǎng)時(shí)間

          • public void release() //用于在現(xiàn)場(chǎng)訪問資源結(jié)束后,釋放一個(gè)許可,以使其他等待許可的線程可以進(jìn)行資源訪問。

          下面來看一下JDK文檔中提供使用信號(hào)量的實(shí)例。這個(gè)實(shí)例很好的解釋了如何通過信號(hào)量控制資源訪問。

          public class Pool {
          private static final int MAX_AVAILABLE = 100;
          private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
          public Object getItem() throws InterruptedException {
              available.acquire();
              // 申請(qǐng)一個(gè)許可
              // 同時(shí)只能有100個(gè)線程進(jìn)入取得可用項(xiàng),
              // 超過100個(gè)則需要等待
              return getNextAvailableItem();
          }
           
          public void putItem(Object x) {
              // 將給定項(xiàng)放回池內(nèi),標(biāo)記為未被使用
              if (markAsUnused(x)) {
                  available.release();
                  // 新增了一個(gè)可用項(xiàng),釋放一個(gè)許可,請(qǐng)求資源的線程被激活一個(gè)
              }
          }
           
          // 僅作示例參考,非真實(shí)數(shù)據(jù)
          protected Object[] items = new Object[MAX_AVAILABLE]; // 用于對(duì)象池復(fù)用對(duì)象
          protected boolean[] used = new boolean[MAX_AVAILABLE]; // 標(biāo)記作用
           
          protected synchronized Object getNextAvailableItem() {
              for (int i = 0; i < MAX_AVAILABLE; ++i) {
                  if (!used[i]) {
                      used[i] = true;
                      return items[i];
                  }
              }
              return null;
          }
           
          protected synchronized boolean markAsUnused(Object item) {
              for (int i = 0; i < MAX_AVAILABLE; ++i) {
                  if (item == items[i]) {
                      if (used[i]) {
                          used[i] = false;
                          return true;
                      } else {
                          return false;
                      }
                  }
              }
              return false;
          }
          }

          此實(shí)例簡(jiǎn)單實(shí)現(xiàn)了一個(gè)對(duì)象池,對(duì)象池最大容量為100。因此,當(dāng)同時(shí)有100個(gè)對(duì)象請(qǐng)求時(shí),對(duì)象池就會(huì)出現(xiàn)資源短缺,未能獲得資源的線程就需要等待。當(dāng)某個(gè)線程使用對(duì)象完畢后,就需要將對(duì)象返回給對(duì)象池。此時(shí),由于可用資源增加,因此,可以激活一個(gè)等待該資源的線程。

          ThreadLocal線程局部變量

          在剛開始接觸ThreadLocal,筆者很難理解這個(gè)線程局部變量的使用場(chǎng)景。當(dāng)現(xiàn)在回過頭去看,ThreadLocal是一種多線程間并發(fā)訪問變量的解決方案。與synchronized等加鎖的方式不同,ThreadLocal完全不提供鎖,而使用了以空間換時(shí)間的手段,為每個(gè)線程提供變量的獨(dú)立副本,以保障線程安全,因此它不是一種數(shù)據(jù)共享的解決方案。

          ThreadLocal是解決線程安全問題一個(gè)很好的思路,ThreadLocal類中有一個(gè)Map,用于存儲(chǔ)每一個(gè)線程的變量副本,Map中元素的鍵為線程對(duì)象,而值對(duì)應(yīng)線程的變量副本,由于Key值不可重復(fù),每一個(gè)“線程對(duì)象”對(duì)應(yīng)線程的“變量副本”,而到達(dá)了線程安全。

          特別值得注意的地方,從性能上說,ThreadLocal并不具有絕對(duì)的又是,在并發(fā)量不是很高時(shí),也行加鎖的性能會(huì)更好。但作為一套與鎖完全無關(guān)的線程安全解決方案,在高并發(fā)量或者所競(jìng)爭(zhēng)激烈的場(chǎng)合,使用ThreadLocal可以在一定程度上減少鎖競(jìng)爭(zhēng)。

          下面是一個(gè)ThreadLocal的簡(jiǎn)單使用:

          public class TestNum {
           // 通過匿名內(nèi)部類覆蓋ThreadLocal的initialValue()方法,指定初始值
           private static ThreadLocal seqNum = new ThreadLocal() {
           public Integer initialValue() {
           return 0;
           }
           };
           // 獲取下一個(gè)序列值
           public int getNextNum() {
           seqNum.set(seqNum.get() + 1);
           return seqNum.get();
          }public static void main(String[] args) {
           TestNum sn = new TestNum();
           //3個(gè)線程共享sn,各自產(chǎn)生序列號(hào)
           TestClient t1 = new TestClient(sn);
           TestClient t2 = new TestClient(sn);
           TestClient t3 = new TestClient(sn);
           t1.start();
           t2.start();
           t3.start();
           }
          private static class TestClient extends Thread {
           private TestNum sn;
          public TestClient(TestNum sn) {
           this.sn = sn;
           }
          public void run() {
           for (int i = 0; i < 3; i++) {
           // 每個(gè)線程打出3個(gè)序列值
           System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn["
           + sn.getNextNum() + "]");
           }
           }
           }
           }

          輸出結(jié)果:

          thread[Thread-0] –> sn[1]
          thread[Thread-1] –> sn[1]
          thread[Thread-2] –> sn[1]
          thread[Thread-1] –> sn[2]
          thread[Thread-0] –> sn[2]
          thread[Thread-1] –> sn[3]
          thread[Thread-2] –> sn[2]
          thread[Thread-0] –> sn[3]
          thread[Thread-2] –> sn[3]

          輸出的結(jié)果信息可以發(fā)現(xiàn)每個(gè)線程所產(chǎn)生的序號(hào)雖然都共享同一個(gè)TestNum實(shí)例,但它們并沒有發(fā)生相互干擾的情況,而是各自產(chǎn)生獨(dú)立的序列號(hào),這是因?yàn)門hreadLocal為每一個(gè)線程提供了單獨(dú)的副本。

          鎖的性能和優(yōu)化

          “鎖”是最常用的同步方法之一。在平常開發(fā)中,經(jīng)常能看到很多同學(xué)直接把鎖加很大一段代碼上。還有的同學(xué)只會(huì)用一種鎖方式解決所有共享問題。顯然這樣的編碼是讓人無法接受的。特別的在高并發(fā)的環(huán)境下,激烈的鎖競(jìng)爭(zhēng)會(huì)導(dǎo)致程序的性能下降德更加明顯。因此合理使用鎖對(duì)程序的性能直接相關(guān)。

          1、線程的開銷

          在多核情況下,使用多線程可以明顯提高系統(tǒng)的性能。但是在實(shí)際情況中,使用多線程的方式會(huì)額外增加系統(tǒng)的開銷。相對(duì)于單核系統(tǒng)任務(wù)本身的資源消耗外,多線程應(yīng)用還需要維護(hù)額外多線程特有的信息。比如,線程本身的元數(shù)據(jù),線程調(diào)度,線程上下文的切換等。

          2、減小鎖持有時(shí)間 

          在使用鎖進(jìn)行并發(fā)控制的程序中,當(dāng)鎖發(fā)生競(jìng)爭(zhēng)時(shí),單個(gè)線程對(duì)鎖的持有時(shí)間與系統(tǒng)性能有著直接的關(guān)系。如果線程持有鎖的時(shí)間很長(zhǎng),那么相對(duì)地,鎖的競(jìng)爭(zhēng)程度也就越激烈。因此,在程序開發(fā)過程中,應(yīng)該盡可能地減少對(duì)某個(gè)鎖的占有時(shí)間,以減少線程間互斥的可能。比如下面這一段代碼:

          public synchronized void syncMehod(){
          beforeMethod();
          mutexMethod();
          afterMethod();
          }

          此實(shí)例如果只有mutexMethod()方法是有同步需要的,而在beforeMethod(),和afterMethod()并不需要做同步控制。如果beforeMethod(),和afterMethod()分別是重量級(jí)的方法,則會(huì)花費(fèi)較長(zhǎng)的CPU時(shí)間。在這個(gè)時(shí)候,如果并發(fā)量較大時(shí),使用這種同步方案會(huì)導(dǎo)致等待線程大量增加。因?yàn)楫?dāng)前執(zhí)行的線程只有在執(zhí)行完所有任務(wù)后,才會(huì)釋放鎖。

          下面是優(yōu)化后的方案,只在必要的時(shí)候進(jìn)行同步,這樣就能明顯減少線程持有鎖的時(shí)間,提高系統(tǒng)的吞吐量。代碼如下:

          public void syncMehod(){
          beforeMethod();
          synchronized(this){
          mutexMethod();
          }
          afterMethod();
          }

          3、減少鎖粒度

          減小鎖粒度也是一種削弱多線程鎖競(jìng)爭(zhēng)的一種有效手段,這種技術(shù)典型的使用場(chǎng)景就是ConcurrentHashMap這個(gè)類。在普通的HashMap中每當(dāng)對(duì)集合進(jìn)行add()操作或者get()操作時(shí),總是獲得集合對(duì)象的鎖。這種操作完全是一種同步行為,因?yàn)殒i是在整個(gè)集合對(duì)象上的,因此,在高并發(fā)時(shí),激烈的鎖競(jìng)爭(zhēng)會(huì)影響到系統(tǒng)的吞吐量。

          如果看過源碼的同學(xué)應(yīng)該知道HashMap是數(shù)組+鏈表的方式做實(shí)現(xiàn)的。ConcurrentHashMap在HashMap的基礎(chǔ)上將整個(gè)HashMap分成若干個(gè)段(Segment),每個(gè)段都是一個(gè)子HashMap。如果需要在增加一個(gè)新的表項(xiàng),并不是將這個(gè)HashMap加鎖,二十搜線根據(jù)hashcode得到該表項(xiàng)應(yīng)該被存放在哪個(gè)段中,然后對(duì)該段加鎖,并完成put()操作。這樣,在多線程環(huán)境中,如果多個(gè)線程同時(shí)進(jìn)行寫入操作,只要被寫入的項(xiàng)不存在同一個(gè)段中,那么線程間便可以做到真正的并行。具體的實(shí)現(xiàn)希望讀者自己花點(diǎn)時(shí)間讀一讀ConcurrentHashMap這個(gè)類的源碼,這里就不再做過多描述了。

          4、鎖分離 

          在前面提起過ReadWriteLock讀寫鎖,那么讀寫分離的延伸就是鎖的分離。同樣可以在JDK中找到鎖分離的源碼LinkedBlockingQueue。

          public class LinkedBlockingQueue extends AbstractQueue
          implements BlockingQueue, java.io.Serializable {
          /* Lock held by take, poll, etc /
          private final ReentrantLock takeLock = new ReentrantLock();
          /** Wait queue for waiting takes */
          private final Condition notEmpty = takeLock.newCondition();
           
          /** Lock held by put, offer, etc */
          private final ReentrantLock putLock = new ReentrantLock();
           
          /** Wait queue for waiting puts */
          private final Condition notFull = putLock.newCondition();
           
          public E take() throws InterruptedException {
              E x;
              int c = -1;
              final AtomicInteger count = this.count;
              final ReentrantLock takeLock = this.takeLock;
              takeLock.lockInterruptibly(); // 不能有兩個(gè)線程同時(shí)讀取數(shù)據(jù)
              try {
                  while (count.get() == 0) { // 如果當(dāng)前沒有可用數(shù)據(jù),一直等待put()的通知
                      notEmpty.await();
                  }
                  x = dequeue(); // 從頭部移除一項(xiàng)
                  c = count.getAndDecrement(); // size減1
                  if (c > 1)
                      notEmpty.signal(); // 通知其他take()操作
              } finally {
                  takeLock.unlock(); // 釋放鎖
              }
              if (c == capacity)
                  signalNotFull(); // 通知put()操作,已有空余空間
              return x;
          }
           
          public void put(E e) throws InterruptedException {
              if (e == null) throw new NullPointerException();
              // Note: convention in all put/take/etc is to preset local var
              // holding count negative to indicate failure unless set.
              int c = -1;
              Node<E> node = new Node(e);
              final ReentrantLock putLock = this.putLock;
              final AtomicInteger count = this.count;
              putLock.lockInterruptibly(); // 不能有兩個(gè)線程同時(shí)put數(shù)據(jù)
              try {
                  /*
                   * Note that count is used in wait guard even though it is
                   * not protected by lock. This works because count can
                   * only decrease at this point (all other puts are shut
                   * out by lock), and we (or some other waiting put) are
                   * signalled if it ever changes from capacity. Similarly
                   * for all other uses of count in other wait guards.
                   */
                  while (count.get() == capacity) { // 隊(duì)列滿了 則等待
                      notFull.await();
                  }
                  enqueue(node); // 加入隊(duì)列
                  c = count.getAndIncrement();// size加1
                  if (c + 1 < capacity)
                      notFull.signal(); // 如果有足夠空間,通知其他線程
              } finally {
                  putLock.unlock();// 釋放鎖
              }
              if (c == 0)
                  signalNotEmpty();// 插入成功后,通知take()操作讀取數(shù)據(jù)
          }
           
          // other code     
          }

          這里需要說明一下的就是,take()和put()函數(shù)是相互獨(dú)立的,它們之間不存在鎖競(jìng)爭(zhēng)關(guān)系。只需要在take()和put()各自方法內(nèi)部分別對(duì)takeLock和putLock發(fā)生競(jìng)爭(zhēng)。從而,削弱了鎖競(jìng)爭(zhēng)的可能性。

          5、鎖粗化

          上面說到的減小鎖時(shí)間和粒度,這樣做就是為了滿足每個(gè)線程持有鎖的時(shí)間盡量短。但是,在粒度上應(yīng)該把握一個(gè)度,如果對(duì)用一個(gè)鎖不停地進(jìn)行請(qǐng)求、同步和釋放,其本身也會(huì)消耗系統(tǒng)寶貴的資源,反而加大了系統(tǒng)開銷。

          我們需要知道的是,虛擬機(jī)在遇到一連串連續(xù)的對(duì)同一鎖不斷進(jìn)行請(qǐng)求和釋放的操作時(shí),便會(huì)把所有的鎖操作整合成對(duì)鎖的一次請(qǐng)求,從而減少對(duì)鎖的請(qǐng)求同步次數(shù),這樣的操作叫做鎖的粗化。下面是一段整合實(shí)例演示:

          public void syncMehod(){
          synchronized(lock){
          method1();
          }
          synchronized(lock){
          method2();
          }
          }

          JVM整合后的形式:

          public void syncMehod(){
          synchronized(lock){
          method1();
          method2();
          }
          }

          因此,這樣的整合給我們開發(fā)人員對(duì)鎖粒度的把握給出了很好的演示作用。

          無鎖的并行計(jì)算

          上面花了很大篇幅在說鎖的事情,同時(shí)也提到過鎖是會(huì)帶來一定的上下文切換的額外資源開銷,在高并發(fā)時(shí),”鎖“的激烈競(jìng)爭(zhēng)可能會(huì)成為系統(tǒng)瓶頸。因此,這里可以使用一種非阻塞同步方法。這種無鎖方式依然能保證數(shù)據(jù)和程序在高并發(fā)環(huán)境下保持多線程間的一致性。

          1、非阻塞同步/無鎖
          非阻塞同步方式其實(shí)在前面的ThreadLocal中已經(jīng)有所體現(xiàn),每個(gè)線程擁有各自獨(dú)立的變量副本,因此在并行計(jì)算時(shí),無需相互等待。這里筆者主要推薦一種更為重要的、基于比較并交換(Compare And Swap)CAS算法的無鎖并發(fā)控制方法。

          CAS算法的過程:它包含3個(gè)參數(shù)CAS(V,E,N)。V表示要更新的變量,E表示預(yù)期值,N表示新值。僅當(dāng)V值等于E值時(shí),才會(huì)將V的值設(shè)為N,如果V值和E值不同,則說明已經(jīng)有其他線程做了更新,則當(dāng)前線程什么都不做。最后CAS返回當(dāng)前V的真實(shí)值。CAS操作時(shí)抱著樂觀的態(tài)度進(jìn)行的,它總是認(rèn)為自己可以成功完成操作。當(dāng)多個(gè)線程同時(shí)使用CAS操作一個(gè)變量時(shí),只有一個(gè)會(huì)勝出,并成功更新,其余俊輝失敗。失敗的線程不會(huì)被掛起,僅是被告知失敗,并且允許再次嘗試,當(dāng)然也允許失敗的線程放棄操作?;谶@樣的原理,CAS操作及時(shí)沒有鎖,也可以發(fā)現(xiàn)其他線程對(duì)當(dāng)前線程的干擾,并且進(jìn)行恰當(dāng)?shù)奶幚怼?/p>

          2、原子量操作

          JDK的java.util.concurrent.atomic包提供了使用無鎖算法實(shí)現(xiàn)的原子操作類,代碼內(nèi)部主要使用了底層native代碼的實(shí)現(xiàn)。有興趣的同學(xué)可以繼續(xù)跟蹤一下native層面的代碼。這里就不貼表層的代碼實(shí)現(xiàn)了。

          下面主要以一個(gè)例子來展示普通同步方法和無鎖同步的性能差距:

          public class TestAtomic {
          private static final int MAX_THREADS = 3;
          private static final int TASK_COUNT = 3;
          private static final int TARGET_COUNT = 100 * 10000;
          private AtomicInteger acount = new AtomicInteger(0);
          private int count = 0;
          synchronized int inc() {
              return ++count;
          }
           
          synchronized int getCount() {
              return count;
          }
           
          public class SyncThread implements Runnable {
              String name;
              long startTime;
              TestAtomic out;
           
              public SyncThread(TestAtomic o, long startTime) {
                  this.out = o;
                  this.startTime = startTime;
              }
           
              @Override
              public void run() {
                  int v = out.inc();
                  while (v < TARGET_COUNT) {
                      v = out.inc();
                  }
                  long endTime = System.currentTimeMillis();
                  System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
              }
          }
           
          public class AtomicThread implements Runnable {
              String name;
              long startTime;
           
              public AtomicThread(long startTime) {
                  this.startTime = startTime;
              }
           
              @Override
              public void run() {
                  int v = acount.incrementAndGet();
                  while (v < TARGET_COUNT) {
                      v = acount.incrementAndGet();
                  }
                  long endTime = System.currentTimeMillis();
                  System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
              }
          }
           
          @Test
          public void testSync() throws InterruptedException {
              ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
              long startTime = System.currentTimeMillis();
              SyncThread sync = new SyncThread(this, startTime);
              for (int i = 0; i < TASK_COUNT; i++) {
                  exe.submit(sync);
              }
              Thread.sleep(10000);
          }
           
          @Test
          public void testAtomic() throws InterruptedException {
              ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
              long startTime = System.currentTimeMillis();
              AtomicThread atomic = new AtomicThread(startTime);
              for (int i = 0; i < TASK_COUNT; i++) {
                  exe.submit(atomic);
              }
              Thread.sleep(10000);
          }
          }

          測(cè)試結(jié)果如下:

          testSync():
          SyncThread spend:201ms, v=1000002
          SyncThread spend:201ms, v=1000000
          SyncThread spend:201ms, v=1000001
          testAtomic():
          AtomicThread spend:43ms, v=1000000
          AtomicThread spend:44ms, v=1000001
          AtomicThread spend:46ms, v=1000002

          相信這樣的測(cè)試結(jié)果將內(nèi)部鎖和非阻塞同步算法的性能差異體現(xiàn)的非常明顯。因此筆者更推薦直接視同atomic下的這個(gè)原子類。

          結(jié)束語

          終于把想表達(dá)的這些東西整理完成了,其實(shí)還有一些想CountDownLatch這樣的類沒有講到。不過上面的所講到的絕對(duì)是并發(fā)編程中的核心。也許有些讀者朋友能在網(wǎng)上看到很多這樣的知識(shí)點(diǎn),但是個(gè)人還是覺得知識(shí)只有在對(duì)比的基礎(chǔ)上才能找到它合適的使用場(chǎng)景。因此,這也是筆者整理這篇文章的原因,也希望這篇文章能幫到更多的同學(xué)。









          瀏覽 94
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人拍拍视频 | 69精品自拍| 婷婷综合伊人 | 囯产精品宾馆在线精品酒店 | 中国一级片操小逼的 |