Java并發控制機制詳解
點擊上方藍色字體,選擇“標星公眾號”
優質文章,第一時間送達
作者 | ypp91zr
來源 | urlify.cn/nqe2ei
Java內存模型
volatile變量–多線程間可見
同步關鍵字synchronized
// Form 1: the synchronized modifier on an instance method; the whole method
// body runs while holding the monitor of the receiver (this).
public synchronized void method(){}
// Form 2: an explicit synchronized block on this inside an otherwise
// unsynchronized method.
public void method(){
synchronized(this){
// do something ...
}
}
// Form 3: synchronizing on an arbitrary object passed in by the caller.
// Only the statements inside the synchronized block hold o's monitor;
// the "before" and "after" sections run without it.
public void method(Object o){
// before
synchronized(o){
// do something ...
}
// after
}
// Form 4: the synchronized modifier on a static method; the lock is the
// monitor of the enclosing class object rather than of an instance.
public synchronized static void method(){}
// Wait pattern skeleton (pseudo-code: <?> stands for the condition being
// waited on). wait() is called while holding obj's monitor and inside a loop
// so the condition is re-checked every time the thread wakes up.
synchronized(obj){
while(<?>){
obj.wait();
// After being notified, execution continues here.
}
}
/**
 * A minimal unbounded FIFO blocking queue built on the intrinsic lock of the
 * queue instance together with {@code wait()} / {@code notify()}.
 *
 * <p>Both public methods are {@code synchronized}, so all access to the
 * backing list happens while holding this object's monitor.
 */
public class BlockQueue{
    /** Backing storage; only touched from synchronized methods. */
    private final List<Object> list = new ArrayList<Object>();

    /**
     * Removes and returns the oldest element, blocking while the queue is empty.
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Object pop() throws InterruptedException{
        // Loop rather than "if": the condition must be re-checked after every
        // wake-up because another consumer may have taken the element first.
        while (list.isEmpty()){
            this.wait();
        }
        // The loop only exits when the list is non-empty, so no further
        // emptiness check (and no null fallback) is needed here.
        return list.remove(0);
    }

    /**
     * Appends an element to the tail of the queue and wakes one waiting consumer.
     *
     * @param obj the element to enqueue
     * @return the element that was enqueued
     */
    public synchronized Object put(Object obj){
        list.add(obj);
        // Exactly one element was added, so waking a single waiter is enough.
        this.notify();
        return obj;
    }
}
ReentrantLock重入鎖
// Timed lock acquisition with a guaranteed release.
try {
    // If the lock is currently held elsewhere, wait up to 5 seconds for it.
    // tryLock returns false when the lock still could not be acquired, and
    // execution simply continues past the if-block.
    // (lock.lockInterruptibly() is the alternative that blocks without a
    // timeout while still responding to interruption.)
    if (lock.tryLock(5, TimeUnit.SECONDS)) {
        try {
            // critical section
        } finally {
            // Only reached when the lock was acquired, so unlock is always balanced.
            lock.unlock();
        }
    }
} catch (InterruptedException e) {
    // Thrown when the current thread is interrupted while waiting for the lock.
    e.printStackTrace();
    // Restore the interrupt status so callers further up can still observe it.
    Thread.currentThread().interrupt();
}
ReadWriteLock讀寫鎖
// One ReentrantReadWriteLock supplies a paired read lock and write lock.
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// Read-side view of readWriteLock.
private Lock readLock = readWriteLock.readLock();
// Write-side view of readWriteLock.
private Lock writeLock = readWriteLock.writeLock();
/**
 * Returns {@code value} while holding the read lock.
 *
 * @return the current {@code value}
 * @throws InterruptedException if the thread is interrupted during the sleep
 */
public Object handleRead() throws InterruptedException {
    // Acquire before entering try: if lock() itself failed, the finally block
    // must not attempt to unlock a lock this thread never obtained.
    readLock.lock();
    try {
        Thread.sleep(1000); // presumably simulates a slow read operation
        return value;
    } finally {
        readLock.unlock();
    }
}
/**
 * Returns {@code value} while holding the write lock.
 *
 * <p>NOTE(review): this method takes the write lock but carries the same name
 * and signature as the read-lock variant and performs no write; it was
 * presumably meant to be the write-side handler. Confirm and rename before
 * placing both methods in one class, where the duplicate signature would not compile.
 *
 * @return the current {@code value}
 * @throws InterruptedException if the thread is interrupted during the sleep
 */
