深入講解java多線程與高并發(fā):線程池ThreadPool
前言
今天這節(jié)課呢,我們通過一道面試把前面講的哪些基礎(chǔ)復(fù)習一下,然后再開始線程池這部分的內(nèi)容,我們一點一點來看。
這道面試題呢實際上是華為的一道面試題,其實它里面是一道填空題,后來就很多的開始考這道題,這個面試題是兩個線程,第一個線程是從1到26,第二個線程是從A到一直到Z,然后要讓這兩個線程做到同時運行,交替輸出,順序打印。那么這道題目的解法有非常多。
用LockSupport其實是最簡單的。你讓一個線程輸出完了之后停止,然后讓另外一個線程繼續(xù)運行就完了。我們定義了兩個數(shù)組,兩個線程,第一個線程拿出數(shù)組里面的每一個數(shù)字來,然后打印,打印完叫醒t2,然后讓自己阻塞。另外一個線程上來之后自己先park,打印完叫醒線程t1。兩個線程就這么交替來交替去,就搞定了。
package com.mashibing.juc.c_026_00_interview.A1B2C3;public class T01_00_Question {public static void main(String[] args) {//要求用線程順序打印A1B2C3....Z26}
}package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.locks.LockSupport;//Locksupport park 當前線程阻塞(停止)//unpark(Thread t)public class T02_00_LockSupport {static Thread t1 = null, t2 = null;public static void main(String[] args) throws Exception {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();
t1 = new Thread(() -> {for(char c : aI) {
System.out.print(c);
LockSupport.unpark(t2); //叫醒T2LockSupport.park(); //T1阻塞}
}, "t1");
t2 = new Thread(() -> {for(char c : aC) {
LockSupport.park(); //t2阻塞System.out.print(c);
LockSupport.unpark(t1); //叫醒t1}
}, "t2");
t1.start();
t2.start();}
}當時出這道題的時候是想考察wait、notify和notifyAll,主要是synchronized、wait、notify。
來解釋一下,首先第一個我先調(diào)用wait、notify的時候,wait線程阻塞,notify叫醒其他線程,調(diào)用這個兩個方法的時候必須要進行synchronized鎖定的,如果沒有synchronized這個線程你是鎖定不了的,他是離開不鎖的,因此我們定義一個鎖的對象new Object(),兩個數(shù)組,第一線程上來先鎖定Object對象 o,鎖定完對象之后,我們開始輸出,輸出第一個數(shù)字,輸出完之后叫醒第二個,然后自己wait。還是這個思路,其實這個就和LookSupport的park、unpark是非常類似的,這里面最容易出錯的一個地方就是把整個數(shù)組都打印完了要記得notify,為什么要notify啊,因為這兩個線程里面終歸有一個線程wait的,是阻塞在這停止不動的。
package com.mashibing.juc.c_026_00_interview.A1B2C3;public class T06_00_sync_wait_notify {public static void main(String[] args) {final Object o = new Object();char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();new Thread(()->{synchronized (o) {for(char c : aI) {
System.out.print(c);try {
o.notify();
o.wait(); //讓出鎖} catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify(); //必須,否則無法停止程序}
}, "t1").start();new Thread(()->{synchronized (o) {for(char c : aC) {
System.out.print(c);try {
o.notify();
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t2").start();
}
}//如果我想保證t2在t1之前打印,也就是說保證首先輸出的是A而不是1,這個時候該如何做?
保證第一個線程先運行,辦法也是非常的多的,看下面,使用自旋的方式,設(shè)置一個boolean類型的變量,t2剛開始不是static。如果說t2沒有static的話,我這個t1線程就wait,要求t2必須先static才能執(zhí)行我的業(yè)務(wù)邏輯。還有一種寫法就是t2上來二話不說先wait,然后t1呢上來二話不說先輸出,輸出完了之后notify;還有一種寫法用CountDownLatch也可以;
package com.mashibing.juc.c_026_00_interview.A1B2C3;public class T07_00_sync_wait_notify {private static volatile boolean t2Started = false;//private static CountDownLatch latch = new C(1);public static void main(String[] args) {final Object o = new Object();char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();new Thread(()->{//latch.await();synchronized (o) {while(!t2Started) {try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}//for(char c : aI) {
System.out.print(c);try {
o.notify();
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
o.notify();
}
}, "t1").start();new Thread(()->{synchronized (o) {for(char c : aC) {
System.out.print(c);//latch.countDown()t2Started = true;try {
o.notify();
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}o.notify();
}
}, "t2").start();
}
}這兩種最重要的方法,一個是LockSupport,一個是synchronized、wait、notify。這兩種面試的時候你要是能寫出來問題就不大,但是,你如果能用新的lock的接口,就不再用synchronized,用這種自旋的,也可以。嚴格來講這個lock和synchronized本質(zhì)是一樣的。不過他也有好用的地方,下面我們來看看寫法。
嚴格來講這個lock和synchronized本質(zhì)是一樣的,不過還是有它好用的地方,我們來看看它的第一種寫法我用一個ReentrantLock,然后調(diào)用newCondition,上來之后先lock相當于synchronized了,打印,打印完之后signal叫醒另一個當前的等待,最后condition.signal()相當于notify(),然后之后另外一個也類似就完了,這種寫法相當于synchronized的一個變種。
package com.mashibing.juc.c_026_00_interview.A1B2C3;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class T08_00_lock_condition {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();new Thread(()->{try {lock.lock();for(char c : aI) {
System.out.print(c);
condition.signal();
condition.await();
}
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {lock.unlock();
}
}, "t1").start();new Thread(()->{try {lock.lock();for(char c : aC) {
System.out.print(c);
condition.signal();
condition.await();
}condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {lock.unlock();
}
}, "t2").start();
}
}但是如果你能寫出兩個Condition的情況就會好很多。大家知道,在一個一把鎖,這個鎖的等待隊列里有好多線程,假如我要notify的話他實際上要找出一個讓它運行,如果說我要調(diào)用的是一個notifyAll的話,是讓所有線程都醒過來去爭用這把鎖看誰能搶的到,誰搶到了就讓這個線程運行。那好,在這里面呢,我不能去要求那一類或者那一個線程去醒過來,這個回想原來講過的生產(chǎn)者消費者的問題,既然我們有兩個線程,那完全可以模仿生產(chǎn)者和消費者我干脆來兩種的Condition,同學們也回顧一下,給大家講Condition的時候說過這個問題,Condition它本質(zhì)上是一個等待隊列 ,就是兩個等待隊列,其中一個線程在這個等待隊列上,另一個線程在另外一個等待隊列上。
所以呢,如果說我用兩個Condition的話就可以精確的指定那個等待隊列里的線程醒過來去執(zhí)行任務(wù)。
所以這個寫法就是這樣來寫的,第一線程呢conditionT2.signal(),叫醒第二個那個里面的線程,然后我第一個線程讓它等待,第二個就是我叫醒第一個線程,第二個讓它等待放到這個等待隊列里,相當于我放了兩個等待隊列,t1在這個等待隊列里,t2在另一個等待隊列里,在t1完成了之后呢叫醒t2是指定你這個隊列的線程醒過來,所以永遠都是t2。其實對于兩個線程來講區(qū)別不大,因為你叫醒的時候當前線程肯定是醒著的,叫醒的也就只有是你這個線程 ,不過對于寫代碼來說,寫到這個樣子面試官肯定是會高看你一眼。
/*
Condition本質(zhì)是鎖資源上不同的等待隊列
*/package com.mashibing.juc.c_026_00_interview.A1B2C3;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class T09_00_lock_condition {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();
Lock lock = new ReentrantLock();
Condition conditionT1 = lock.newCondition();
Condition conditionT2 = lock.newCondition();new Thread(()->{try {lock.lock();for(char c : aI) {
System.out.print(c);
conditionT2.signal();
conditionT1.await();
}conditionT2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {lock.unlock();
}
}, "t1").start();new Thread(()->{try {lock.lock();for(char c : aC) {
System.out.print(c);
conditionT1.signal();
conditionT2.await();
}
conditionT1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {lock.unlock();
}
}, "t2").start();
}
}在這里,用一個自旋式的寫法,就是我們沒有鎖,相當于自己寫了一個自旋鎖。cas的寫法,這個寫法用了enum,到底哪個線程要運行他只能取兩個值,T1和T2,然后定義了一個ReadyToRun的變量,剛開始的時候是T1,這個意思呢就相當于是我有一個信號燈,這個信號燈要么就是T1要么就是T2,只能取這個兩個值,不能取別的,當一開始的時候我在這個信號燈上顯示的是T1,T1你可以走一步。看程序,第一上來判斷是不是T1啊,如果不是就占用cpu在這循環(huán)等待,如果一看是T1就打印,然后把r值變成T2進行下一次循環(huán),下一次循環(huán)上來之后這個r是不是T1,不是T1就有在這轉(zhuǎn)圈玩兒,而第二個線程發(fā)現(xiàn)它變成T2了,變成T2了下面的線程就會打印A,打印完了之后有把這個r變成了T1,就這么交替交替,就是這個一種玩法,寫volatile是保證線程的可見性。為什么要用enum類型,就是防止它取別的值,用一個int類型或者布爾也都可以。
package com.mashibing.juc.c_026_00_interview.A1B2C3;public class T03_00_cas {enum ReadyToRun {T1, T2}static volatile ReadyToRun r = ReadyToRun.T1; //思考為什么必須volatilepublic static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();new Thread(() -> {for (char c : aI) {while (r != ReadyToRun.T1) {}
System.out.print(c);
r = ReadyToRun.T2;
}
}, "t1").start();new Thread(() -> {for (char c : aC) {while (r != ReadyToRun.T2) {}
System.out.print(c);
r = ReadyToRun.T1;
}
}, "t2").start();
}
}在來看一個BlockingQueue的玩法,上節(jié)課呢講了BlockingQueue了,它有一個特點,
BlockingQueue可以支持多線程的阻塞操作,他有兩個操作一個是put,一個take。put的時候滿了他就會阻塞住,take的時候如果沒有,他就會阻塞住在這兒等著,我們利用這個特點來了兩個BlockingQueue,這兩個BlockingQueue都是ArrayBlockingQueue數(shù)組實現(xiàn)的,但是數(shù)組的長度是1,相當于我用了兩個容器,這兩個容器里頭放兩個值,這兩個值比如說我第一個線程打印出1來了我就在這邊放一個,我這邊OK了,該你了,而另外一個線程盯著這個事,他take,這個take里面沒有值的時候他是要在這里阻塞等待的,take不到的時候他就等著,等什么時候這邊打印完了,take到了他就打印這個A,打印完了A之后他就往第二個里面放一個OK,第一個線程也去take第二個容器里面的OK,什么時候take到了他就接著往下打印,大概是這么一種玩兒法。
package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.locks.LockSupport;public class T04_00_BlockingQueue {static BlockingQueue q1 = new ArrayBlockingQueue(1);static BlockingQueue q2 = new ArrayBlockingQueue(1);public static void main(String[] args) throws Exception {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();new Thread(() -> {for(char c : aI) {
System.out.print(c);try {
q1.put("ok");
q2.take();
} catch (InterruptedException e) {
e.printStackTrace();
}}
}, "t1").start();new Thread(() -> {for(char c : aC) {try {
q1.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(c);try {
q2.put("ok");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2").start();
}
} 這個效率非常低,它里面有各種的同步,我們了解一下就可以了,基本上面試也問不到這個。這里要把兩個線程連接起來要求的步驟比較多,要求建立一個PipedInputStream和一個PipedOutputStream。
就相當于兩個線程通信,第一個這邊就得有一個OutputStream,對應(yīng)第二個線程這邊就得有一個InputStream,同樣的第二個要往第一個寫的話,第一個也得有一個InputStream,第二個也還得有一個OutputStream。最后要求你的第一個線程的input1和你第二個線程的output2連接connect起來,互相之間的扔消息玩兒,這邊搞定了告訴另一邊兒,另一邊兒搞定了告訴這邊,回合制。
package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.io.IOException;import java.io.PipedInputStream;import java.io.PipedOutputStream;public class T10_00_PipedStream {public static void main(String[] args) throws Exception {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();
PipedInputStream input1 = new PipedInputStream();
PipedInputStream input2 = new PipedInputStream();
PipedOutputStream output1 = new PipedOutputStream();
PipedOutputStream output2 = new PipedOutputStream();
input1.connect(output2);
input2.connect(output1);
String msg = "Your Turn";new Thread(() -> {byte[] buffer = new byte[9];try {for(char c : aI) {input1.read(buffer);if(new String(buffer).equals(msg)) {
System.out.print(c);
}
output1.write(msg.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}, "t1").start();new Thread(() -> {byte[] buffer = new byte[9];try {for(char c : aC) {
System.out.print(c);
output2.write(msg.getBytes());
input2.read(buffer);if(new String(buffer).equals(msg)) {continue;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}, "t2").start();
}
}試了一下,使用Semaphore與Exchanger是解決不了這個問題的。(思考為什么?)
那么TransferQueue是一種什么樣的隊列呢,就是我一個線程往里頭生產(chǎn),生產(chǎn)者線程往里頭生產(chǎn)的時候,我生產(chǎn)了之后扔在這的時候我這個線程是阻塞的不動的,什么時候有另外一個線程把這個拿走了,拿走了之后這個線程才返回繼續(xù)運行。
我這個寫法是這樣的,我用了一個TransferQueue,我第一個線程上來二話不說先take,相當于第一個線程做了一個消費者,就在這個Queue等著,看看有沒有人往里扔。第二個線程二話不說上來經(jīng)過transfer,就把這個字母扔進去了,扔進去了一個A,第一個線程發(fā)現(xiàn)很好,來了一個,我就把這個拿出來打印,打印完之后我又進行transfer,進去了一個1。然后,第二個線程它去里面take,把這個1take出來打印。這個寫法很好玩兒,相當于我們自己每個人都把自己的一個數(shù)字或者是字母交到一個隊列里讓對方去打印。
package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.LinkedTransferQueue;import java.util.concurrent.TransferQueue;public class T13_TransferQueue {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();
TransferQueue queue = new LinkedTransferQueue();new Thread(()->{try {for (char c : aI) {
System.out.print(queue.take());queue.transfer(c);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();new Thread(()->{try {for (char c : aC) {queue.transfer(c);
System.out.print(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
} 我們接下來講高并發(fā)這個部分的理論知識的一部分,線程池。
線程池首先有幾個接口先了解第一個是Executor,第二個是ExecutorService,在后面才是線程池的一個使用ThreadPoolExecutor。

Executor看它的名字也能理解,執(zhí)行者,所以他有一個方法叫執(zhí)行,那么執(zhí)行的東西是Runnable,所以這個Executor有了之后呢由于它是一個借口,他可以有好多實現(xiàn),因此我們說,有了Executor之后呢,我們現(xiàn)場就是一個任務(wù)的定義,比如Runnable起了一個命令的意思,他的定義和運行就可以分開了,不像我們以前定義一個Thread,new一個Thread然后去重寫它的Run方法.start才可以運行,或者以前就是你寫了一個Runnable你也必須得new一個Thread出來,以前的這種定義和運行是固定的,是寫死的就是你new一個Thread讓他出來運行。有的同學他還是new一個Thread但是他有了各種各樣新的玩法,不用你親自去指定每一個Thread,他的運行的方式你可以自己去定義了,所以至于是怎么去定義的就看你怎么實現(xiàn)Executor的接口了,這里是定義和運行分開這么一個含義,所以這個接口體現(xiàn)的是這個意思,所以這個接口就比較簡單,至于你是直接調(diào)用run還是new一個Thread那是你自己的事兒。
* The {@code Executor} implementations provided in this package* implement {@link ExecutorService}, which is a more extensive
* interface. The {@link ThreadPoolExecutor} class provides an* extensible thread pool implementation. The {@link Executors} class* provides convenient factory methods for these Executors.
*
* Memory consistency effects: Actions in a thread prior to
* submitting a {@code Runnable} object to an {@code Executor}
* "package-summary.html#MemoryVisibility">happen-before
* its execution begins, perhaps in another thread.
*
* @since 1.5* @author Doug Lea
*/public interface Executor {/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/void execute(Runnable command);
}
ExecutorService又是什么意思呢,他是從Executor繼承,另外,他除了去實現(xiàn)Executor可以去執(zhí)行一個任務(wù)之外,他還完善了整個任務(wù)執(zhí)行器的一個生命周期,就拿線程池來舉例子,一個線程池里面一堆的線程就是一堆的工人,執(zhí)行完一個任務(wù)之后我這個線程怎么結(jié)束啊,線程池定義了這樣一些個方法:
void shutdown();//結(jié)束List shutdownNow() ;//馬上結(jié)束boolean isShutdown();//是否結(jié)束了boolean isTerminated();//是不是整體都執(zhí)行完了boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;//等著結(jié)束,等多長時間,時間到了還不結(jié)束的話他就返回false等等,所以這里面呢,他是實現(xiàn)了一些個線程的線程池的生命周期的東西,擴展了Executor的接口,真正的線程池的現(xiàn)實是在ExecutorService的這個基礎(chǔ)上來實現(xiàn)的。當我們看到這個ExecutorService的時候你會發(fā)現(xiàn)他除了Executor執(zhí)行任務(wù)之外還有submit提交任務(wù),執(zhí)行任務(wù)是直接拿過來馬上運行,而submit是扔給這個線程池,什么時候運行由這個線程池來決定,相當于是異步的,我只要往里面一扔就不管了。
那好,如果不管的話什么時候他有結(jié)果啊,這里面就涉及了比較新的類:比如說Future、RunnableFuture、FutureTask所以在這個里面我要給大家拓展一些線程的基礎(chǔ)的概念,大家以前學線程的時候定義一個線程的任務(wù)只能去實現(xiàn)Runnable接口,那在1.5之后他就增加了Callable這個接口。
下面代碼我們看一下Callable這個文檔,他說這個接口和java.lang.Runnable類似,所以這兩個類設(shè)計出來都是想潛在的另外一個線程去運行他,所以通過這點你會知道Callable和Runnable一樣他也可以是一個線程來運行他,那好,為什么有了Runnable還要有Callable,很簡單看代碼Callable有一個返回值,call這個方法相當與Runnable里面的run方法,而Runnable里的方法返回值是空值,而這里是可以有一個返回值的,給你一個計算的任務(wù),最后你得給我一個結(jié)果啊,這個叫做Callable,那么由于他可以返回一個結(jié)果,我就可以把這個結(jié)果給存儲起來,等什么時候您老人家計算完了通知我就可以了,我就不需要像原來線程池里面我調(diào)用他的run在這等著了。
所以有了這個Callable之后就有了很多種新鮮的玩法,Callable是什么,他類似于Runnable,不過Callable可以有返回值。
package java.util.concurrent;/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* The {@code Callable} interface is similar to {@link* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
*
The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param the result type of method {@code call}
*/
@FunctionalInterfacepublic interface Callable<V> {/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/V call() throws Exception;
}有了這個Callable之后呢,我們在來看一個接口:Future,這個Future代表的是什么呢,這個Future代表的是那個Callable被執(zhí)行完了之后我怎么才能拿到那個結(jié)果啊,它會封裝到一個Future里面。Future將來,未來。未來你執(zhí)行完之后可以把這個結(jié)果放到這個未來有可能執(zhí)行完的結(jié)果里頭,所以Future代表的是未來執(zhí)行完的一個結(jié)果。
由于Callable基本上就是為了線程池而設(shè)計的,所以你要是不用線程池的接口想去寫Callable的一些個小程序還是比較麻煩,所以這里面是要用到一些線程池的直接的用法,比較簡單,我們先用,用完后再給大家解釋什么意思。我們來看Future是怎么用的,在我們讀這個ExecutorService的時候你會發(fā)現(xiàn)他里面有submit方法,這個submit是異步的提交任務(wù),提交完了任務(wù)之后原線程該怎么運行怎么運行,運行完了之后他會出一個結(jié)果,這個結(jié)果出在哪兒 ,他的返回值是一個Future,所以你只能去提交一個Callable,必須有返回值,把Callable的任務(wù)扔給線程池,線程池執(zhí)行完了,異步的,就是把任務(wù)交給線程池之后我主線程該干嘛干嘛,調(diào)用get方法直到有結(jié)果之
后get會返回。Callable一般是配合線程池和Future來用的。其實更靈活的一個用法是FutureTask,即是一個Future同時又是一個Task,原來這Callable只能一個Task只能是一個任務(wù)但是他不能作為一個Future來用。這個FutureTask相當于是我自己可以作為一個任務(wù)來用,同時這個任務(wù)完成之后的結(jié)果也存在于這個對象里,為什么他能做到這一點,因為FutureTask他實現(xiàn)了RunnableFuture,而RunnableFuture即實現(xiàn)了Runnable又實現(xiàn)了Future,所以他即是一個任務(wù)又是一個Future。所以這個FutureTask是更好用的一個類。大家記住這個類,后面還會有WorkStealingPool、ForkJoinPool這些個基本上是會用到FutureTask類的。
package com.mashibing.juc.c_026_01_ThreadPool;import java.util.concurrent.*;public class T06_00_Future {public static void main(String[] args) throws InterruptedException,
ExecutionException {
FutureTask task = new FutureTask<>(()->{
TimeUnit.MILLISECONDS.sleep(500);return 1000;
}); //new Callable () { Integer call();}new Thread(task).start();
System.out.println(task.get()); //阻塞}
} 我們拓展了幾個類,大家把這幾個小類理解一下
Callable 類似與 Runnable,但是有返回值。
了解了Future,是用來存儲執(zhí)行的將來才會產(chǎn)生的結(jié)果。
FutureTask,他是Future加上Runnable,既可以執(zhí)行又可以存結(jié)果。
CompletableFuture,管理多個Future的結(jié)果。
那么有了這些之后那,我們可以介紹一個CompletableFuture。他底層特別復(fù)雜,但是用法特別靈活,如果你們感興趣可以去拓展的了解一下,用一下。CompletableFuture他的底層用的是ForkJoinPool。
我們先來看他的用法,這里有一個小例子,有這樣一個情景可以用到這個CompletableFuture,這個CompletableFuture非常的靈活,它內(nèi)部有好多關(guān)于各種結(jié)果的一個組合,這個CompletableFuture是可以組合各種各樣的不同的任務(wù),然后等這個任務(wù)執(zhí)行完產(chǎn)生一個結(jié)果進行一個組合。我們直接看代碼,假如你自己寫了一個網(wǎng)站,這個網(wǎng)站都賣格力空調(diào),同一個類型,然后很多人買東西都會進行一個價格比較,而你提供的這個服務(wù)就是我到淘寶上去查到這個格力空調(diào)買多少錢,然后我另啟動一個線程去京東上找格力空調(diào)賣多少錢,在啟動一個線程去拼多多上找,最后,我給你匯總一下這三個地方各售賣多少錢,然后你自己再來選去哪里買。
下面代碼,模擬了一個去別的地方取價格的一個方法,首先你去別的地方訪問會花好長時間,因此我寫了一個delay() 讓他去隨機的睡一段時間,表示我們要聯(lián)網(wǎng),我們要爬蟲爬結(jié)果執(zhí)行這個時間,然后打印了一下睡了多少時間之后才拿到結(jié)果的,如拿到天貓上的結(jié)果是1塊錢,淘寶上結(jié)果是2塊錢,京東上結(jié)果是3塊錢,總而言之是經(jīng)過網(wǎng)絡(luò)爬蟲爬過來的數(shù)據(jù)分析出來的多少錢。
然后我們需要模擬一下怎么拿到怎么匯總,第一種寫法就是我注釋的這種寫法,就是挨著牌的寫,假設(shè)跑天貓跑了10秒,跑淘寶拍了10秒,跑京東跑了5秒,一共歷時25秒才總出來。但是如果我用不同的線程呢,一個一個的線程他們是并行的執(zhí)行他們計算的結(jié)果是只有10秒。
但是用線程你寫起來會有各種各樣的麻煩事兒,比如說在去淘寶的過程中網(wǎng)絡(luò)報錯了該怎么辦,你去京東的過程中正好趕上那天他活動,并發(fā)訪問特別慢你又該怎么辦,你必須得等所有的線程都拿到之后才能產(chǎn)生一個結(jié)果,如果想要做這件事兒的話與其是要你每一個都要寫一個自己的線程,需要考慮到各種各樣的延遲的問題,各種各樣的異常的問題這個時候有一個簡單的寫法,用一個CompletableFuture,
首先第一點CompletableFuture他是一個Future,所以他會存一個將來有可能產(chǎn)生的結(jié)果值,結(jié)果值是一個Double,它會運行一個任務(wù),然后這個任務(wù)最后產(chǎn)生一個結(jié)果,這個結(jié)果會存在CompletableFuture里面,結(jié)果的類型是Double。
在這里我就定義了三個Future,分別代表了淘寶、京東、天貓,用了CompletableFuture的一個方法叫supplyAsync產(chǎn)生了一個異步的任務(wù),這個異步的任務(wù)去天貓那邊去給我拉數(shù)據(jù)去。你可以想象在一個線程池里面扔給他一個任務(wù)讓他去執(zhí)行,什么時候執(zhí)行完了之后他的結(jié)果會返回到這個futureTM里面。但是總體的要求就是這些個所有的future都得結(jié)束才可以,才能展示我最后的結(jié)果。
往下走還有這么一直寫法,就是我把這三個future都可以扔給一個CompletableFuture讓他去管理,他管理的時候可以調(diào)用allOf方法相當于這里面的所有的任務(wù)全部完成之后,最后join,你才能夠繼續(xù)往下運行。所以CompletableFuture除了提供了比較好用的對任務(wù)的管理之外,還提供了對于任務(wù)堆的管理,用于對一堆任務(wù)的管理。CompletableFuture還提供了很多的寫法,比如下面Lambda表達式的寫法。
CompletableFuture是什么東西呢?他是各種任務(wù)的一種管理類,總而言之呢CompletableFuture是一個更高級的類,它能夠在很高的一個層面上來幫助你管理一些個你想要的各種各樣的任務(wù),比如說你可以對任務(wù)進行各種各樣的組合 ,所有任務(wù)完成之后你要執(zhí)行一個什么樣的結(jié)果,以及任何一個任務(wù)完成之后你要執(zhí)行一個什么樣的結(jié)果,還有他可以提供一個鏈式的處理方式Lambda的一些寫法,拿到任務(wù)之后結(jié)果進行一個怎樣的處理。
/**
* 假設(shè)你能夠提供一個服務(wù)
* 這個服務(wù)查詢各大電商網(wǎng)站同一類產(chǎn)品的價格并匯總展示
* @author 馬士兵 http://mashibing.com
*/package com.mashibing.juc.c_026_01_ThreadPool;import java.io.IOException;import java.util.Random;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;public class T06_01_CompletableFuture {public static void main(String[] args) throws ExecutionException,
InterruptedException {long start, end;/*start = System.currentTimeMillis();
priceOfTM();
priceOfTB();
priceOfJD();
end = System.currentTimeMillis();
System.out.println("use serial method call! " + (end - start));*/start = System.currentTimeMillis();
CompletableFuture futureTM = CompletableFuture.supplyAsync(()-
>priceOfTM());
CompletableFuture futureTB = CompletableFuture.supplyAsync(()-
>priceOfTB());CompletableFuture futureJD = CompletableFuture.supplyAsync(()-
>priceOfJD());
CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
CompletableFuture.supplyAsync(()->priceOfTM())
.thenApply(String::valueOf)
.thenApply(str-> "price " + str)
.thenAccept(System.out::println);
end = System.currentTimeMillis();
System.out.println("use completable future! " + (end - start));try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}private static double priceOfTM() {
delay();return 1.00;
}private static double priceOfTB() {
delay();return 2.00;
}private static double priceOfJD() {
delay();return 3.00;
}/*private static double priceOfAmazon() {
delay();
throw new RuntimeException("product not exist!");
}*/private static void delay() {int time = new Random().nextInt(500);try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("After %s sleep!\n", time);
}
} 我們再來了解一下線程池,線程池呢從目前JDK提供的有兩種類型,第一種就是普通的線程池
ThreadPoolExecutor,第二種是ForkJoinPool,這兩種是不同類型的線程池,能干的事兒不太一樣,大家先把結(jié)論記住。Fork分叉,分叉完再分叉,最后的結(jié)果匯總這叫join。給大家講一個故事,在我上大學的時候NASA美國航天局他們有很多的數(shù)據(jù),計算機的計算力不行,就想了辦法,他把哪些要計算的氣象或者宇宙中產(chǎn)生各種各樣的數(shù)據(jù)進行一個分片,一大塊兒數(shù)據(jù)分成一小片一小片的,然后自己的計算機確實算不過來,太多了,他就向全球發(fā)出請求,你們愿不愿意在計算機空余的時間來幫我做一些這樣的餓計算,他是干過這樣的一個事情的,我在上大學的時候是收到過NASA這樣一個申請的。
所以這個就是ForkJoinPool的一個概念。這是兩種不同類型的線程池,我們說線程池的時候一般是說的第一種線程池,嚴格來講這兩種是不一樣的,今天我先來對ThreadPoolExecutor進行一個入門,后面我們再來講ForkJoinPool。
ThreadPoolExecutor他的父類是從AbstractExecutorService,而AbstractExecutorService的父類是ExecutorService,再ExecutorService的父類是Executo,所以ThreadPoolExecutor就相當于線程池的執(zhí)行器,就是大家伙兒可以向這個池子里面扔任務(wù),讓這個線程池去運行。另外在阿里巴巴的手冊里面要求線程池是要自定義的,還有不少同學會被問這個線程池是怎么自定義。
我們來看怎么樣手動定義一個線程池,手動定義線程池他有很多構(gòu)造方法,我們找這個最常見的理解了就行了。大家看下面這里代碼,我定義了一個任務(wù)Task,這個任務(wù)是實現(xiàn)Runnable接口,就是一個普通的任務(wù)了,每一個任務(wù)里有一個編號i,然后打印這個編號,主要干這個事兒,打印完后阻塞System.in.read(),每個任務(wù)都是阻塞的,toString方法就不說了,定義一個線程池最長的有七個參數(shù),首先我們來理解什么叫線程池,線程池他維護這兩個集合,第一個是線程的集合,里面是一個一個的線程。第二個是任務(wù)的集合,里面是一個一個的任務(wù)這叫一個完整的線程池。
我怎么定義這一個線程池,這里面的七個參數(shù),
第一個參數(shù)corePoolSoze核心線程數(shù),最開始的時候是有這個線程池里面是有一定的核心線程數(shù)的;
第二個叫maximumPoolSize最大線程數(shù),線程數(shù)不夠了,能擴展到最大線程是多少;
第三個keepAliveTime生存時間,意思是這個線程有很長時間沒干活了請你把它歸還給操作系統(tǒng);
第四個TimeUnit.SECONDS生存時間的單位到底是毫秒納秒還是秒自己去定義;
第五個是任務(wù)隊列,就是我們上節(jié)課講的BlockingQueue,各種各樣的BlockingQueue你都可以往里面扔,我們這用的是ArrayBlockingQueue,參數(shù)最多可以裝四個任務(wù);
第六個是線程工廠defaultThreadFactory,他返回的是一個enw DefaultThreadFactory,它要去你去實現(xiàn)ThreadFactory的接口,這個接口只有一個方法叫newThread,所以就是產(chǎn)生線程的,可以通過這種方式產(chǎn)生自定義的線程,默認產(chǎn)生的是defaultThreadFactory,而defaultThreadFactory產(chǎn)生線程的時候有幾個特點:new出來的時候指定了group制定了線程名字,然后指定的這個線程絕對不是守護線程,設(shè)定好你線程的優(yōu)先級。自己可以定義產(chǎn)生的到底是什么樣的線程,指定線程名叫什么(為什么要指定線程名稱,有什么意義,就是可以方便出錯是回溯);
第七個叫拒絕策略,指的是線程池忙,而且任務(wù)隊列滿這種情況下我們就要執(zhí)行各種各樣的拒絕策略,jdk默認提供了四種拒絕策略,也是可以自定義的。
1:Abort:拋異常
2:Discard:扔掉,不拋異常
3:DiscardOldest:扔掉排隊時間最久的
4:CallerRuns:調(diào)用者處理服務(wù)
一般情況這四種我們會自定義策略,去實現(xiàn)這個拒絕策略的接口,處理的方式是一般我們的消息需要保存下來,要是訂單的話那就更需要保存了,保存到kafka,保存到redis或者是存到數(shù)據(jù)庫隨便你然后做好日志。
package com.mashibing.juc.c_026_01_ThreadPool;import java.io.IOException;import java.util.concurrent.*;public class T05_00_HelloThreadPool {static class Task implements Runnable {private int i;public Task(int i) {this.i = i;
}@Overridepublic void run() {
System.out.println(Thread.currentThread().getName() + " Task " + i);try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}@Overridepublic String toString() {return "Task{" +"i=" + i +'}';
}
}public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue(4),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 8; i++) {
tpe.execute(new Task(i));
}
System.out.println(tpe.getQueue());
tpe.execute(new Task(100));
System.out.println(tpe.getQueue());
tpe.shutdown();
}
} 今天講的東西很多,復(fù)習的時候你就把一個一個小程序從頭到尾看完,看懂意思之后自己去敲,前面呢我們通過一道面試題來復(fù)習了之前學的一些方法,重點是LockSupport以及synchronized_wait_notify。然后我們講了ThreadPool的一個入門,講ThreadPool的時候我們給大家擴展了Callable和Runnable的不同,F(xiàn)uture用來存儲執(zhí)行的將來才會產(chǎn)生的結(jié)果、FutureTask,他是Future加上Runnable,既可以執(zhí)行又可以存結(jié)果、CompletableFuture,管理各種各樣的Future的結(jié)果。
本篇內(nèi)容講解的是線程池的內(nèi)容,喜歡的朋友可以轉(zhuǎn)發(fā)關(guān)注一下小編~~
本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學習更多的話可以到微信公眾號里找我,我等你哦。
