Java ReentrantLock 源碼解析與簡單使用
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質(zhì)文章,第一時間送達
ReentrantLock是juc中的一個可重入鎖獨占鎖,該鎖分為公平鎖和非公平鎖。獨占鎖是這個鎖一旦被一個線程占據(jù)其他線程就不能搶占,可重入鎖是如果這個線程重復(fù)獲取這個鎖不需要進行等待,直接進入。公平鎖和非公平鎖是剛進入的線程是否可以和在隊列中等待的第一個線程進行競爭,如果是非公平鎖則剛進入的線程在當前鎖空閑的時候可以和等待隊列中的第一個線程進行競爭,如果是公平鎖則不能,剛進入的線程必須進入隊列等待。
一、java.util.concurrent.locks.Lock接口
public class ReentrantLock implements Lock, java.io.Serializable {
.....
}
從源碼中我們可以看到ReentrantLock 實現(xiàn)了Lock接口。Lock接口是juc中定義的一個鎖的接口。
public interface Lock {
// 獲取鎖
void lock();
// 獲取鎖的過程能響應(yīng)中斷
void lockInterruptibly() throws InterruptedException;
// 非阻塞式響應(yīng)中斷能立即返回,獲取鎖返回true反之返回false
boolean tryLock();
// 超時獲取鎖,在超時內(nèi)或者未中斷的情況下能獲取鎖
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
// 獲取與lock綁定的等待通知組件,當前線程必須獲得了鎖才能進行等待,進行等待時會先釋放鎖,當再次獲取鎖時才能從等待中返回
Condition newCondition();
}
進行加鎖主要有l(wèi)ock()、tryLock()、lockInterruptibly()這三個方法。tryLock()方法只是嘗試進行獲取當前鎖,獲取到了則返回true,獲取不到返回false,不會直接讓線程加入到等待隊列中進行阻塞等待。lock()方法則會直接讓線程阻塞等待,使用unlock才能喚醒線程。lockInterruptibly()和lock()方法主要的區(qū)別在于對異常的處理。
二、公平鎖介紹
從源碼中我們可以看出ReentrantLock使用了適配器設(shè)計模式,ReentrantLock繼承了lock方法,同時擁有Sync類,Lock的方法都是大部分調(diào)用Sync類實現(xiàn)的,Sync類繼承了AbstractQueuedSynchronizer類,該類是隊列同步器會將等待的線程加入到隊列的末端等待喚醒。通過公平鎖的例子,講解一下lock幾個主要的方法。
// ReentrantLock的lock方法調(diào)用了sync的lock方法
public void lock() {
sync.lock();
}
// 公平鎖類
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// ReentrantLock公平鎖模式下lock調(diào)用的方法
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
// 獲取當前調(diào)用獲取鎖方法的線程
final Thread current = Thread.currentThread();
// 獲取當前鎖的狀態(tài),如果c=0表示還沒有被解鎖,可以進行競爭獲取鎖
int c = getState();
// 表明重入鎖沒有被搶占
if (c == 0) {
// 等待對象中沒有其他需要處理的 就cas將鎖的狀態(tài)設(shè)置為acquires。公平鎖與非公平鎖主要的區(qū)別在于是否是要要讓隊列等待的先進行執(zhí)行。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 把獨占線程設(shè)置為自己
setExclusiveOwnerThread(current);
return true;
}
}
// 走到這里表明鎖被鎖住了
// 如果鎖的獨占線程是自己,則直接重入
else if (current == getExclusiveOwnerThread()) {
// 記錄鎖被重入的次數(shù),在釋放鎖的時候判斷是否滿足釋放條件
int nextc = c + acquires;
// 出現(xiàn)異常
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因為當前鎖被獨占了所有其他線程是無法獲取這個鎖的,state的狀態(tài)只能被自己修改,所以不需要處理并發(fā)的情況。
setState(nextc);
// 獲取鎖成功
return true;
}
// 獲取鎖失敗
return false;
}
}
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
// 同步狀態(tài)不允許獲取
// 如果不在隊列,當前線程放入隊列
// 可能會阻塞當前前程
// 如果當前線程在隊列,將其移出隊列
public final void acquire(int arg) {
// 先嘗試tryAcquire方法獲取鎖,如果失敗了不進行阻塞直接返回,調(diào)用acquireQueued將當前線程封裝成一個節(jié)點并進行阻塞,加入到等待隊列中。
// Node.EXCLUSIVE
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 將當前的線程封裝成一個Node,在ReentrantLock鎖是獨占模式,所以是Node.EXCLUSIVE。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 獲取等待隊列中的最后一個節(jié)點
Node pred = tail;
if (pred != null) {
// 走到這里表明tail節(jié)點存在
// 將當前node的prev節(jié)點設(shè)置為tail節(jié)點
node.prev = pred;
// cas將tail節(jié)點設(shè)置為新的節(jié)點,如果設(shè)置成功表明tail節(jié)點沒有被修改過,如果失敗表明其他線程提前修改的tail節(jié)點,出現(xiàn)了沖突。
if (compareAndSetTail(pred, node)) {
// 如果沒有出現(xiàn)沖突,表明當前節(jié)點添加節(jié)點到末尾成功
pred.next = node;
return node;
}
}
// tail節(jié)點被其他線程修改時走到這里
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 當tail節(jié)點不存在時或者tail節(jié)點cas替換失敗時調(diào)用這個方法
private Node enq(final Node node) {
for (;;) {
// 獲取隊列的末尾節(jié)點
Node t = tail;
// 如果tail節(jié)點不存在表明這個隊列還沒有進行初始化或者已經(jīng)被清空了
if (t == null) { // Must initialize
// cas初始化了一個隊列,防止多個線程進行初始化出現(xiàn)沖突
if (compareAndSetHead(new Node()))
// 初始化隊列
tail = head;
} else {
// 如果隊列存在則不斷調(diào)用這個方法進行cas替換,知道成功插入到隊列末尾。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
// cas搶占鎖的時候失敗,將線程插入到等待隊列中
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取當前節(jié)點的前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點就是head,表明當前線程可以嘗試競爭鎖
if (p == head && tryAcquire(arg)) {
// 走到這里表明當前node競爭到了鎖
// 將當前節(jié)點設(shè)置為head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是head,或者競爭鎖失敗了,調(diào)用shouldParkAfterFailedAcquire讓前一個節(jié)點完事兒了把自己喚醒,同時進入等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前一個node的waitStatus
int ws = pred.waitStatus;
// 如果已經(jīng)是SIGNAL表明前一個節(jié)點一直知道要通知自己了,可以放心進行睡眠等待了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 走到這里表明前面一個節(jié)點不為SIGNAL
// 下面的方法是尋找一個非cancel節(jié)點,把它設(shè)置為signal用來喚醒自己
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 把前一個節(jié)點設(shè)置為signal用來喚醒自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
// 阻塞當前線程
private final boolean parkAndCheckInterrupt() {
// 調(diào)用LockSupprt方法將當前線程進行阻塞
LockSupport.park(this);
return Thread.interrupted();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// 更新同步狀態(tài)
// 同步狀態(tài)是否允許被阻塞的線程獲取
public final boolean release(int arg) {
// 1. 釋放鎖
if (tryRelease(arg)) {
// 2. 如果獨占鎖釋放"完全",喚醒后繼節(jié)點
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果重入次數(shù)為零表示可以進行釋放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 獲取最后一個沒有被cancel的節(jié)點
s = t;
}
if (s != null)
// 調(diào)用LockSupport.unpark釋放下一個被阻塞且沒有被取消的線程來競爭鎖
LockSupport.unpark(s.thread);
}
簡單使用
public class Main {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread thread1 = new Thread(()->{
System.out.println("thread1 wait lock");
lock.lock();
System.out.println("thread1 get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("thread1 unlock");
});
Thread thread2 = new Thread(()->{
System.out.println("thread2 wait lock");
lock.lock();
System.out.println("thread2 get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println("thread2 unlock");
});
thread1.start();
thread2.start();
}
}

————————————————
版權(quán)聲明:本文為CSDN博主「zhanghl111」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/zhlily1/article/details/115794503
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
??????

??長按上方微信二維碼 2 秒
感謝點贊支持下哈 