public Object handleRead() throws InterruptedException {
    // Acquire before entering try: if lock() itself failed, the finally block
    // must not attempt to unlock a lock this thread never obtained.
    writeLock.lock();
    try {
        Thread.sleep(1000); // presumably simulates a slow operation
        return value;
    } finally {
        writeLock.unlock();
    }
}
Condition對象
/**
 * Excerpt of an array-backed bounded blocking queue, shown to illustrate how
 * two {@link Condition} objects created from one {@link ReentrantLock}
 * coordinate producers and consumers.
 *
 * <p>Members referenced but not shown in this excerpt ({@code items},
 * {@code count}, {@code putIndex}, {@code takeIndex}, {@code inc},
 * {@code checkNotNull}, {@code cast}) are part of the elided "other code".
 *
 * @param <E> the element type held by the queue
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    /**
     * Creates a queue with the given fixed capacity and lock fairness policy.
     *
     * @param capacity the size of the backing array
     * @param fair     passed to the {@link ReentrantLock} constructor as its fairness flag
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition(); // create a Condition bound to the lock
        notFull = lock.newCondition();
    }

    /**
     * Inserts the element, waiting on {@code notFull} while the array is full.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // Re-check in a loop after every wake-up.
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Stores {@code x} at {@code putIndex}, advances the index, increments
     * {@code count} and signals one thread waiting on {@code notEmpty}.
     * Called from {@link #put} while the lock is held.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal(); // notify a waiting taker
    }

    /**
     * Removes and returns an element, waiting on {@code notEmpty} while the queue is empty.
     *
     * @return the element taken from {@code takeIndex}
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) // the queue is empty
                notEmpty.await(); // so the consumer waits for a "not empty" signal
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads the element at {@code takeIndex}, clears that slot, advances the
     * index, decrements {@code count} and signals one thread waiting on
     * {@code notFull}. Called from {@link #take} while the lock is held.
     */
    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal(); // notify a put() thread that the queue has free space
        return x;
    }
    // other code
}
Semaphore信號量
// Summary of the main Semaphore API (signatures only, bodies omitted).
public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair){} // fairness can be specified
public void acquire() throws InterruptedException {} // Tries to obtain a permit. If none is available the thread waits until another thread releases a permit or the current thread is interrupted.
public void acquireUninterruptibly(){} // Like acquire(), but does not respond to interruption.
public boolean tryAcquire(){} // Tries to obtain a permit: true on success, false otherwise. Does not wait; returns immediately.
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {} // Tries to obtain a permit, waiting up to the given time.
public void release() // Called after a thread has finished accessing the resource: releases a permit so that other threads waiting for one can access the resource.
/**
 * Object pool whose checkout is throttled by a fair {@link Semaphore}:
 * a permit must be acquired before an item is handed out, and a permit is
 * released when an item that was in use is returned.
 */
public class Pool {
    private static final int MAX_AVAILABLE = 100;

    /** One permit per pool slot; created in fair mode. */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    // Example only - the pool is not filled with real data here.
    /** The pooled objects that get reused. */
    protected Object[] items = new Object[MAX_AVAILABLE];
    /** used[i] is true while items[i] is checked out. */
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    /**
     * Checks an item out of the pool.
     *
     * @return the first item not currently marked as used
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public Object getItem() throws InterruptedException {
        // Request a permit first: at most MAX_AVAILABLE (100) threads can be
        // past this point at once; any further caller has to wait.
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Returns an item to the pool, marking it as unused.
     *
     * @param x the item being handed back
     */
    public void putItem(Object x) {
        if (!markAsUnused(x)) {
            return;
        }
        // One more item is available again: release a permit, which lets
        // one thread waiting for a resource proceed.
        available.release();
    }

    /**
     * Finds the first free slot, marks it used and returns its item.
     *
     * @return the item in the first free slot, or {@code null} if every slot is in use
     */
    protected synchronized Object getNextAvailableItem() {
        int slot = 0;
        while (slot < MAX_AVAILABLE && used[slot]) {
            slot++;
        }
        if (slot == MAX_AVAILABLE) {
            return null;
        }
        used[slot] = true;
        return items[slot];
    }

