ZooKeeper 分布式鎖 Curator 源碼 04:分布式信號(hào)量和互斥鎖

前言
分布式信號(hào)量,之前在 Redisson 中也介紹過,Redisson 的信號(hào)量是將計(jì)數(shù)維護(hù)在 Redis 中的,那現(xiàn)在來看一下 Curator 是如何基于 ZooKeeper 實(shí)現(xiàn)信號(hào)量的。
1
使用 Demo
public class CuratorDemo {
public static void main(String[] args) throws Exception {
String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString(connectString)
.retryPolicy(retryPolicy)
.build();
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphores/semaphore_01", 3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread() + " 線程 start - " + LocalTime.now());
Lease lease = semaphore.acquire();
System.out.println(Thread.currentThread() + " 線程 execute - " + LocalTime.now());
Thread.sleep(3000);
System.out.println(Thread.currentThread() + " 線程 over -" + LocalTime.now());
semaphore.returnLease(lease);
} catch (Exception e) {
}
}).start();
}
Thread.sleep(1000000);
}
}
控制臺(tái)輸出數(shù)據(jù)如下:

2
源碼
獲取憑證
核心源碼:InterProcessSemaphoreV2#internalAcquire1Lease
這里僅介紹大概邏輯,有興趣的小伙伴可以自行閱讀源碼。

lock 是 InterProcessMutex,InterProcessSemaphoreV2 信號(hào)量,也是借助于最基礎(chǔ)的加鎖。

通過圖也可以看出,使用 InterProcessSemaphoreV2 時(shí),會(huì)先創(chuàng)建 /semaphores/semaphore_01 路徑,并在路徑下創(chuàng)建 locks 節(jié)點(diǎn)。也就是 /semaphores/semaphore_01/locks 路徑下,有 10 個(gè)臨時(shí)順序節(jié)點(diǎn)。

緊接著會(huì)在 /semaphores/semaphore_01 路徑下創(chuàng)建 leases 節(jié)點(diǎn),所以創(chuàng)建鎖的臨時(shí)順序節(jié)點(diǎn)之后,會(huì)緊接著在 /semaphores/semaphore_01/leases 下創(chuàng)建臨時(shí)順序節(jié)點(diǎn)。

對(duì) /semaphores/semaphore_01/leases 節(jié)點(diǎn)進(jìn)行監(jiān)聽,同時(shí)獲取 /semaphores/semaphore_01/leases 下面的子節(jié)點(diǎn)數(shù)量。
如果子節(jié)點(diǎn)數(shù)量小于等于信號(hào)量計(jì)數(shù),則直接結(jié)束循環(huán); 如果大于,則會(huì)進(jìn)入 wait 等待喚醒。
釋放憑證

釋放憑證就是調(diào)用 Lease 的 close 方法,刪除節(jié)點(diǎn),這樣 /semaphores/semaphore_01/leases 上的監(jiān)聽器就會(huì)觸發(fā),然后其他線程獲取憑證。
互斥鎖
互斥鎖 InterProcessSemaphoreMutex,不支持重入,其他的和可重入鎖并沒有什么區(qū)別。就是基于 InterProcessSemaphoreV2 實(shí)現(xiàn)的。

就是把計(jì)數(shù)的值 maxLeases 設(shè)置為了 1。
3
總結(jié)
信號(hào)量 InterProcessSemaphoreV2 其實(shí)是通過判斷節(jié)點(diǎn)下的子節(jié)點(diǎn)數(shù)量來實(shí)現(xiàn)控制信號(hào)量,同時(shí)內(nèi)部加鎖是基于可重入鎖 InterProcessMutex 實(shí)現(xiàn)的。
互斥鎖 InterProcessSemaphoreMutex 則是將信號(hào)量的技術(shù)設(shè)置為 1 來實(shí)現(xiàn)互斥功能。
- <End /> -
歷史文章 | 相關(guān)推薦

