利用Zookeeper實現(xiàn) - 分布式鎖
許多場景中,數(shù)據一致性是一個比較重要的話題,在單機環(huán)境中,我們可以通過Java提供的并發(fā)API來解決;而在分布式環(huán)境(會遇到網絡故障、消息重復、消息丟失等各種問題)下要復雜得多,常見的解決方案是分布式事務、分布式鎖等。
本文主要探討如何利用Zookeeper來實現(xiàn)分布式鎖。
關于分布式鎖
分布式鎖是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式。
在實現(xiàn)分布式鎖的過程中需要注意的:
鎖的可重入性(遞歸調用不應該被阻塞、避免死鎖)
鎖的超時(避免死鎖、死循環(huán)等意外情況)
鎖的阻塞(保證原子性等)
鎖的特性支持(阻塞鎖、可重入鎖、公平鎖、聯(lián)鎖、信號量、讀寫鎖)
在使用分布式鎖時需要注意:
分布式鎖的開銷(分布式鎖一般能不用就不用,有些場景可以用樂觀鎖代替)
加鎖的粒度(控制加鎖的粒度,可以優(yōu)化系統(tǒng)的性能)
加鎖的方式
以下是幾種常見的實現(xiàn)分布式鎖的方案及其優(yōu)缺點。
基于數(shù)據庫
1. 基于數(shù)據庫表
最簡單的方式可能就是直接創(chuàng)建一張鎖表,當我們要鎖住某個方法或資源時,我們就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄。給某字段添加唯一性約束,如果有多個請求同時提交到數(shù)據庫的話,數(shù)據庫會保證只有一個操作可以成功,那么我們就可以認為操作成功的那個線程獲得了該方法的鎖,可以執(zhí)行方法體內容。
會引入數(shù)據庫單點、無失效時間、不阻塞、不可重入等問題。
2. 基于數(shù)據庫排他鎖
如果使用的是MySql的InnoDB引擎,在查詢語句后面增加for update,數(shù)據庫會在查詢過程中(須通過唯一索引查詢)給數(shù)據庫表增加排他鎖,我們可以認為獲得排它鎖的線程即可獲得分布式鎖,通過 connection.commit() 操作來釋放鎖。
會引入數(shù)據庫單點、不可重入、無法保證一定使用行鎖(部分情況下MySQL自動使用表鎖而不是行鎖)、排他鎖長時間不提交導致占用數(shù)據庫連接等問題。
3. 數(shù)據庫實現(xiàn)分布式鎖總結
優(yōu)點:
直接借助數(shù)據庫,容易理解。
缺點:
會引入更多的問題,使整個方案變得越來越復雜
操作數(shù)據庫需要一定的開銷,有一定的性能問題
使用數(shù)據庫的行級鎖并不一定靠譜,尤其是當我們的鎖表并不大的時候
基于緩存
相比較于基于數(shù)據庫實現(xiàn)分布式鎖的方案來說,基于緩存來實現(xiàn)在性能方面會表現(xiàn)的更好一點。目前有很多成熟的緩存產品,包括Redis、memcached、tair等。
這里以Redis為例舉出幾種實現(xiàn)方法:
1. 基于 redis 的 setnx()、expire() 方法做分布式鎖
setnx 的含義就是?SET if Not Exists,其主要有兩個參數(shù)?setnx(key, value)。該方法是原子的,如果 key 不存在,則設置當前 key 成功,返回 1;如果當前 key 已經存在,則設置當前 key 失敗,返回 0。
expire 設置過期時間,要注意的是 setnx 命令不能設置 key 的超時時間,只能通過 expire() 來對 key 設置。
2. 基于 redis 的 setnx()、get()、getset()方法做分布式鎖
getset 這個命令主要有兩個參數(shù)?getset(key,newValue),該方法是原子的,對 key 設置 newValue 這個值,并且返回 key 原來的舊值。
3. 基于 Redlock 做分布式鎖
Redlock 是 Redis 的作者 antirez 給出的集群模式的 Redis 分布式鎖,它基于 N 個完全獨立的 Redis 節(jié)點(通常情況下 N 可以設置成 5)
4. 基于 redisson 做分布式鎖
redisson 是 redis 官方的分布式鎖組件
基于緩存實現(xiàn)分布式鎖總結
優(yōu)點:
性能好
缺點:
實現(xiàn)中需要考慮的因素太多
通過超時時間來控制鎖的失效時間并不是十分的靠譜
基于Zookeeper
大致思想為:每個客戶端對某個方法加鎖時,在 Zookeeper 上與該方法對應的指定節(jié)點的目錄下,生成一個唯一的臨時有序節(jié)點。判斷是否獲取鎖的方式很簡單,只需要判斷有序節(jié)點中序號最小的一個。當釋放鎖的時候,只需將這個臨時節(jié)點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產生的死鎖問題
Zookeeper實現(xiàn)分布式鎖總結
優(yōu)點:
有效的解決單點問題,不可重入問題,非阻塞問題以及鎖無法釋放的問題
實現(xiàn)較為簡單
缺點:
性能上不如使用緩存實現(xiàn)的分布式鎖,因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀臨時節(jié)點來實現(xiàn)鎖功能
需要對Zookeeper的原理有所了解
Zookeeper 如何實現(xiàn)分布式鎖?
下面講如何實現(xiàn)排他鎖和共享鎖,以及如何解決羊群效應。
排他鎖
排他鎖,又稱寫鎖或獨占鎖。如果事務T1對數(shù)據對象O1加上了排他鎖,那么在整個加鎖期間,只允許事務T1對O1進行讀取或更新操作,其他任務事務都不能對這個數(shù)據對象進行任何操作,直到T1釋放了排他鎖。
排他鎖核心是保證當前有且僅有一個事務獲得鎖,并且鎖釋放之后,所有正在等待獲取鎖的事務都能夠被通知到。
Zookeeper 的強一致性特性,能夠很好地保證在分布式高并發(fā)情況下節(jié)點的創(chuàng)建一定能夠保證全局唯一性,即Zookeeper將會保證客戶端無法重復創(chuàng)建一個已經存在的數(shù)據節(jié)點。可以利用Zookeeper這個特性,實現(xiàn)排他鎖。
定義鎖:通過Zookeeper上的數(shù)據節(jié)點來表示一個鎖
獲取鎖:客戶端通過調用?
create?方法創(chuàng)建表示鎖的臨時節(jié)點,可以認為創(chuàng)建成功的客戶端獲得了鎖,同時可以讓沒有獲得鎖的節(jié)點在該節(jié)點上注冊Watcher監(jiān)聽,以便實時監(jiān)聽到lock節(jié)點的變更情況釋放鎖:以下兩種情況都可以讓鎖釋放
當前獲得鎖的客戶端發(fā)生宕機或異常,那么Zookeeper上這個臨時節(jié)點就會被刪除
正常執(zhí)行完業(yè)務邏輯,客戶端主動刪除自己創(chuàng)建的臨時節(jié)點
基于Zookeeper實現(xiàn)排他鎖流程:

基于Zookeeper實現(xiàn)排他鎖流程
共享鎖
共享鎖,又稱讀鎖。如果事務T1對數(shù)據對象O1加上了共享鎖,那么當前事務只能對O1進行讀取操作,其他事務也只能對這個數(shù)據對象加共享鎖,直到該數(shù)據對象上的所有共享鎖都被釋放。
共享鎖與排他鎖的區(qū)別在于,加了排他鎖之后,數(shù)據對象只對當前事務可見,而加了共享鎖之后,數(shù)據對象對所有事務都可見。
定義鎖:通過Zookeeper上的數(shù)據節(jié)點來表示一個鎖,是一個類似于?
/lockpath/[hostname]-請求類型-序號的臨時順序節(jié)點獲取鎖:客戶端通過調用?
create?方法創(chuàng)建表示鎖的臨時順序節(jié)點,如果是讀請求,則創(chuàng)建?/lockpath/[hostname]-R-序號?節(jié)點,如果是寫請求則創(chuàng)建?/lockpath/[hostname]-W-序號?節(jié)點判斷讀寫順序:大概分為4個步驟
1)創(chuàng)建完節(jié)點后,獲取?
/lockpath?節(jié)點下的所有子節(jié)點,并對該節(jié)點注冊子節(jié)點變更的Watcher監(jiān)聽2)確定自己的節(jié)點序號在所有子節(jié)點中的順序
3.1)對于讀請求:1. 如果沒有比自己序號更小的子節(jié)點,或者比自己序號小的子節(jié)點都是讀請求,那么表明自己已經成功獲取到了共享鎖,同時開始執(zhí)行讀取邏輯 2. 如果有比自己序號小的子節(jié)點有寫請求,那么等待 3.
3.2)對于寫請求,如果自己不是序號最小的節(jié)點,那么等待
4)接收到Watcher通知后,重復步驟1)
釋放鎖:與排他鎖邏輯一致