    /**
     * Clears the in-use flag of the first slot holding {@code item}
     * (compared by identity).
     *
     * @param item the item to mark as unused
     * @return {@code true} if that slot was in use, {@code false} if it was
     *         already free or no slot holds the item
     */
    protected synchronized boolean markAsUnused(Object item) {
        for (int slot = 0; slot < MAX_AVAILABLE; slot++) {
            if (items[slot] != item) {
                continue;
            }
            // Only the first matching slot is considered.
            boolean wasUsed = used[slot];
            used[slot] = false;
            return wasUsed;
        }
        return false;
    }
}
ThreadLocal線程局部變量
/**
 * Demonstrates {@link ThreadLocal}: several threads share one {@code TestNum}
 * instance, yet each thread advances its own independent sequence number.
 */
public class TestNum {
    // Override ThreadLocal.initialValue() in an anonymous subclass to set the
    // starting value each thread sees. The type argument is required so that
    // get() yields an Integer that can take part in arithmetic.
    private static ThreadLocal<Integer> seqNum = new ThreadLocal<Integer>() {
        @Override
        public Integer initialValue() {
            return 0;
        }
    };

    /**
     * Advances and returns the calling thread's sequence value.
     *
     * @return the next sequence number for the current thread (starting at 1)
     */
    public int getNextNum() {
        seqNum.set(seqNum.get() + 1);
        return seqNum.get();
    }

    /** Starts three threads that share one {@code TestNum}. */
    public static void main(String[] args) {
        TestNum sn = new TestNum();
        // The 3 threads share sn, but each generates its own sequence numbers.
        TestClient t1 = new TestClient(sn);
        TestClient t2 = new TestClient(sn);
        TestClient t3 = new TestClient(sn);
        t1.start();
        t2.start();
        t3.start();
    }

    /** Worker thread that prints three sequence values obtained from the shared {@code TestNum}. */
    private static class TestClient extends Thread {
        private TestNum sn;

        public TestClient(TestNum sn) {
            this.sn = sn;
        }

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                // Each thread prints 3 sequence values.
                System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn["
                        + sn.getNextNum() + "]");
            }
        }
    }
}
thread[Thread-0] --> sn[1]
thread[Thread-1] --> sn[1]
thread[Thread-2] --> sn[1]
thread[Thread-1] --> sn[2]
thread[Thread-0] --> sn[2]
thread[Thread-1] --> sn[3]
thread[Thread-2] --> sn[2]
thread[Thread-0] --> sn[3]
thread[Thread-2] --> sn[3]
鎖的性能和優化
// Coarse-grained locking: the method is declared synchronized, so all three
// calls - including beforeMethod() and afterMethod() - run while the lock is held.
public synchronized void syncMehod(){
beforeMethod();
mutexMethod();
afterMethod();
}
// Reduced lock scope: only mutexMethod() runs inside the synchronized block;
// beforeMethod() and afterMethod() execute without holding the lock.
public void syncMehod(){
beforeMethod();
synchronized(this){
mutexMethod();
}
afterMethod();
}
/**
 * Excerpt of a linked blocking queue, shown to illustrate lock splitting:
 * {@code take()} and {@code put()} are guarded by two different locks, and the
 * element count is kept in an {@link AtomicInteger} that both sides read.
 *
 * <p>Members referenced but not shown in this excerpt ({@code count},
 * {@code capacity}, {@code Node}, {@code enqueue}, {@code dequeue},
 * {@code signalNotEmpty}, {@code signalNotFull}) are part of the elided
 * "other code".
 *
 * @param <E> the element type held by the queue
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Removes and returns the head element, waiting while the queue is empty.
     *
     * @return the dequeued element
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // no two threads may take at the same time
        try {
            while (count.get() == 0) { // nothing available yet: keep waiting for a signal on notEmpty
                notEmpty.await();
            }
            x = dequeue(); // remove one item from the head
            c = count.getAndDecrement(); // size minus 1; c is the value before the decrement
            if (c > 1)
                notEmpty.signal(); // elements remain: wake another take()
        } finally {
            takeLock.unlock(); // release the lock
        }
        // c == capacity means the queue was full before this take.
        if (c == capacity)
            signalNotFull(); // tell put() that there is free space now
        return x;
    }

    /**
     * Appends the element, waiting while the queue is at capacity.
     *
     * @param e the element to add
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); // no two threads may put at the same time
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) { // the queue is full: wait
                notFull.await();
            }
            enqueue(node); // add to the queue
            c = count.getAndIncrement(); // size plus 1; c is the value before the increment
            if (c + 1 < capacity)
                notFull.signal(); // still room left: wake another waiting put
        } finally {
            putLock.unlock(); // release the lock
        }
        // c == 0 means the queue was empty before this put.
        if (c == 0)
            signalNotEmpty(); // after a successful insert, tell take() there is data to read
    }
    // other code
}
// Two back-to-back synchronized blocks on the same lock object: the lock is
// acquired and released once for method1() and then again for method2().
public void syncMehod(){
synchronized(lock){
method1();
}
synchronized(lock){
method2();
}
}
// Merged (coarsened) form: a single synchronized block on lock covers both
// method1() and method2(), so the lock is acquired and released only once.
public void syncMehod(){
synchronized(lock){
method1();
method2();
}
}
無鎖的并行計算
/**
 * Compares counting up to {@code TARGET_COUNT} with a synchronized counter
 * against counting with an {@link AtomicInteger}, using {@code TASK_COUNT}
 * tasks on a fixed pool of {@code MAX_THREADS} threads. Each task prints the
 * elapsed time and the last value it observed.
 */
