<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          5W字高質(zhì)量java并發(fā)系列詳解教程(上)-附PDF下載

          共 47097字,需瀏覽 95分鐘

           ·

          2021-09-06 20:11

          并發(fā)是java高級程序員必須要深入研究的話題,從Synchronized到Lock,JDK本身提供了很多優(yōu)秀的并發(fā)類和鎖控制器,靈活使用這些類,可以寫出優(yōu)秀的并發(fā)程序,而這些類基本上都是在java.util.concurrent包中的,本文將會從具體的例子出發(fā),一步一步帶領(lǐng)大家進入java高質(zhì)量并發(fā)的世界。

          本文PDF下載鏈接concurrent-all-in-one.pdf

          本文的例子可以參考github.com/ddean2009/l…

          第一章  java.util.concurrent簡介

          java.util.concurrent包提供了很多有用的類,方便我們進行并發(fā)程序的開發(fā)。本文將會做一個總體的簡單介紹。

          主要的組件

          java.util.concurrent包含了很多內(nèi)容, 本文將會挑選其中常用的一些類來進行大概的說明:

          • Executor

          • ExecutorService

          • ScheduledExecutorService

          • Future

          • CountDownLatch

          • CyclicBarrier

          • Semaphore

          • ThreadFactory

          Executor

          Executor是一個接口,它定義了一個execute方法,這個方法接收一個Runnable,并在其中調(diào)用Runnable的run方法。

          我們看一個Executor的實現(xiàn):

          public class Invoker implements Executor {
          @Override
          public void execute(Runnable r) {
          r.run();
          }
          }
          復制代碼

          現(xiàn)在我們可以直接調(diào)用該類中的方法:

              public void execute() {
          Executor executor = new Invoker();
          executor.execute( () -> {
          log.info("{}", Thread.currentThread().toString());
          });
          }
          復制代碼

          注意,Executor并不一定要求執(zhí)行的任務(wù)是異步的。

          ExecutorService

          如果我們真正的需要使用多線程的話,那么就需要用到ExecutorService了。

          ExecutorService管理了一個內(nèi)存的隊列,并定時提交可用的線程。

          我們首先定義一個Runnable類:

          public class Task implements Runnable {
          @Override
          public void run() {
          // task details
          }
          }
          復制代碼

          我們可以通過Executors來方便的創(chuàng)建ExecutorService:

          ExecutorService executor = Executors.newFixedThreadPool(10);
          復制代碼

          上面創(chuàng)建了一個ThreadPool, 我們也可以創(chuàng)建單線程的ExecutorService:

          ExecutorService executor =Executors.newSingleThreadExecutor();
          復制代碼

          我們這樣提交task:

          public void execute() { 
          executor.submit(new Task());
          }
          復制代碼

          因為ExecutorService維持了一個隊列,所以它不會自動關(guān)閉, 我們需要調(diào)用executor.shutdown() 或者executor.shutdownNow()來關(guān)閉它。

          如果想要判斷ExecutorService中的線程在收到shutdown請求后是否全部執(zhí)行完畢,可以調(diào)用如下的方法:

          try {
          executor.awaitTermination( 5l, TimeUnit.SECONDS );
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          復制代碼

          ScheduledExecutorService

          ScheduledExecutorService和ExecutorService很類似,但是它可以周期性的執(zhí)行任務(wù)。

          我們這樣創(chuàng)建ScheduledExecutorService:

          ScheduledExecutorService executorService
          = Executors.newSingleThreadScheduledExecutor();
          復制代碼

          executorService的schedule方法,可以傳入Runnable也可以傳入Callable:

          Future<String> future = executorService.schedule(() -> {
          // ...
          return "Hello world";
          }, 1, TimeUnit.SECONDS);

          ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
          // ...
          }, 1, TimeUnit.SECONDS);
          復制代碼

          還有兩個比較相近的方法:

          scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )

          scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )
          復制代碼

          兩者的區(qū)別是前者的period是以任務(wù)開始時間來計算的,后者是以任務(wù)結(jié)束時間來計算。

          Future

          Future用來獲取異步執(zhí)行的結(jié)果。可以調(diào)用cancel(boolean mayInterruptIfRunning) 方法來取消線程的執(zhí)行。

          我們看下怎么得到一個Future對象:

          public void invoke() {
          ExecutorService executorService = Executors.newFixedThreadPool(10);

          Future<String> future = executorService.submit(() -> {
          // ...
          Thread.sleep(10000l);
          return "Hello world";
          });
          }
          復制代碼

          我們看下怎么獲取Future的結(jié)果:

          if (future.isDone() && !future.isCancelled()) {
          try {
          str = future.get();
          } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
          }
          }
          復制代碼

          future還可以接受一個時間參數(shù),超過指定的時間,將會報TimeoutException。

          try {
          future.get(10, TimeUnit.SECONDS);
          } catch (InterruptedException | ExecutionException | TimeoutException e) {
          e.printStackTrace();
          }
          復制代碼

          CountDownLatch

          CountDownLatch是一個并發(fā)中很有用的類,CountDownLatch會初始化一個counter,通過這個counter變量,來控制資源的訪問。我們會在后面的文章詳細介紹。

          CyclicBarrier

          CyclicBarrier和CountDownLatch很類似。CyclicBarrier主要用于多個線程互相等待的情況,可以通過調(diào)用await() 方法等待,知道達到要等的數(shù)量。

          public class Task implements Runnable {

          private CyclicBarrier barrier;

          public Task(CyclicBarrier barrier) {
          this.barrier = barrier;
          }

          @Override
          public void run() {
          try {
          LOG.info(Thread.currentThread().getName() +
          " is waiting");
          barrier.await();
          LOG.info(Thread.currentThread().getName() +
          " is released");
          } catch (InterruptedException | BrokenBarrierException e) {
          e.printStackTrace();
          }
          }

          }
          復制代碼
          public void start() {

          CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
          // ...
          LOG.info("All previous tasks are completed");
          });

          Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
          Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
          Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

          if (!cyclicBarrier.isBroken()) {
          t1.start();
          t2.start();
          t3.start();
          }
          }
          復制代碼

          Semaphore

          Semaphore包含了一定數(shù)量的許可證,通過獲取許可證,從而獲得對資源的訪問權(quán)限。通過 tryAcquire()來獲取許可,如果獲取成功,許可證的數(shù)量將會減少。

          一旦線程release()許可,許可的數(shù)量將會增加。

          我們看下怎么使用:

          static Semaphore semaphore = new Semaphore(10);

          public void execute() throws InterruptedException {

          LOG.info("Available permit : " + semaphore.availablePermits());
          LOG.info("Number of threads waiting to acquire: " +
          semaphore.getQueueLength());

          if (semaphore.tryAcquire()) {
          try {
          // ...
          }
          finally {
          semaphore.release();
          }
          }

          }
          復制代碼

          ThreadFactory

          ThreadFactory可以很方便的用來創(chuàng)建線程:

          public class ThreadFactoryUsage implements ThreadFactory {
          private int threadId;
          private String name;

          public ThreadFactoryUsage(String name) {
          threadId = 1;
          this.name = name;
          }

          @Override
          public Thread newThread(Runnable r) {
          Thread t = new Thread(r, name + "-Thread_" + threadId);
          log.info("created new thread with id : " + threadId +
          " and name : " + t.getName());
          threadId++;
          return t;
          }
          }
          復制代碼

          第二章 java并發(fā)中的Synchronized關(guān)鍵詞

          如果在多線程的環(huán)境中,我們經(jīng)常會遇到資源競爭的情況,比如多個線程要去同時修改同一個共享變量,這時候,就需要對資源的訪問方法進行一定的處理,保證同一時間只有一個線程訪問。

          java提供了synchronized關(guān)鍵字,方便我們實現(xiàn)上述操作。

          為什么要同步

          我們舉個例子,我們創(chuàng)建一個類,提供了一個setSum的方法:


          public class SynchronizedMethods {

          private int sum = 0;

          public void calculate() {
          setSum(getSum() + 1);
          }
          }
          復制代碼

          如果我們在多線程的環(huán)境中調(diào)用這個calculate方法:

              @Test
          public void givenMultiThread_whenNonSyncMethod() throws InterruptedException {
          ExecutorService service = Executors.newFixedThreadPool(3);
          SynchronizedMethods summation = new SynchronizedMethods();

          IntStream.range(0, 1000)
          .forEach(count -> service.submit(summation::calculate));
          service.shutdown();
          service.awaitTermination(1000, TimeUnit.MILLISECONDS);

          assertEquals(1000, summation.getSum());
          }
          復制代碼

          按照上面的方法,我們預計要返回1000, 但是實際上基本不可能得到1000這個值,因為在多線程環(huán)境中,對同一個資源進行同時操作帶來的不利影響。

          那我們怎么才能夠建線程安全的環(huán)境呢?

          Synchronized關(guān)鍵詞

          java提供了多種線程安全的方法,本文主要講解Synchronized關(guān)鍵詞,Synchronized關(guān)鍵詞可以有很多種形式:

          • Instance methods

          • Static methods

          • Code blocks

          當我們使用synchronized時,java會在相應(yīng)的對象上加鎖,從而在同一個對象等待鎖的方法都必須順序執(zhí)行,從而保證了線程的安全。

          Synchronized Instance Methods

          Synchronized關(guān)鍵詞可以放在實例方法的前面:

              public synchronized void synchronisedCalculate() {
          setSum(getSum() + 1);
          }
          復制代碼

          看下調(diào)用結(jié)果:

          @Test
          public void givenMultiThread_whenMethodSync() {
          ExecutorService service = Executors.newFixedThreadPool(3);
          SynchronizedMethods method = new SynchronizedMethods();

          IntStream.range(0, 1000)
          .forEach(count -> service.submit(method::synchronisedCalculate));
          service.awaitTermination(1000, TimeUnit.MILLISECONDS);

          assertEquals(1000, method.getSum());
          }
          復制代碼

          這里synchronized將會鎖住該方法的實例對象,多個線程中只有獲得該實例對象鎖的線程才能夠執(zhí)行。

          Synchronized Static Methods

          Synchronized關(guān)鍵詞也可以用在static方法前面:

              public static synchronized void syncStaticCalculate() {
          staticSum = staticSum + 1;
          }
          復制代碼

          Synchronized放在static方法前面和實例方法前面鎖住的對象不同。放在static方法前面鎖住的對象是這個Class本身,因為一個Class在JVM中只會存在一個,所以不管有多少該Class的實例,在同一時刻只會有一個線程可以執(zhí)行該放方法。

              @Test
          public void givenMultiThread_whenStaticSyncMethod() throws InterruptedException {
          ExecutorService service = Executors.newCachedThreadPool();

          IntStream.range(0, 1000)
          .forEach(count ->
          service.submit(SynchronizedMethods::syncStaticCalculate));
          service.shutdown();
          service.awaitTermination(100, TimeUnit.MILLISECONDS);

          assertEquals(1000, SynchronizedMethods.staticSum);
          }
          復制代碼

          Synchronized Blocks

          有時候,我們可能不需要Synchronize整個方法,而是同步其中的一部分,這時候,我們可以使用Synchronized Blocks:

              public void performSynchronizedTask() {
          synchronized (this) {
          setSum(getSum() + 1);
          }
          }
          復制代碼

          我們看下怎么測試:

              @Test
          public void givenMultiThread_whenBlockSync() throws InterruptedException {
          ExecutorService service = Executors.newFixedThreadPool(3);
          SynchronizedMethods synchronizedBlocks = new SynchronizedMethods();

          IntStream.range(0, 1000)
          .forEach(count ->
          service.submit(synchronizedBlocks::performSynchronizedTask));
          service.shutdown();
          service.awaitTermination(100, TimeUnit.MILLISECONDS);

          assertEquals(1000, synchronizedBlocks.getSum());
          }
          復制代碼

          上面我們同步的是實例,如果在靜態(tài)方法中,我們也可以同步class:

              public static void performStaticSyncTask(){
          synchronized (SynchronizedMethods.class) {
          staticSum = staticSum + 1;
          }
          }
          復制代碼

          我們看下怎么測試:

              @Test
          public void givenMultiThread_whenStaticSyncBlock() throws InterruptedException {
          ExecutorService service = Executors.newCachedThreadPool();

          IntStream.range(0, 1000)
          .forEach(count ->
          service.submit(SynchronizedMethods::performStaticSyncTask));
          service.shutdown();
          service.awaitTermination(100, TimeUnit.MILLISECONDS);

          assertEquals(1000, SynchronizedMethods.staticSum);
          }
          復制代碼

          第三章 java中的Volatile關(guān)鍵字使用

          在本文中,我們會介紹java中的一個關(guān)鍵字volatile。 volatile的中文意思是易揮發(fā)的,不穩(wěn)定的。那么在java中使用是什么意思呢?

          我們知道,在java中,每個線程都會有個自己的內(nèi)存空間,我們稱之為working memory。這個空間會緩存一些變量的信息,從而提升程序的性能。當執(zhí)行完某個操作之后,thread會將更新后的變量更新到主緩存中,以供其他線程讀寫。

          因為變量存在working memory和main memory兩個地方,那么就有可能出現(xiàn)不一致的情況。 那么我們就可以使用Volatile關(guān)鍵字來強制將變量直接寫到main memory,從而保證了不同線程讀寫到的是同一個變量。

          什么時候使用volatile

          那么我們什么時候使用volatile呢?當一個線程需要立刻讀取到另外一個線程修改的變量值的時候,我們就可以使用volatile。我們來舉個例子:

          public class VolatileWithoutUsage {
          private int count = 0;

          public void incrementCount() {
          count++;
          }
          public int getCount() {
          return count;
          }
          }
          復制代碼

          這個類定義了一個incrementCount()方法,會去更新count值,我們接下來在多線程環(huán)境中去測試這個方法:

              @Test
          public void testWithoutVolatile() throws InterruptedException {
          ExecutorService service= Executors.newFixedThreadPool(3);
          VolatileWithoutUsage volatileWithoutUsage=new VolatileWithoutUsage();

          IntStream.range(0,1000).forEach(count ->service.submit(volatileWithoutUsage::incrementCount) );
          service.shutdown();
          service.awaitTermination(1000, TimeUnit.MILLISECONDS);
          assertEquals(1000,volatileWithoutUsage.getCount() );
          }
          復制代碼

          運行一下,我們會發(fā)現(xiàn)結(jié)果是不等于1000的。


          java.lang.AssertionError:
          Expected :1000
          Actual :999
          復制代碼

          這是因為多線程去更新同一個變量,我們在上篇文章也提到了,這種情況可以通過加Synchronized關(guān)鍵字來解決。

          那么是不是我們加上Volatile關(guān)鍵字后就可以解決這個問題了呢?

          public class VolatileFalseUsage {
          private volatile int count = 0;

          public void incrementCount() {
          count++;
          }
          public int getCount() {
          return count;
          }

          }
          復制代碼

          上面的類中,我們加上了關(guān)鍵字Volatile,我們再測試一下:

              @Test
          public void testWithVolatileFalseUsage() throws InterruptedException {
          ExecutorService service= Executors.newFixedThreadPool(3);
          VolatileFalseUsage volatileFalseUsage=new VolatileFalseUsage();

          IntStream.range(0,1000).forEach(count ->service.submit(volatileFalseUsage::incrementCount) );
          service.shutdown();
          service.awaitTermination(5000, TimeUnit.MILLISECONDS);
          assertEquals(1000,volatileFalseUsage.getCount() );
          }
          復制代碼

          運行一下,我們會發(fā)現(xiàn)結(jié)果還是錯誤的:

          java.lang.AssertionError: 
          Expected :1000
          Actual :992
          ~~

          為什么呢? 我們先來看下count++的操作,count++可以分解為三步操作,1. 讀取count的值,2.給count加1, 3.將count寫回內(nèi)存。添加Volatile關(guān)鍵詞只能夠保證count的變化立馬可見,而不能保證1,2,3這三個步驟的總體原子性。 要實現(xiàn)總體的原子性還是需要用到類似Synchronized的關(guān)鍵字。

          下面看下正確的用法:

          ~~~java
          public class VolatileTrueUsage {
          private volatile int count = 0;

          public void setCount(int number) {
          count=number;
          }
          public int getCount() {
          return count;
          }
          }
          復制代碼
              @Test
          public void testWithVolatileTrueUsage() throws InterruptedException {
          VolatileTrueUsage volatileTrueUsage=new VolatileTrueUsage();
          Thread threadA = new Thread(()->volatileTrueUsage.setCount(10));
          threadA.start();
          Thread.sleep(100);

          Thread reader = new Thread(() -> {
          int valueReadByThread = volatileTrueUsage.getCount();
          assertEquals(10, valueReadByThread);
          });
          reader.start();
          }
          復制代碼

          Happens-Before

          從java5之后,volatile提供了一個Happens-Before的功能。Happens-Before 是指當volatile進行寫回主內(nèi)存的操作時,會將之前的非volatile的操作一并寫回主內(nèi)存。

          public class VolatileHappenBeforeUsage {

          int a = 0;
          volatile boolean flag = false;

          public void writer() {
          a = 1; // 1 線程A修改共享變量
          flag = true; // 2 線程A寫volatile變量
          }
          }
          復制代碼

          上面的例子中,a是一個非volatile變量,flag是一個volatile變量,但是由于happens-before的特性,a 將會表現(xiàn)的和volatile一樣。

          第四章  wait和sleep的區(qū)別

          在本篇文章中,我們將會討論一下java中wait()和sleep()方法的區(qū)別。并討論一下怎么使用這兩個方法。

          Wait和sleep的區(qū)別

          wait() 是Object中定義的native方法:

          public final native void wait(long timeout) throws InterruptedException;
          復制代碼

          所以每一個類的實例都可以調(diào)用這個方法。wait()只能在synchronized block中調(diào)用。它會釋放synchronized時加在object上的鎖。

          sleep()是定義Thread中的native靜態(tài)類方法:

          public static native void sleep(long millis) throws InterruptedException;
          復制代碼

          所以Thread.sleep()可以在任何情況下調(diào)用。Thread.sleep()將會暫停當前線程,并且不會釋放任何鎖資源。

          我們先看一下一個簡單的wait使用:

          @Slf4j
          public class WaitUsage {

          private static Object LOCK = new Object();

          public static void WaitExample() throws InterruptedException {
          synchronized (LOCK) {
          LOCK.wait(1000);
          log.info("Object '" + LOCK + "' is woken after" +
          " waiting for 1 second");
          }
          }
          }
          復制代碼

          再看一下sleep的使用:

          @Slf4j
          public class SleepUsage {

          public static void sleepExample() throws InterruptedException {
          Thread.sleep(1000);
          log.info(
          "Thread '" + Thread.currentThread().getName() +
          "' is woken after sleeping for 1 second");
          }
          }
          復制代碼

          喚醒wait和sleep

          sleep()方法自帶sleep時間,時間過后,Thread會自動被喚醒。 或者可以通過調(diào)用interrupt()方法來中斷。

          相比而言wait的喚醒會比較復雜,我們需要調(diào)用notify() 和 notifyAll()方法來喚醒等待在特定wait object上的線程。

          notify()會根據(jù)線程調(diào)度的機制選擇一個線程來喚醒,而notifyAll()會喚醒所有等待的線程,由這些線程重新爭奪資源鎖。

          wait,notity通常用在生產(chǎn)者和消費者情形,我們看下怎么使用:

          @Slf4j
          public class WaitNotifyUsage {

          private int count =0;

          public void produceMessage() throws InterruptedException {

          while(true) {
          synchronized (this) {
          while (count == 5) {
          log.info("count == 5 , wait ....");
          wait();
          }
          count++;
          log.info("produce count {}", count);
          notify();
          }
          }
          }

          public void consumeMessage() throws InterruptedException {

          while (true) {
          synchronized (this) {
          while (count == 0) {
          log.info("count == 0, wait ...");
          wait();
          }
          log.info("consume count {}", count);
          count--;
          notify();
          }
          }
          }
          }
          復制代碼

          看下怎么調(diào)用:

             @Test
          public void testWaitNotifyUsage() throws InterruptedException{
          WaitNotifyUsage waitNotifyUsage=new WaitNotifyUsage();

          ExecutorService executorService=Executors.newFixedThreadPool(4);
          executorService.submit(()-> {
          try {
          waitNotifyUsage.produceMessage();
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          });

          executorService.submit(()-> {
          try {
          waitNotifyUsage.consumeMessage();
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          });

          Thread.sleep(50000);
          }
          復制代碼

          第五章 java中Future的使用

          Future是java 1.5引入的一個interface,可以方便的用于異步結(jié)果的獲取。 本文將會通過具體的例子講解如何使用Future。

          創(chuàng)建Future

          正如上面所說,F(xiàn)uture代表的是異步執(zhí)行的結(jié)果,意思是當異步執(zhí)行結(jié)束之后,返回的結(jié)果將會保存在Future中。

          那么我們什么時候會用到Future呢? 一般來說,當我們執(zhí)行一個長時間運行的任務(wù)時,使用Future就可以讓我們暫時去處理其他的任務(wù),等長任務(wù)執(zhí)行完畢再返回其結(jié)果。

          經(jīng)常會使用到Future的場景有:1. 計算密集場景。2. 處理大數(shù)據(jù)量。3. 遠程方法調(diào)用等。

          接下來我們將會使用ExecutorService來創(chuàng)建一個Future。

              <T> Future<T> submit(Callable<T> task);
          復制代碼

          上面是ExecutorService中定義的一個submit方法,它接收一個Callable參數(shù),并返回一個Future。

          我們用一個線程來計算一個平方運算:

              private ExecutorService executor
          = Executors.newSingleThreadExecutor();

          public Future<Integer> calculate(Integer input) {
          return executor.submit(() -> {
          System.out.println("Calculating..."+ input);
          Thread.sleep(1000);
          return input * input;
          });
          }
          復制代碼

          submit需要接受一個Callable參數(shù),Callable需要實現(xiàn)一個call方法,并返回結(jié)果。這里我們使用lamaba表達式來簡化這一個流程。

          從Future獲取結(jié)果

          上面我們創(chuàng)建好了Future,接下來我們看一下怎么獲取到Future的值。

                 FutureUsage futureUsage=new FutureUsage();
          Future<Integer> futureOne = futureUsage.calculate(20);
          while(!futureOne.isDone()) {
          System.out.println("Calculating...");
          Thread.sleep(300);
          }
          Integer result = futureOne.get();
          復制代碼

          首先我們通過Future.isDone() 來判斷這個異步操作是否執(zhí)行完畢,如果完畢我們就可以直接調(diào)用futureOne.get()來獲得Futre的結(jié)果。

          這里futureOne.get()是一個阻塞操作,會一直等待異步執(zhí)行完畢才返回結(jié)果。

          如果我們不想等待,future提供了一個帶時間的方法:

          Integer result = futureOne.get(500, TimeUnit.MILLISECONDS);
          復制代碼

          如果在等待時間結(jié)束的時候,F(xiàn)uture還有返回,則會拋出一個TimeoutException。

          取消Future

          如果我們提交了一個異步程序,但是想取消它, 則可以這樣:

          uture<Integer> futureTwo = futureUsage.calculate(4);

          boolean canceled = futureTwo.cancel(true);
          復制代碼

          Future.cancel(boolean) 傳入一個boolean參數(shù),來選擇是否中斷正在運行的task。

          如果我們cancel之后,再次調(diào)用get()方法,則會拋出CancellationException。

          多線程環(huán)境中運行

          如果有兩個計算任務(wù),先看下在單線程下運行的結(jié)果。

                  Future<Integer> future1 = futureUsage.calculate(10);
          Future<Integer> future2 = futureUsage.calculate(100);

          while (!(future1.isDone() && future2.isDone())) {
          System.out.println(
          String.format(
          "future1 is %s and future2 is %s",
          future1.isDone() ? "done" : "not done",
          future2.isDone() ? "done" : "not done"
          )
          );
          Thread.sleep(300);
          }

          Integer result1 = future1.get();
          Integer result2 = future2.get();

          System.out.println(result1 + " and " + result2);
          復制代碼

          因為我們通過Executors.newSingleThreadExecutor()來創(chuàng)建的單線程池。所以運行結(jié)果如下:

          Calculating...10
          future1 is not done and future2 is not done
          future1 is not done and future2 is not done
          future1 is not done and future2 is not done
          future1 is not done and future2 is not done
          Calculating...100
          future1 is done and future2 is not done
          future1 is done and future2 is not done
          future1 is done and future2 is not done
          100 and 10000
          復制代碼

          如果我們使用Executors.newFixedThreadPool(2)來創(chuàng)建一個多線程池,則可以得到如下的結(jié)果:

          calculating...10
          calculating...100
          future1 is not done and future2 is not done
          future1 is not done and future2 is not done
          future1 is not done and future2 is not done
          future1 is not done and future2 is not done
          100 and 10000
          復制代碼

          第六章 java并發(fā)中ExecutorService的使用

          ExecutorService是java中的一個異步執(zhí)行的框架,通過使用ExecutorService可以方便的創(chuàng)建多線程執(zhí)行環(huán)境。

          本文將會詳細的講解ExecutorService的具體使用。

          創(chuàng)建ExecutorService

          通常來說有兩種方法來創(chuàng)建ExecutorService。

          第一種方式是使用Executors中的工廠類方法,例如:

          ExecutorService executor = Executors.newFixedThreadPool(10);
          復制代碼

          除了newFixedThreadPool方法之外,Executors還包含了很多創(chuàng)建ExecutorService的方法。

          第二種方法是直接創(chuàng)建一個ExecutorService, 因為ExecutorService是一個interface,我們需要實例化ExecutorService的一個實現(xiàn)。

          這里我們使用ThreadPoolExecutor來舉例:

          ExecutorService executorService =
          new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
          復制代碼

          為ExecutorService分配Tasks

          ExecutorService可以執(zhí)行Runnable和Callable的task。其中Runnable是沒有返回值的,而Callable是有返回值的。我們分別看一下兩種情況的使用:

          Runnable runnableTask = () -> {
          try {
          TimeUnit.MILLISECONDS.sleep(300);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          };

          Callable<String> callableTask = () -> {
          TimeUnit.MILLISECONDS.sleep(300);
          return "Task's execution";
          };
          復制代碼

          將task分配給ExecutorService,可以通過調(diào)用xecute(), submit(), invokeAny(), invokeAll()這幾個方法來實現(xiàn)。

          execute() 返回值是void,他用來提交一個Runnable task。

          executorService.execute(runnableTask);
          復制代碼

          submit() 返回值是Future,它可以提交Runnable task, 也可以提交Callable task。 提交Runnable的有兩個方法:

          <T> Future<T> submit(Runnable task, T result);

          Future<?> submit(Runnable task);
          復制代碼

          第一個方法在返回傳入的result。第二個方法返回null。

          再看一下callable的使用:

          Future<String> future = 
          executorService.submit(callableTask);
          復制代碼

          invokeAny() 將一個task列表傳遞給executorService,并返回其中的一個成功返回的結(jié)果。

          String result = executorService.invokeAny(callableTasks);
          復制代碼

          invokeAll() 將一個task列表傳遞給executorService,并返回所有成功執(zhí)行的結(jié)果:

          List<Future<String>> futures = executorService.invokeAll(callableTasks);
          復制代碼

          關(guān)閉ExecutorService

          如果ExecutorService中的任務(wù)運行完畢之后,ExecutorService不會自動關(guān)閉。它會等待接收新的任務(wù)。如果需要關(guān)閉ExecutorService, 我們需要調(diào)用shutdown() 或者 shutdownNow() 方法。

          shutdown() 會立即銷毀ExecutorService,它會讓ExecutorServic停止接收新的任務(wù),并等待現(xiàn)有任務(wù)全部執(zhí)行完畢再銷毀。

          executorService.shutdown();
          復制代碼

          shutdownNow()并不保證所有的任務(wù)都被執(zhí)行完畢,它會返回一個未執(zhí)行任務(wù)的列表:

          List<Runnable> notExecutedTasks = executorService.shutdownNow();
          復制代碼

          oracle推薦的最佳關(guān)閉方法是和awaitTermination一起使用:

          executorService.shutdown();
          try {
          if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
          executorService.shutdownNow();
          }
          } catch (InterruptedException e) {
          executorService.shutdownNow();
          }
          復制代碼

          先停止接收任務(wù),然后再等待一定的時間讓所有的任務(wù)都執(zhí)行完畢,如果超過了給定的時間,則立刻結(jié)束任務(wù)。

          Future

          submit() 和 invokeAll() 都會返回Future對象。之前的文章我們已經(jīng)詳細講過了Future。 這里就只列舉一下怎么使用:

          Future<String> future = executorService.submit(callableTask);
          String result = null;
          try {
          result = future.get();
          } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
          }
          復制代碼

          ScheduledExecutorService

          ScheduledExecutorService為我們提供了定時執(zhí)行任務(wù)的機制。

          我們這樣創(chuàng)建ScheduledExecutorService:

          ScheduledExecutorService executorService
          = Executors.newSingleThreadScheduledExecutor();
          復制代碼

          executorService的schedule方法,可以傳入Runnable也可以傳入Callable:

          Future<String> future = executorService.schedule(() -> {
          // ...
          return "Hello world";
          }, 1, TimeUnit.SECONDS);

          ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
          // ...
          }, 1, TimeUnit.SECONDS);
          復制代碼

          還有兩個比較相近的方法:

          scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )

          scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )
          復制代碼

          兩者的區(qū)別是前者的period是以任務(wù)開始時間來計算的,后者是以任務(wù)結(jié)束時間來計算。

          ExecutorService和 Fork/Join

          java 7 引入了Fork/Join框架。 那么兩者的區(qū)別是什么呢?

          ExecutorService可以由用戶來自己控制生成的線程,提供了對線程更加細粒度的控制。而Fork/Join則是為了讓任務(wù)更加快速的執(zhí)行完畢。

          第七章 java中Runnable和Callable的區(qū)別

          在java的多線程開發(fā)中Runnable一直以來都是多線程的核心,而Callable是java1.5添加進來的一個增強版本。

          本文我們會詳細探討Runnable和Callable的區(qū)別。

          運行機制

          首先看下Runnable和Callable的接口定義:

          @FunctionalInterface
          public interface Runnable {
          /**
          * When an object implementing interface <code>Runnable</code> is used
          * to create a thread, starting the thread causes the object's
          * <code>run</code> method to be called in that separately executing
          * thread.
          * <p>
          * The general contract of the method <code>run</code> is that it may
          * take any action whatsoever.
          *
          * @see java.lang.Thread#run()
          */
          public abstract void run();
          }
          復制代碼
          @FunctionalInterface
          public interface Callable<V> {
          /**
          * Computes a result, or throws an exception if unable to do so.
          *
          * @return computed result
          * @throws Exception if unable to compute a result
          */
          V call() throws Exception;
          }
          復制代碼

          Runnable需要實現(xiàn)run()方法,Callable需要實現(xiàn)call()方法。

          我們都知道要自定義一個Thread有兩種方法,一是繼承Thread,而是實現(xiàn)Runnable接口,這是因為Thread本身就是一個Runnable的實現(xiàn):

          class Thread implements Runnable {
          /* Make sure registerNatives is the first thing <clinit> does. */
          private static native void registerNatives();
          static {
          registerNatives();
          }
          ...
          復制代碼

          所以Runnable可以通過Runnable和之前我們介紹的ExecutorService 來執(zhí)行,而Callable則只能通過ExecutorService 來執(zhí)行。

          返回值的不同

          根據(jù)上面兩個接口的定義,Runnable是不返還值的,而Callable可以返回值。

          如果我們都通過ExecutorService來提交,看看有什么不同:

          • 使用runnable

              public void executeTask() {
          ExecutorService executorService = Executors.newSingleThreadExecutor();
          Future future = executorService.submit(()->log.info("in runnable!!!!"));
          executorService.shutdown();
          }
          復制代碼
          • 使用callable

              public void executeTask() {
          ExecutorService executorService = Executors.newSingleThreadExecutor();
          Future future = executorService.submit(()->{
          log.info("in callable!!!!");
          return "callable";
          });
          executorService.shutdown();
          }
          復制代碼

          雖然我們都返回了Future,但是runnable的情況下Future將不包含任何值。

          Exception處理

          Runnable的run()方法定義沒有拋出任何異常,所以任何的Checked Exception都需要在run()實現(xiàn)方法中自行處理。

          Callable的Call()方法拋出了throws Exception,所以可以在call()方法的外部,捕捉到Checked Exception。我們看下Callable中異常的處理。

           public void executeTaskWithException(){
          ExecutorService executorService = Executors.newSingleThreadExecutor();
          Future future = executorService.submit(()->{
          log.info("in callable!!!!");
          throw new CustomerException("a customer Exception");
          });
          try {
          Object object= future.get();
          } catch (InterruptedException e) {
          e.printStackTrace();
          } catch (ExecutionException e) {
          e.printStackTrace();
          e.getCause();
          }
          executorService.shutdown();
          }
          復制代碼

          上面的例子中,我們在Callable中拋出了一個自定義的CustomerException。

          這個異常會被包含在返回的Future中。當我們調(diào)用future.get()方法時,就會拋出ExecutionException,通過e.getCause(),就可以獲取到包含在里面的具體異常信息。

          第八章 ThreadLocal的使用

          ThreadLocal主要用來為當前線程存儲數(shù)據(jù),這個數(shù)據(jù)只有當前線程可以訪問。

          在定義ThreadLocal的時候,我們可以同時定義存儲在ThreadLocal中的特定類型的對象。

          ThreadLocal<Integer> threadLocalValue = new ThreadLocal<>();
          復制代碼

          上面我們定義了一個存儲Integer的ThreadLocal對象。

          要存儲和獲取ThreadLocal中的對象也非常簡單,使用get()和set()即可:

          threadLocalValue.set(1);
          Integer result = threadLocalValue.get();
          復制代碼

          我可以將ThreadLocal看成是一個map,而當前的線程就是map中的key。

          除了new一個ThreadLocal對象,我們還可以通過:

              public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
          return new SuppliedThreadLocal<>(supplier);
          }
          復制代碼

          ThreadLocal提供的靜態(tài)方法withInitial來初始化一個ThreadLocal。

          ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 1);
          復制代碼

          withInitial需要一個Supplier對象,通過調(diào)用Supplier的get()方法獲取到初始值。

          要想刪除ThreadLocal中的存儲數(shù)據(jù),可以調(diào)用:

          threadLocal.remove();
          復制代碼

          下面我通過兩個例子的對比,來看一下使用ThreadLocal的好處。

          在實際的應(yīng)用中,我們通常會需要為不同的用戶請求存儲不同的用戶信息,一般來說我們需要構(gòu)建一個全局的Map,來根據(jù)不同的用戶ID,來存儲不同的用戶信息,方便在后面獲取。

          在Map中存儲用戶數(shù)據(jù)

          我們先看下如果使用全局的Map該怎么用:

          public class SharedMapWithUserContext implements Runnable {

          public static Map<Integer, Context> userContextPerUserId
          = new ConcurrentHashMap<>();
          private Integer userId;
          private UserRepository userRepository = new UserRepository();

          public SharedMapWithUserContext(int i) {
          this.userId=i;
          }

          @Override
          public void run() {
          String userName = userRepository.getUserNameForUserId(userId);
          userContextPerUserId.put(userId, new Context(userName));
          }
          }
          復制代碼

          這里我們定義了一個static的Map來存取用戶信息。

          再看一下怎么使用:

              @Test
          public void testWithMap(){
          SharedMapWithUserContext firstUser = new SharedMapWithUserContext(1);
          SharedMapWithUserContext secondUser = new SharedMapWithUserContext(2);
          new Thread(firstUser).start();
          new Thread(secondUser).start();
          assertEquals(SharedMapWithUserContext.userContextPerUserId.size(), 2);
          }
          復制代碼

          在ThreadLocal中存儲用戶數(shù)據(jù)

          如果我們要在ThreadLocal中使用可以這樣:

          public class ThreadLocalWithUserContext implements Runnable {

          private static ThreadLocal<Context> userContext
          = new ThreadLocal<>();
          private Integer userId;
          private UserRepository userRepository = new UserRepository();

          public ThreadLocalWithUserContext(int i) {
          this.userId=i;
          }

          @Override
          public void run() {
          String userName = userRepository.getUserNameForUserId(userId);
          userContext.set(new Context(userName));
          System.out.println("thread context for given userId: "
          + userId + " is: " + userContext.get());
          }

          }
          復制代碼

          測試代碼如下:

          public class ThreadLocalWithUserContextTest {

          @Test
          public void testWithThreadLocal(){
          ThreadLocalWithUserContext firstUser
          = new ThreadLocalWithUserContext(1);
          ThreadLocalWithUserContext secondUser
          = new ThreadLocalWithUserContext(2);
          new Thread(firstUser).start();
          new Thread(secondUser).start();
          }
          }
          復制代碼

          運行之后,我們可以得到下面的結(jié)果:

          thread context for given userId: 1 is: com.flydean.Context@411734d4
          thread context for given userId: 2 is: com.flydean.Context@1e9b6cc
          復制代碼

          不同的用戶信息被存儲在不同的線程環(huán)境中。

          注意,我們使用ThreadLocal的時候,一定是我們可以自由的控制所創(chuàng)建的線程。如果在ExecutorService環(huán)境下,就最好不要使用ThreadLocal,因為在ExecutorService中,線程是不可控的。

          第九章 java中線程的生命周期

          線程是java中繞不過去的一個話題, 今天本文將會詳細講解java中線程的生命周期,希望可以給大家一些啟發(fā)。

          java中Thread的狀態(tài)

          java中Thread有6種狀態(tài),分別是:

          1. NEW - 新創(chuàng)建的Thread,還沒有開始執(zhí)行

          2. RUNNABLE - 可運行狀態(tài)的Thread,包括準備運行和正在運行的。

          3. BLOCKED - 正在等待資源鎖的線程

          4. WAITING - 正在無限期等待其他線程來執(zhí)行某個特定操作

          5. TIMED_WAITING - 在一定的時間內(nèi)等待其他線程來執(zhí)行某個特定操作

          6. TERMINATED - 線程執(zhí)行完畢

          我們可以用一個圖來直觀的表示:

          JDK代碼中的定義如下:

          public enum State {
          /**
          * Thread state for a thread which has not yet started.
          */
          NEW,

          /**
          * Thread state for a runnable thread. A thread in the runnable
          * state is executing in the Java virtual machine but it may
          * be waiting for other resources from the operating system
          * such as processor.
          */
          RUNNABLE,

          /**
          * Thread state for a thread blocked waiting for a monitor lock.
          * A thread in the blocked state is waiting for a monitor lock
          * to enter a synchronized block/method or
          * reenter a synchronized block/method after calling
          * {@link Object#wait() Object.wait}.
          */
          BLOCKED,

          /**
          * Thread state for a waiting thread.
          * A thread is in the waiting state due to calling one of the
          * following methods:
          * <ul>
          * <li>{@link Object#wait() Object.wait} with no timeout</li>
          * <li>{@link #join() Thread.join} with no timeout</li>
          * <li>{@link LockSupport#park() LockSupport.park}</li>
          * </ul>
          *
          * <p>A thread in the waiting state is waiting for another thread to
          * perform a particular action.
          *
          * For example, a thread that has called <tt>Object.wait()</tt>
          * on an object is waiting for another thread to call
          * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
          * that object. A thread that has called <tt>Thread.join()</tt>
          * is waiting for a specified thread to terminate.
          */
          WAITING,

          /**
          * Thread state for a waiting thread with a specified waiting time.
          * A thread is in the timed waiting state due to calling one of
          * the following methods with a specified positive waiting time:
          * <ul>
          * <li>{@link #sleep Thread.sleep}</li>
          * <li>{@link Object#wait(long) Object.wait} with timeout</li>
          * <li>{@link #join(long) Thread.join} with timeout</li>
          * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
          * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
          * </ul>
          */
          TIMED_WAITING,

          /**
          * Thread state for a terminated thread.
          * The thread has completed execution.
          */
          TERMINATED;
          }
          復制代碼

          NEW

          NEW 表示線程創(chuàng)建了,但是還沒有開始執(zhí)行。我們看一個NEW的例子:

          public class NewThread implements Runnable{
          public static void main(String[] args) {
          Runnable runnable = new NewThread();
          Thread t = new Thread(runnable);
          log.info(t.getState().toString());
          }

          @Override
          public void run() {

          }
          }
          復制代碼

          上面的代碼將會輸出:

          NEW
          復制代碼

          Runnable

          Runnable表示線程正在可執(zhí)行狀態(tài)。包括正在運行和準備運行兩種。

          為什么這兩種都叫做Runnable呢?我們知道在多任務(wù)環(huán)境中,CPU的個數(shù)是有限的,所以任務(wù)都是輪循占有CPU來處理的,JVM中的線程調(diào)度器會為每個線程分配特定的執(zhí)行時間,當執(zhí)行時間結(jié)束后,線程調(diào)度器將會釋放CPU,以供其他的Runnable線程執(zhí)行。

          我們看一個Runnable的例子:

          public class RunnableThread implements Runnable {
          @Override
          public void run() {

          }

          public static void main(String[] args) {
          Runnable runnable = new RunnableThread();
          Thread t = new Thread(runnable);
          t.start();
          log.info(t.getState().toString());
          }
          }
          復制代碼

          上面的代碼將會輸出:

          RUNNABLE
          復制代碼

          BLOCKED

          BLOCKED表示線程正在等待資源鎖,而目前該資源正在被其他線程占有。

          我們舉個例子:

          public class BlockThread implements Runnable {
          @Override
          public void run() {
          loopResource();
          }

          public static synchronized void loopResource() {
          while(true) {
          //無限循環(huán)
          }
          }

          public static void main(String[] args) throws InterruptedException {
          Thread t1 = new Thread(new BlockThread());
          Thread t2 = new Thread(new BlockThread());

          t1.start();
          t2.start();

          Thread.sleep(1000);
          log.info(t1.getState().toString());
          log.info(t2.getState().toString());
          System.exit(0);
          }
          }
          復制代碼

          上面的例子中,由于t1是無限循環(huán),將會一直占有資源鎖,導致t2無法獲取資源鎖,從而位于BLOCKED狀態(tài)。

          我們會得到如下結(jié)果:

          12:40:11.710 [main] INFO com.flydean.BlockThread - RUNNABLE
          12:40:11.713 [main] INFO com.flydean.BlockThread - BLOCKED
          復制代碼

          WAITING

          WAITING 狀態(tài)表示線程正在等待其他的線程執(zhí)行特定的操作。有三種方法可以導致線程處于WAITTING狀態(tài):

          1. object.wait()

          2. thread.join()

          3. LockSupport.park()

          其中1,2方法不需要傳入時間參數(shù)。

          我們看下使用的例子:

          public class WaitThread implements  Runnable{

          public static Thread t1;
          @Override
          public void run() {
          Thread t2 = new Thread(()->{
          try {
          Thread.sleep(10000);
          } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.error("Thread interrupted", e);
          }
          log.info("t1"+t1.getState().toString());
          });
          t2.start();

          try {
          t2.join();
          } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.error("Thread interrupted", e);
          }
          log.info("t2"+t2.getState().toString());
          }

          public static void main(String[] args) {
          t1 = new Thread(new WaitThread());
          t1.start();

          }
          }
          復制代碼

          在這個例子中,我們調(diào)用的t2.join(),這會使調(diào)用它的t1線程處于WAITTING狀態(tài)。

          我們看下輸出結(jié)果:

          12:44:12.958 [Thread-1] INFO com.flydean.WaitThread - t1 WAITING
          12:44:12.964 [Thread-0] INFO com.flydean.WaitThread - t2 TERMINATED
          復制代碼

          TIMED_WAITING

          TIMED_WAITING狀態(tài)表示在一個有限的時間內(nèi)等待其他線程執(zhí)行特定的某些操作。

          java中有5中方式來達到這種狀態(tài):

          1. thread.sleep(long millis)

          2. wait(int timeout) 或者 wait(int timeout, int nanos)

          3. thread.join(long millis)

          4. LockSupport.parkNanos

          5. LockSupport.parkUntil

          我們舉個例子:

          public class TimedWaitThread implements  Runnable{
          @Override
          public void run() {
          try {
          Thread.sleep(5000);
          } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.error("Thread interrupted", e);
          }
          }

          public static void main(String[] args) throws InterruptedException {
          TimedWaitThread obj1 = new TimedWaitThread();
          Thread t1 = new Thread(obj1);
          t1.start();

          // The following sleep will give enough time for ThreadScheduler
          // to start processing of thread t1
          Thread.sleep(1000);
          log.info(t1.getState().toString());
          }
          }
          復制代碼

          上面的例子中我們調(diào)用了Thread.sleep(5000)來讓線程處于TIMED_WAITING狀態(tài)。

          看下輸出:

          12:58:02.706 [main] INFO com.flydean.TimedWaitThread - TIMED_WAITING
          復制代碼

          那么問題來了,TIMED_WAITING和WAITTING有什么區(qū)別呢?

          TIMED_WAITING如果在給定的時間內(nèi)沒有等到其他線程的特定操作,則會被喚醒,從而進入爭奪資源鎖的隊列,如果能夠獲取到鎖,則會變成Runnable狀態(tài),如果獲取不到鎖,則會變成BLOCKED狀態(tài)。

          TERMINATED

          TERMINATED表示線程已經(jīng)執(zhí)行完畢。我們看下例子:

          public class TerminatedThread implements Runnable{
          @Override
          public void run() {

          }

          public static void main(String[] args) throws InterruptedException {
          Thread t1 = new Thread(new TerminatedThread());
          t1.start();
          // The following sleep method will give enough time for
          // thread t1 to complete
          Thread.sleep(1000);
          log.info(t1.getState().toString());
          }
          }
          復制代碼

          輸出結(jié)果:

          13:02:38.868 [main] INFO com.flydean.TerminatedThread - TERMINATED
          復制代碼

          第十章 java中join的使用

          join()應(yīng)該是我們在java中經(jīng)常會用到的一個方法,它主要是將當前線程置為WAITTING狀態(tài),然后等待調(diào)用的線程執(zhí)行完畢或被interrupted。

          join()是Thread中定義的方法,我們看下他的定義:

             /**
          * Waits for this thread to die.
          *
          * <p> An invocation of this method behaves in exactly the same
          * way as the invocation
          *
          * <blockquote>
          * {@linkplain #join(long) join}{@code (0)}
          * </blockquote>
          *
          * @throws InterruptedException
          * if any thread has interrupted the current thread. The
          * <i>interrupted status</i> of the current thread is
          * cleared when this exception is thrown.
          */
          public final void join() throws InterruptedException {
          join(0);
          }

          復制代碼

          我們看下join是怎么使用的,通常我們需要在線程A中調(diào)用線程B.join():

          public class JoinThread implements Runnable{
          public int processingCount = 0;

          JoinThread(int processingCount) {
          this.processingCount = processingCount;
          log.info("Thread Created");
          }

          @Override
          public void run() {
          log.info("Thread " + Thread.currentThread().getName() + " started");
          while (processingCount > 0) {
          try {
          Thread.sleep(1000);
          } catch (InterruptedException e) {
          log.info("Thread " + Thread.currentThread().getName() + " interrupted");
          }
          processingCount--;
          }
          log.info("Thread " + Thread.currentThread().getName() + " exiting");
          }

          @Test
          public void joinTest()
          throws InterruptedException {
          Thread t2 = new Thread(new JoinThread(1));
          t2.start();
          log.info("Invoking join");
          t2.join();
          log.info("Returned from join");
          log.info("t2 status {}",t2.isAlive());
          }
          }
          復制代碼

          我們在主線程中調(diào)用了t2.join(),則主線程將會等待t2執(zhí)行完畢,我們看下輸出結(jié)果:

          06:17:14.775 [main] INFO com.flydean.JoinThread - Thread Created
          06:17:14.779 [main] INFO com.flydean.JoinThread - Invoking join
          06:17:14.779 [Thread-0] INFO com.flydean.JoinThread - Thread Thread-0 started
          06:17:15.783 [Thread-0] INFO com.flydean.JoinThread - Thread Thread-0 exiting
          06:17:15.783 [main] INFO com.flydean.JoinThread - Returned from join
          06:17:15.783 [main] INFO com.flydean.JoinThread - t2 status false
          復制代碼

          當線程已經(jīng)執(zhí)行完畢或者還沒開始執(zhí)行的時候,join()將會立即返回:

          Thread t1 = new SampleThread(0);
          t1.join(); //returns immediately
          復制代碼

          join還有兩個帶時間參數(shù)的方法:

          public final void join(long millis) throws InterruptedException
          復制代碼
          public final void join(long millis,int nanos) throws InterruptedException
          復制代碼

          如果在給定的時間內(nèi)調(diào)用的線程沒有返回,則主線程將會繼續(xù)執(zhí)行:

              @Test
          public void testJoinTimeout()
          throws InterruptedException {
          Thread t3 = new Thread(new JoinThread(10));
          t3.start();
          t3.join(1000);
          log.info("t3 status {}", t3.isAlive());
          }
          復制代碼

          上面的例子將會輸出:

          06:30:58.159 [main] INFO com.flydean.JoinThread - Thread Created
          06:30:58.163 [Thread-0] INFO com.flydean.JoinThread - Thread Thread-0 started
          06:30:59.172 [main] INFO com.flydean.JoinThread - t3 status true
          復制代碼

          Join()還有個happen-before的特性,這就是如果thread t1調(diào)用 t2.join(), 那么當t2返回時,所有t2的變動都會t1可見。

          之前我們講volatile關(guān)鍵詞的時候也提到了這個happen-before規(guī)則。  我們看下例子:

              @Test
          public void testHappenBefore() throws InterruptedException {
          JoinThread t4 = new JoinThread(10);
          t4.start();
          // not guaranteed to stop even if t4 finishes.
          do {
          log.info("inside the loop");
          Thread.sleep(1000);
          } while ( t4.processingCount > 0);
          }
          復制代碼

          我們運行下,可以看到while循環(huán)一直在進行中,即使t4中的變量已經(jīng)變成了0。

          所以如果我們需要在這種情況下使用的話,我們需要用到j(luò)oin(),或者其他的同步機制。

          第十一章 怎么在java中關(guān)閉一個thread

          我們經(jīng)常需要在java中用到thread,我們知道thread有一個start()方法可以開啟一個線程。那么怎么關(guān)閉這個線程呢?

          有人會說可以用Thread.stop()方法。但是這個方法已經(jīng)被廢棄了。

          根據(jù)Oracle的官方文檔,Thread.stop是不安全的。因為調(diào)用stop方法的時候,將會釋放它獲取的所有監(jiān)視器鎖(通過傳遞ThreadDeath異常實現(xiàn))。如果有資源該監(jiān)視器鎖所保護的話,就可能會出現(xiàn)數(shù)據(jù)不一致的異常。并且這種異常很難被發(fā)現(xiàn)。 所以現(xiàn)在已經(jīng)不推薦是用Thread.stop方法了。

          那我們還有兩種方式來關(guān)閉一個Thread。

          1. Flag變量

          如果我們有一個無法自動停止的Thread,我們可以創(chuàng)建一個條件變量,通過不斷判斷該變量的值,來決定是否結(jié)束該線程的運行。

          public class KillThread implements Runnable {
          private Thread worker;
          private final AtomicBoolean running = new AtomicBoolean(false);
          private int interval;

          public KillThread(int sleepInterval) {
          interval = sleepInterval;
          }

          public void start() {
          worker = new Thread(this);
          worker.start();
          }

          public void stop() {
          running.set(false);
          }

          public void run() {
          running.set(true);
          while (running.get()) {
          try {
          Thread.sleep(interval);
          } catch (InterruptedException e){
          Thread.currentThread().interrupt();
          log.info("Thread was interrupted, Failed to complete operation");
          }
          // do something here
          }
          log.info("finished");
          }

          public static void main(String[] args) {
          KillThread killThread= new KillThread(1000);
          killThread.start();
          killThread.stop();
          }


          }
          復制代碼

          上面的例子中,我們通過定義一個AtomicBoolean 的原子變量來存儲Flag標志。

          我們將會在后面的文章中詳細的講解原子變量。

          1. 調(diào)用interrupt()方法

          通過調(diào)用interrupt()方法,將會中斷正在等待的線程,并拋出InterruptedException異常。

          根據(jù)Oracle的說明,如果你想自己處理這個異常的話,需要reasserts出去,注意,這里是reasserts而不是rethrows,因為有些情況下,無法rethrow這個異常,我們需要這樣做:

           Thread.currentThread().interrupt();
          復制代碼

          這將會reasserts InterruptedException異常。

          看下我們第二種方法怎么調(diào)用:

          public class KillThread implements Runnable {
          private Thread worker;
          private final AtomicBoolean running = new AtomicBoolean(false);
          private int interval;

          public KillThread(int sleepInterval) {
          interval = sleepInterval;
          }

          public void start() {
          worker = new Thread(this);
          worker.start();
          }

          public void interrupt() {
          running.set(false);
          worker.interrupt();
          }

          public void stop() {
          running.set(false);
          }

          public void run() {
          running.set(true);
          while (running.get()) {
          try {
          Thread.sleep(interval);
          } catch (InterruptedException e){
          Thread.currentThread().interrupt();
          log.info("Thread was interrupted, Failed to complete operation");
          }
          // do something here
          }
          log.info("finished");
          }

          public static void main(String[] args) {
          KillThread killThread= new KillThread(1000);
          killThread.start();
          killThread.interrupt();
          }
          }
          復制代碼

          上面的例子中,當線程在Sleep中時,調(diào)用了interrupt方法,sleep會退出,并且拋出InterruptedException異常。

          第十二章 java中的Atomic類

          問題背景

          在多線程環(huán)境中,我們最常遇到的問題就是變量的值進行同步。因為變量需要在多線程中進行共享,所以我們必須需要采用一定的同步機制來進行控制。

          通過之前的文章,我們知道可以采用Lock的機制,當然也包括今天我們講的Atomic類。

          下面我們從兩種方式來分別介紹。

          Lock

          在之前的文章中,我們也講了同步的問題,我們再回顧一下。 如果定義了一個計數(shù)器如下:

          public class Counter {

          int counter;

          public void increment() {
          counter++;
          }

          }
          復制代碼

          如果是在單線程環(huán)境中,上面的代碼沒有任何問題。但是如果在多線程環(huán)境中,counter++將會得到不同的結(jié)果。

          因為雖然counter++看起來是一個原子操作,但是它實際上包含了三個操作:讀數(shù)據(jù),加一,寫回數(shù)據(jù)。

          我們之前的文章也講了,如何解決這個問題:

          public class LockCounter {

          private volatile int counter;

          public synchronized void increment() {
          counter++;
          }
          }
          復制代碼

          通過加synchronized,保證同一時間只會有一個線程去讀寫counter變量。

          通過volatile,保證所有的數(shù)據(jù)直接操作的主緩存,而不使用線程緩存。

          這樣雖然解決了問題,但是性能可能會受影響,因為synchronized會鎖住整個LockCounter實例。

          使用Atomic

          通過引入低級別的原子化語義命令(比如compare-and-swap (CAS)),從而能在保證效率的同時保證原子性。

          一個標準的CAS包含三個操作:

          1. 將要操作的內(nèi)存地址M。

          2. 現(xiàn)有的變量A。

          3. 新的需要存儲的變量B。

          CAS將會先比較A和M中存儲的值是否一致,一致則表示其他線程未對該變量進行修改,則將其替換為B。 否則不做任何操作。

          使用CAS可以不用阻塞其他的線程,但是我們需要自己處理好當更新失敗的情況下的業(yè)務(wù)邏輯處理情況。

          Java提供了很多Atomic類,最常用的包括AtomicInteger, AtomicLong, AtomicBoolean, 和 AtomicReference.

          其中的主要方法:

          1. get() – 直接中主內(nèi)存中讀取變量的值,類似于volatile變量。

          2. set() – 將變量寫回主內(nèi)存。類似于volatile變量。

          3. lazySet() – 延遲寫回主內(nèi)存。一種常用的情景是將引用重置為null的情況。

          4. compareAndSet() – 執(zhí)行CAS操作,成功返回true,失敗返回false。

          5. weakCompareAndSet() – 比較弱的CAS操作,不同的是它不執(zhí)行happens-before操作,從而不保證能夠讀取到其他變量最新的值。

          我們看下怎么用:

          public class AtomicCounter {

          private final AtomicInteger counter = new AtomicInteger(0);

          public int getValue() {
          return counter.get();
          }
          public void increment() {
          while(true) {
          int existingValue = getValue();
          int newValue = existingValue + 1;
          if(counter.compareAndSet(existingValue, newValue)) {
          return;
          }
          }
          }
          }
          復制代碼

          第十三章 java中interrupt,interrupted和isInterrupted的區(qū)別

          前面的文章我們講到了調(diào)用interrupt()來停止一個Thread,本文將會詳細講解java中三個非常相似的方法interrupt,interrupted和isInterrupted。

          isInterrupted

          首先看下最簡單的isInterrupted方法。isInterrupted是Thread類中的一個實例方法:

              public boolean isInterrupted() {
          return isInterrupted(false);
          }
          復制代碼

          通過調(diào)用isInterrupted()可以判斷實例線程是否被中斷。

          它的內(nèi)部調(diào)用了isInterrupted(false)方法:

            /**
          * Tests if some Thread has been interrupted. The interrupted state
          * is reset or not based on the value of ClearInterrupted that is
          * passed.
          */
          private native boolean isInterrupted(boolean ClearInterrupted);
          復制代碼

          這個方法是個native方法,接收一個是否清除Interrupted標志位的參數(shù)。

          我們可以看到isInterrupted()傳入的參數(shù)是false,這就表示isInterrupted()只會判斷是否被中斷,而不會清除中斷狀態(tài)。

          interrupted

          interrupted是Thread中的一個類方法:

           public static boolean interrupted() {
          return currentThread().isInterrupted(true);
          }
          復制代碼

          我們可以看到,interrupted()也調(diào)用了isInterrupted(true)方法,不過它傳遞的參數(shù)是true,表示將會清除中斷標志位。

          注意,因為interrupted()是一個類方法,調(diào)用isInterrupted(true)判斷的是當前線程是否被中斷。注意這里currentThread()的使用。

          interrupt

          前面兩個是判斷是否中斷的方法,而interrupt()就是真正觸發(fā)中斷的方法。

          我們先看下interrupt的定義:

              public void interrupt() {
          if (this != Thread.currentThread())
          checkAccess();

          synchronized (blockerLock) {
          Interruptible b = blocker;
          if (b != null) {
          interrupt0(); // Just to set the interrupt flag
          b.interrupt(this);
          return;
          }
          }
          interrupt0();
          }
          復制代碼

          從定義我們可以看到interrupt()是一個實例方法。

          它的工作要點有下面4點:

          1. 如果當前線程實例在調(diào)用Object類的wait(),wait(long)或wait(long,int)方法或join(),join(long),join(long,int)方法,或者在該實例中調(diào)用了Thread.sleep(long)或Thread.sleep(long,int)方法,并且正在阻塞狀態(tài)中時,則其中斷狀態(tài)將被清除,并將收到InterruptedException。

          2. 如果此線程在InterruptibleChannel上的I / O操作中處于被阻塞狀態(tài),則該channel將被關(guān)閉,該線程的中斷狀態(tài)將被設(shè)置為true,并且該線程將收到j(luò)ava.nio.channels.ClosedByInterruptException異常。

          3. 如果此線程在java.nio.channels.Selector中處于被被阻塞狀態(tài),則將設(shè)置該線程的中斷狀態(tài)為true,并且它將立即從select操作中返回。

          4. 如果上面的情況都不成立,則設(shè)置中斷狀態(tài)為true。

          我們來舉個例子:

          @Slf4j
          public class InterruptThread extends Thread {
          @Override
          public void run() {
          for (int i = 0; i < 1000; i++) {
          log.info("i= {}", (i+1));
          log.info("call inside thread.interrupted(): {}", Thread.interrupted());
          }
          }

          @Test
          public void testInterrupt(){
          InterruptThread thread=new InterruptThread();
          thread.start();
          thread.interrupt();
          //test isInterrupted
          log.info("first call isInterrupted(): {}", thread.isInterrupted());
          log.info("second call isInterrupted(): {}", thread.isInterrupted());

          //test interrupted()
          log.info("first call outside thread.interrupted(): {}", Thread.interrupted());
          log.info("second call outside thread.interrupted() {}:", Thread.interrupted());
          log.info("thread is alive : {}",thread.isAlive() );
          }
          }
          復制代碼

          輸出結(jié)果如下:

          13:07:17.804 [main] INFO com.flydean.InterruptThread - first call isInterrupted(): true
          13:07:17.808 [main] INFO com.flydean.InterruptThread - second call isInterrupted(): true

          13:07:17.808 [Thread-1] INFO com.flydean.InterruptThread - call inside thread.interrupted(): true
          13:07:17.808 [Thread-1] INFO com.flydean.InterruptThread - call inside thread.interrupted(): false

          13:07:17.808 [main] INFO com.flydean.InterruptThread - first call outside thread.interrupted(): false
          13:07:17.809 [main] INFO com.flydean.InterruptThread - second call outside thread.interrupted() false
          復制代碼

          上面的例子中,兩次調(diào)用thread.isInterrupted()的值都是true。

          在線程內(nèi)部調(diào)用Thread.interrupted(), 只有第一次返回的是ture,后面返回的都是false,這表明第一次被重置了。

          在線程外部,因為并沒有中斷外部線程,所以返回的值一直都是false。

          總結(jié)

          本文介紹了java并發(fā)系列文章1到14章,因為文件篇幅限制,剩下的章節(jié)將會在 5W字高質(zhì)量java并發(fā)系列詳解教程(下) 進行介紹,敬請期待!

          本文的例子github.com/ddean2009/l…

          本文PDF下載鏈接concurrent-all-in-one.pdf


          作者:程序那些事
          鏈接:https://juejin.cn/post/7003512267295686686
          來源:掘金
          著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。



          瀏覽 23
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  午影操逼视频 | 日本无码性爱 | 影音先锋AV一区二区三区 | 麻豆成人影院 | 日本黄色电影网站wwww |