Zookeeper實現(xiàn)共享鎖節(jié)點樹
基于Zookeeper實現(xiàn)共享鎖流程:

基于Zookeeper實現(xiàn)共享鎖流程
羊群效應
在實現(xiàn)共享鎖的 “判斷讀寫順序” 的第1個步驟是:創(chuàng)建完節(jié)點后,獲取?/lockpath?節(jié)點下的所有子節(jié)點,并對該節(jié)點注冊子節(jié)點變更的Watcher監(jiān)聽。這樣的話,任何一次客戶端移除共享鎖之后,Zookeeper將會發(fā)送子節(jié)點變更的Watcher通知給所有機器,系統(tǒng)中將有大量的 “Watcher通知” 和 “子節(jié)點列表獲取” 這個操作重復執(zhí)行,然后所有節(jié)點再判斷自己是否是序號最小的節(jié)點(寫請求)或者判斷比自己序號小的子節(jié)點是否都是讀請求(讀請求),從而繼續(xù)等待下一次通知。
然而,這些重復操作很多都是 “無用的”,實際上每個鎖競爭者只需要關注序號比自己小的那個節(jié)點是否存在即可
當集群規(guī)模比較大時,這些 “無用的” 操作不僅會對Zookeeper造成巨大的性能影響和網絡沖擊,更為嚴重的是,如果同一時間有多個客戶端釋放了共享鎖,Zookeeper服務器就會在短時間內向其余客戶端發(fā)送大量的事件通知–這就是所謂的 “羊群效應“。
改進后的分布式鎖實現(xiàn):
具體實現(xiàn)如下:
客戶端調用?
create?方法創(chuàng)建一個類似于?/lockpath/[hostname]-請求類型-序號?的臨時順序節(jié)點客戶端調用?
getChildren?方法獲取所有已經創(chuàng)建的子節(jié)點列表(這里不注冊任何Watcher)讀請求:向比自己序號小的最后一個寫請求節(jié)點注冊Watcher監(jiān)聽
寫請求:向比自己序號小的最后一個節(jié)點注冊Watcher監(jiān)聽
如果無法獲取任何共享鎖,那么調用?
exist?來對比自己小的那個節(jié)點注冊Watcher等待Watcher監(jiān)聽,繼續(xù)進入步驟2
Zookeeper羊群效應改進前后Watcher監(jiān)聽圖