public class TestAtomic {
    private static final int MAX_THREADS = 3;
    private static final int TASK_COUNT = 3;
    private static final int TARGET_COUNT = 100 * 10000;

    /** Counter used by the lock-free variant. */
    private AtomicInteger acount = new AtomicInteger(0);
    /** Counter used by the synchronized variant; accessed only via inc()/getCount(). */
    private int count = 0;

    /** Increments {@code count} under this object's monitor and returns the new value. */
    synchronized int inc() {
        return ++count;
    }

    /** Returns {@code count} under this object's monitor. */
    synchronized int getCount() {
        return count;
    }

    /** Task that drives the synchronized counter of the given {@code TestAtomic}. */
    public class SyncThread implements Runnable {
        String name;
        long startTime;
        TestAtomic out;

        /**
         * @param o         the instance whose {@code inc()} is called
         * @param startTime reference time in milliseconds used for the elapsed-time report
         */
        public SyncThread(TestAtomic o, long startTime) {
            this.out = o;
            this.startTime = startTime;
        }

        @Override
        public void run() {
            // Keep incrementing until this task's own increment reaches the target.
            int v = out.inc();
            while (v < TARGET_COUNT) {
                v = out.inc();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
        }
    }

    /** Task that drives the enclosing instance's {@code AtomicInteger} counter. */
    public class AtomicThread implements Runnable {
        String name;
        long startTime;

        /**
         * @param startTime reference time in milliseconds used for the elapsed-time report
         */
        public AtomicThread(long startTime) {
            this.startTime = startTime;
        }

        @Override
        public void run() {
            // Keep incrementing until this task's own increment reaches the target.
            int v = acount.incrementAndGet();
            while (v < TARGET_COUNT) {
                v = acount.incrementAndGet();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v);
        }
    }

    /**
     * Runs the synchronized-counter tasks and waits for them to finish.
     *
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    @Test
    public void testSync() throws InterruptedException {
        ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
        try {
            long startTime = System.currentTimeMillis();
            // The same Runnable instance is submitted TASK_COUNT times.
            SyncThread sync = new SyncThread(this, startTime);
            for (int i = 0; i < TASK_COUNT; i++) {
                exe.submit(sync);
            }
        } finally {
            // Always stop accepting work so the pool threads can exit.
            exe.shutdown();
        }
        // Wait for completion (at most 10 s) instead of sleeping a fixed 10 s.
        if (!exe.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
            exe.shutdownNow();
        }
    }

    /**
     * Runs the atomic-counter tasks and waits for them to finish.
     *
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    @Test
    public void testAtomic() throws InterruptedException {
        ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
        try {
            long startTime = System.currentTimeMillis();
            // The same Runnable instance is submitted TASK_COUNT times.
            AtomicThread atomic = new AtomicThread(startTime);
            for (int i = 0; i < TASK_COUNT; i++) {
                exe.submit(atomic);
            }
        } finally {
            // Always stop accepting work so the pool threads can exit.
            exe.shutdown();
        }
        // Wait for completion (at most 10 s) instead of sleeping a fixed 10 s.
        if (!exe.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
            exe.shutdownNow();
        }
    }
}
testSync():
SyncThread spend:201ms, v=1000002
SyncThread spend:201ms, v=1000000
SyncThread spend:201ms, v=1000001
testAtomic():
AtomicThread spend:43ms, v=1000000
AtomicThread spend:44ms, v=1000001
AtomicThread spend:46ms, v=1000002
結束語


評論
圖片
表情