Zookeeper羊群效應改進前后
基于Curator客戶端實現(xiàn)分布式鎖
Apache Curator是一個Zookeeper的開源客戶端,它提供了Zookeeper各種應用場景(Recipe,如共享鎖服務、master選舉、分布式計數(shù)器等)的抽象封裝,接下來將利用Curator提供的類來實現(xiàn)分布式鎖。
Curator提供的跟分布式鎖相關的類有5個,分別是:
Shared Reentrant Lock 可重入鎖
Shared Lock 共享不可重入鎖
Shared Reentrant Read Write Lock 可重入讀寫鎖
Shared Semaphore 信號量
Multi Shared Lock 多鎖
關于錯誤處理:還是強烈推薦使用ConnectionStateListener處理連接狀態(tài)的改變。當連接LOST時你不再擁有鎖。
可重入鎖
Shared Reentrant Lock,全局可重入鎖,所有客戶端都可以請求,同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。它是由類?InterProcessMutex?來實現(xiàn),它的主要方法:
// 構造方法
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
// 通過acquire獲得鎖,并提供超時機制:
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception
// 撤銷鎖
public void makeRevocable(RevocationListener listener)
public void makeRevocable(final RevocationListener listener, Executor executor)
定義一個 FakeLimitedResource 類來模擬一個共享資源,該資源一次只能被一個線程使用,直到使用結束,下一個線程才能使用,否則會拋出異常
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
// 模擬只能單線程操作的資源
public void use() throws InterruptedException {
if (!inUse.compareAndSet(false, true)) {
// 在正確使用鎖的情況下,此異常不可能拋出
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (100 * Math.random()));
} finally {
inUse.set(false);
}
}
}
下面的代碼將創(chuàng)建 N 個線程來模擬分布式系統(tǒng)中的節(jié)點,系統(tǒng)將通過 InterProcessMutex 來控制對資源的同步使用;每個節(jié)點都將發(fā)起10次請求,完成?請求鎖--訪問資源--再次請求鎖--釋放鎖--釋放鎖?的過程;客戶端通過?acquire?請求鎖,通過?release?釋放鎖,獲得幾把鎖就要釋放幾把鎖;這個共享資源一次只能被一個線程使用,如果控制同步失敗,將拋異常。
public class SharedReentrantLockTest {
private static final String lockPath = "/testZK/sharedreentrantlock";
private static final Integer clientNums = 5;
final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的資源
private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework client = ZKUtils.getClient();
client.start();
Random random = new Random();
try {
final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 每個客戶端請求10次共享資源
for (int j = 0; j < 10; j++) {
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥鎖");
}
try {
System.out.println(j + ". " + clientName + " 已獲取到互斥鎖");
resource.use(); // 使用資源
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥鎖");
}
System.out.println(j + ". " + clientName + " 已再次獲取到互斥鎖");
lock.release(); // 申請幾次鎖就要釋放幾次鎖
} finally {
System.out.println(j + ". " + clientName + " 釋放互斥鎖");
lock.release(); // 總是在finally中釋放
}
Thread.sleep(random.nextInt(100));
}
} catch (Throwable e) {
System.out.println(e.getMessage());
} finally {
CloseableUtils.closeQuietly(client);
System.out.println(clientName + " 客戶端關閉!");
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
System.out.println("結束!");
}
}
控制臺打印日志,可以看到對資源的同步訪問控制成功,并且鎖是可重入的
0. client#3 已獲取到互斥鎖
0. client#3 已再次獲取到互斥鎖
0. client#3 釋放互斥鎖
0. client#1 已獲取到互斥鎖
0. client#1 已再次獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 已再次獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 已再次獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 已再次獲取到互斥鎖
0. client#4 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 已再次獲取到互斥鎖
1. client#1 釋放互斥鎖
2. client#1 已獲取到互斥鎖
2. client#1 已再次獲取到互斥鎖
2. client#1 釋放互斥鎖
1. client#4 已獲取到互斥鎖
1. client#4 已再次獲取到互斥鎖
1. client#4 釋放互斥鎖
1. client#3 已獲取到互斥鎖
1. client#3 已再次獲取到互斥鎖
1. client#3 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 已再次獲取到互斥鎖
1. client#2 釋放互斥鎖
2. client#4 已獲取到互斥鎖
2. client#4 已再次獲取到互斥鎖
2. client#4 釋放互斥鎖
....
....
client#2 客戶端關閉!
9. client#0 已獲取到互斥鎖
9. client#0 已再次獲取到互斥鎖
9. client#0 釋放互斥鎖
9. client#3 已獲取到互斥鎖
9. client#3 已再次獲取到互斥鎖
9. client#3 釋放互斥鎖
client#0 客戶端關閉!
8. client#4 已獲取到互斥鎖
8. client#4 已再次獲取到互斥鎖
8. client#4 釋放互斥鎖
9. client#4 已獲取到互斥鎖
9. client#4 已再次獲取到互斥鎖
9. client#4 釋放互斥鎖
client#3 客戶端關閉!
client#4 客戶端關閉!
結束!
同時在程序運行期間查看Zookeeper節(jié)點樹,可以發(fā)現(xiàn)每一次請求的鎖實際上對應一個臨時順序節(jié)點
[zk: localhost:2181(CONNECTED) 42] ls /testZK/sharedreentrantlock
[leases, _c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659, locks, _c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658, _c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]
不可重入鎖
Shared Lock 與 Shared Reentrant Lock 相似,但是不可重入。這個不可重入鎖由類 InterProcessSemaphoreMutex 來實現(xiàn),使用方法和上面的類類似。
將上面程序中的 InterProcessMutex 換成不可重入鎖 InterProcessSemaphoreMutex,如果再運行上面的代碼,結果就會發(fā)現(xiàn)線程被阻塞在第二個?acquire?上,直到超時,也就是此鎖不是可重入的。
控制臺輸出日志
0. client#2 已獲取到互斥鎖
0. client#1 不能得到互斥鎖
0. client#4 不能得到互斥鎖
0. client#0 不能得到互斥鎖
0. client#3 不能得到互斥鎖
client#1 客戶端關閉!
client#4 客戶端關閉!
client#3 客戶端關閉!
client#0 客戶端關閉!
0. client#2 釋放互斥鎖
0. client#2 不能再次得到互斥鎖
client#2 客戶端關閉!
結束!
把第二個獲取鎖的代碼注釋,程序才能正常執(zhí)行
0. client#1 已獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 釋放互斥鎖
0. client#3 已獲取到互斥鎖
0. client#3 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 釋放互斥鎖
....
....
9. client#4 已獲取到互斥鎖
9. client#4 釋放互斥鎖
9. client#0 已獲取到互斥鎖
client#2 客戶端關閉!
9. client#0 釋放互斥鎖
9. client#1 已獲取到互斥鎖
client#0 客戶端關閉!
client#4 客戶端關閉!
9. client#1 釋放互斥鎖
9. client#3 已獲取到互斥鎖
client#1 客戶端關閉!
9. client#3 釋放互斥鎖
client#3 客戶端關閉!
結束!
可重入讀寫鎖
Shared Reentrant Read Write Lock,可重入讀寫鎖,一個讀寫鎖管理一對相關的鎖,一個負責讀操作,另外一個負責寫操作;讀操作在寫鎖沒被使用時可同時由多個進程使用,而寫鎖在使用時不允許讀(阻塞);此鎖是可重入的;一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖,這也意味著寫鎖可以降級成讀鎖, 比如?請求寫鎖 --->讀鎖 ---->釋放寫鎖;從讀鎖升級成寫鎖是不行的。
可重入讀寫鎖主要由兩個類實現(xiàn):InterProcessReadWriteLock、InterProcessMutex,使用時首先創(chuàng)建一個 InterProcessReadWriteLock 實例,然后再根據你的需求得到讀鎖或者寫鎖,讀寫鎖的類型是 InterProcessMutex。
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
final String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework client = ZKUtils.getClient();
client.start();
final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
final InterProcessMutex readLock = lock.readLock();
final InterProcessMutex writeLock = lock.writeLock();
try {
// 注意只能先得到寫鎖再得到讀鎖,不能反過來!!!
if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(clientName + " 不能得到寫鎖");
}
System.out.println(clientName + " 已得到寫鎖");
if (!readLock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(clientName + " 不能得到讀鎖");
}
System.out.println(clientName + " 已得到讀鎖");
try {
resource.use(); // 使用資源
} finally {
System.out.println(clientName + " 釋放讀寫鎖");
readLock.release();
writeLock.release();
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
CloseableUtils.closeQuietly(client);
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
System.out.println("結束!");
}
}
控制臺打印日志
client#1 已得到寫鎖
client#1 已得到讀鎖
client#1 釋放讀寫鎖
client#2 已得到寫鎖
client#2 已得到讀鎖
client#2 釋放讀寫鎖
client#0 已得到寫鎖
client#0 已得到讀鎖
client#0 釋放讀寫鎖
client#4 已得到寫鎖
client#4 已得到讀鎖
client#4 釋放讀寫鎖
client#3 已得到寫鎖
client#3 已得到讀鎖
client#3 釋放讀寫鎖
結束!
信號量
Shared Semaphore,一個計數(shù)的信號量類似JDK的 Semaphore,JDK中 Semaphore 維護的一組許可(permits),而Cubator中稱之為租約(Lease)。有兩種方式可以決定 semaphore 的最大租約數(shù),第一種方式是由用戶給定的 path 決定,第二種方式使用 SharedCountReader 類。如果不使用 SharedCountReader,沒有內部代碼檢查進程是否假定有10個租約而進程B假定有20個租約。所以所有的實例必須使用相同的 numberOfLeases 值.
信號量主要實現(xiàn)類有:
InterProcessSemaphoreV2 - 信號量實現(xiàn)類
Lease - 租約(單個信號)
SharedCountReader - 計數(shù)器,用于計算最大租約數(shù)量
調用?acquire?會返回一個租約對象,客戶端必須在 finally 中 close 這些租約對象,否則這些租約會丟失掉。但是,如果客戶端session由于某種原因比如crash丟掉,那么這些客戶端持有的租約會自動close,這樣其它客戶端可以繼續(xù)使用這些租約。租約還可以通過下面的方式返還:
public void returnLease(Lease lease)
public void returnAll(Collection leases)
注意一次你可以請求多個租約,如果 Semaphore 當前的租約不夠,則請求線程會被阻塞。同時還提供了超時的重載方法。
public Lease acquire() throws Exception
public Collection acquire(int qty) throws Exception
public Lease acquire(long time, TimeUnit unit) throws Exception
public Collection acquire(int qty, long time, TimeUnit unit) throws Exception
一個Demo程序如下
public class SharedSemaphoreTest {
private static final int MAX_LEASE = 10;
private static final String PATH = "/testZK/semaphore";
private static final FakeLimitedResource resource = new FakeLimitedResource();
public static void main(String[] args) throws Exception {
CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
Collection leases = semaphore.acquire(5);
System.out.println("獲取租約數(shù)量:" + leases.size());
Lease lease = semaphore.acquire();
System.out.println("獲取單個租約");
resource.use(); // 使用資源
// 再次申請獲取5個leases,此時leases數(shù)量只剩4個,不夠,將超時
Collection leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("獲取租約,如果超時將為null:" + leases2);
System.out.println("釋放租約");
semaphore.returnLease(lease);
// 再次申請獲取5個,這次剛好夠
leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("獲取租約,如果超時將為null:" + leases2);
System.out.println("釋放集合中的所有租約");
semaphore.returnAll(leases);
semaphore.returnAll(leases2);
client.close();
System.out.println("結束!");
}
}
控制臺打印日志
獲取租約數(shù)量:5
獲取單個租約
獲取租約,如果超時將為null:null
釋放租約
獲取租約,如果超時將為null:[org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57]
釋放集合中的所有租約
結束!
注意:上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看,每個客戶端都按照請求的順序獲得鎖,相當公平。
多鎖
Multi Shared Lock 是一個鎖的容器。當調用?acquire,所有的鎖都會被?acquire,如果請求失敗,所有的鎖都會被?release。同樣調用?release?時所有的鎖都被?release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。
主要涉及兩個類:
InterProcessMultiLock - 對所對象實現(xiàn)類
InterProcessLock - 分布式鎖接口類
它的構造函數(shù)需要包含的鎖的集合,或者一組 ZooKeeper 的 path,用法和 Shared Lock 相同
public InterProcessMultiLock(CuratorFramework client, List paths)
public InterProcessMultiLock(List locks)
一個Demo程序如下
public class MultiSharedLockTest {
private static final String lockPath1 = "/testZK/MSLock1";
private static final String lockPath2 = "/testZK/MSLock2";
private static final FakeLimitedResource resource = new FakeLimitedResource();
public static void main(String[] args) throws Exception {
CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入鎖
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入鎖
// 組鎖,多鎖
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("不能獲取多鎖");
}
System.out.println("已獲取多鎖");
System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
try {
resource.use(); // 資源操作
} finally {
System.out.println("釋放多個鎖");
lock.release(); // 釋放多鎖
}
System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
client.close();
System.out.println("結束!");
}
}推薦閱讀
分享我常用的5個免費的在線 SQL 數(shù)據庫環(huán)境,簡直太方便了!
Spring Boot 三招組合拳,手把手教你打出優(yōu)雅的后端接口
MySQL 5.7 vs 8.0,你選那個?網友:我繼續(xù)原地踏步~
最后,推薦給大家一個有趣有料的公眾號:寫代碼的渣渣鵬,7年老程序員教你寫bug,回復”面試“有驚喜哦
掃碼關注

