分布式鎖(三):基于Redisson的分布式鎖實踐
Redisson是基于Redis的Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid),底層使用Netty進行實現(xiàn)。其提供了相應(yīng)的分布式鎖實現(xiàn)

RedissonLock 分布式非公平可重入互斥鎖
RedissonLock是一個分布式非公平可重入互斥鎖,其在獲取鎖的過程中,支持lock阻塞式、tryLock非阻塞式兩種形式。其中,這兩個方法還有多個重載版本,以支持設(shè)置鎖的最大持有時間、設(shè)置獲取鎖的最大等待時間。具體方法如下所示
#?阻塞式獲取鎖
void?lock();
#?阻塞式獲取鎖
#?支持通過leaseTime參數(shù)設(shè)置鎖的最大持有時間
void?lock(long?leaseTime,?TimeUnit?unit);
#?非阻塞式獲取鎖
boolean?tryLock();
#?非阻塞式獲取鎖
#?支持通過time參數(shù)設(shè)置獲取鎖的最大等待時間
boolean?tryLock(long?time,?TimeUnit?unit);
#?非阻塞式獲取鎖
#?支持通過waitTime參數(shù)設(shè)置獲取鎖的最大等待時間
#?支持通過leaseTime參數(shù)設(shè)置鎖的最大持有時間
boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit);
可重入性
現(xiàn)在我們驗證下RedissonLock是一個互斥鎖并且支持可重入。示例代碼如下所示
/**
?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonLockDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????private?static?RedissonClient?redissonClient;
????@BeforeClass
????public?static?void?init()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????redissonClient?=?Redisson.create(?config?);
????}
????/**
?????*?測試:?阻塞式獲取鎖、可重入性
?????*/
????@Test
????public?void?testLock1()?{
????????final?String?lockName?=?"sellPc";
????????Runnable?task?=?()?->?{
????????????//?設(shè)置分布式鎖
????????????RLock?lock?=?redissonClient.getLock(lockName);
????????????try{
????????????????//?阻塞式獲取鎖
????????????????lock.lock();
????????????????info("成功獲取鎖?#1");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????lock.lock();
????????????????info("成功獲取鎖?#2");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????}?catch?(Exception?e)?{
????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("釋放鎖?#2");
????????????????lock.unlock();
????????????????info("釋放鎖?#1\n");
????????????????lock.unlock();
????????????}
????????};
????????RLock?tempLock?=?redissonClient.getLock(lockName);
????????if(?tempLock?instanceof?RedissonLock)?{
????????????System.out.println("鎖類型:?RedissonLock");
????????}
????????try{?Thread.sleep(?1*1000?);?}?catch?(Exception?e)?{}
????????
????????for?(int?i=1;?i<=3;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????@AfterClass
????public?static?void?close()?{
????????redissonClient.shutdown();
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
}
測試結(jié)果如下所示,符合預(yù)期

顯式指定鎖的最大持有時間
前面提到支持通過leaseTime參數(shù)顯式設(shè)置鎖的最大持有時間,當業(yè)務(wù)持鎖時間超過leaseTime參數(shù)值,則其持有的鎖會被自動釋放。但需要注意的是某個線程的鎖一旦被自動釋放后,此時再調(diào)用unlock方法來釋放鎖時,即會拋出IllegalMonitorStateException異常。原因也很簡單,因為此時線程實際上并未持有鎖,示例代碼如下所示
/**
?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonLockDemo?{
????...
????/**
?????*?測試:?指定鎖的最大持有時間
?????*/
????@Test
????public?void?testLock2()?{
????????final?String?lockName?=?"sellBooK";
????????Runnable?task?=?()?->?{
????????????//?設(shè)置分布式鎖
????????????RLock?lock?=?redissonClient.getLock(lockName);
????????????try{
????????????????//?阻塞式獲取鎖,?指定鎖的最大持有時間為1秒
????????????????lock.lock(1,?TimeUnit.SECONDS);
????????????????info("成功獲取鎖");
????????????????//?模擬業(yè)務(wù)耗時:?10s
????????????????Thread.sleep(?10?*?1000?);
????????????}?catch?(Exception?e)?{
????????????????info("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("釋放鎖");
????????????????try?{
????????????????????lock.unlock();
????????????????}?catch?(IllegalMonitorStateException?e)?{
????????????????????info("Happen?Exception:?"?+?e.getMessage());
????????????????}
????????????}
????????};
????????for?(int?i=1;?i<=5;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????...
}
測試結(jié)果如下所示,符合預(yù)期。可以看到每隔1秒后,由于線程持鎖時間到期了。鎖被自動釋放了,進而使得下一個任務(wù)拿到了鎖。并且由于每個任務(wù)的鎖都是自動釋放的,故每次調(diào)用unlock方法均會拋出異常

非阻塞式獲取鎖
下面展示如果通過tryLock方法進行非阻塞式獲取鎖,示例代碼如下所示
/**
?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonLockDemo?{
????...
????/**
?????*?測試:?非阻塞式獲取鎖
?????*/
????@Test
????public?void?testTryLock()?{
????????final?String?lockName?=?"sellPhone";
????????Runnable?task?=?()?->?{
????????????//?設(shè)置分布式鎖
????????????RLock?lock?=?redissonClient.getLock(lockName);
????????????boolean?flag?=?false;
????????????try{
????????????????//?非阻塞式獲取鎖
????????????????flag?=?lock.tryLock();
????????????????if(?flag?)?{
????????????????????info("成功獲取鎖");
????????????????????//?模擬業(yè)務(wù)耗時
????????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????}?else?{
????????????????????info("未獲取到鎖\n");
????????????????}
????????????}?catch?(Exception?e)?{
????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????if(?flag?)?{
????????????????????info("釋放鎖\n");
????????????????????lock.unlock();
????????????????}
????????????}
????????};
????????for?(int?i=1;?i<=5;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????...
}
測試結(jié)果如下所示,符合預(yù)期

Watch Dog看門狗機制
在介紹Redisson的Watch Dog看門狗機制之前,我們先來做個測試。如果某個線程一直持有鎖執(zhí)行業(yè)務(wù)邏輯,則鎖是否會被自動釋放呢?示例代碼如下所示
/**
?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonLockDemo?{
????...
????/**
?????*?測試:?看門狗機制
?????*/
????@Test
????public?void?testLock3()?{
????????final?String?lockName?=?"sellPig";
????????Runnable?task?=?()?->?{
????????????//?設(shè)置分布式鎖
????????????RLock?lock?=?redissonClient.getLock(lockName);
????????????try{
????????????????info("嘗試獲取鎖");
????????????????//?阻塞式獲取鎖
????????????????lock.lock();
????????????????info("成功獲取鎖");
????????????????//?未顯式指定鎖的持有時間,?則看門狗會在斷開連接前一直進行續(xù)期
????????????????while?(true)?{
????????????????}
????????????}?catch?(Exception?e)?{
????????????????info("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("成功釋放鎖");
????????????????lock.unlock();
????????????}
????????};
????????for?(int?i=1;?i<5;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
????????info("----------------------?系統(tǒng)下線?----------------------");
????}
????...
}
測試結(jié)果如下所示,該鎖被持有后,一直未被釋放。其他任務(wù)都被阻塞住了

當我們調(diào)用不含leaseTime參數(shù)版本的lock()方法時,即未顯式設(shè)置最大持鎖時間。則其在RedissonLock類內(nèi)部會將 特殊值-1 傳給leaseTime參數(shù)。然后在tryAcquireAsync方法中會通過RedissonLock類的internalLockLeaseTime字段設(shè)置一個默認的最大持鎖時間。最后通過RedissonLock構(gòu)造器我們不難發(fā)現(xiàn) internalLockLeaseTime 字段的值來自于Config類的lockWatchdogTimeout字段。其中l(wèi)ockWatchdogTimeout字段的默認值為30秒。換言之即使我們調(diào)用lock方法時,未顯式設(shè)置最大持鎖時間。但RedissonLock內(nèi)部也會通過lockWatchdogTimeout字段給該鎖設(shè)置一個最大持有時間,默認值為30秒
public?class?RedissonLock?extends?RedissonBaseLock?{
????protected?long?internalLockLeaseTime;
????
????public?RedissonLock(CommandAsyncExecutor?commandExecutor,?String?name)?{
????????super(commandExecutor,?name);
????????this.commandExecutor?=?commandExecutor;
????????//?internalLockLeaseTime?字段的值來自于Config類的lockWatchdogTimeout字段
????????this.internalLockLeaseTime?=?commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
????????this.pubSub?=?commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
????}
????
????/**
?????*?未顯式設(shè)置最大持鎖時間的lock方法
?????*/
????@Override
????public?void?lock()?{
????????try?{
????????????//?則其會將?特殊值-1?傳給leaseTime參數(shù)
????????????lock(-1,?null,?false);
????????}?catch?(InterruptedException?e)?{
????????????throw?new?IllegalStateException();
????????}
????}
????
????private?void?lock(long?leaseTime,?TimeUnit?unit,?boolean?interruptibly)?throws?InterruptedException?{
????????...
????????Long?ttl?=?tryAcquire(-1,?leaseTime,?unit,?threadId);
????????...
????}
????
????private?Long?tryAcquire(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)?{
????????return?get(tryAcquireAsync(waitTime,?leaseTime,?unit,?threadId));
????}
????private??RFuture?tryAcquireAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)? {
????????RFuture?ttlRemainingFuture;
????????if?(leaseTime?!=?-1)?{
????????????ttlRemainingFuture?=?tryLockInnerAsync(waitTime,?leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
????????}?else?{
????????????//?未顯式設(shè)置最大持鎖時間?則會?通過?internalLockLeaseTime?字段設(shè)置一個默認的最大持鎖時間
????????????ttlRemainingFuture?=?tryLockInnerAsync(waitTime,?internalLockLeaseTime,
????????????????????TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
????????}
????????CompletionStage?f?=?ttlRemainingFuture.thenApply(ttlRemaining?->?{
????????????//?lock?acquired
????????????if?(ttlRemaining?==?null)?{
????????????????if?(leaseTime?!=?-1)?{
????????????????????internalLockLeaseTime?=?unit.toMillis(leaseTime);
????????????????}?else?{
????????????????????//?未顯式設(shè)置最大持鎖時間?則會啟動一個定時任務(wù)用于進行自動續(xù)期
????????????????????scheduleExpirationRenewal(threadId);
????????????????}
????????????}
????????????return?ttlRemaining;
????????});
????????return?new?CompletableFutureWrapper<>(f);
????}
}
...
public?class?Config?{
????//?時間:?30秒
????private?long?lockWatchdogTimeout?=?30?*?1000;
????public?long?getLockWatchdogTimeout()?{
????????return?lockWatchdogTimeout;
????}
????...
}
那問題來了,為啥在我們剛剛的測試代碼中即使持鎖時間超過了30秒,鎖也沒有被自動釋放呢?原因就在于Redisson的看門狗機制。在RedissonLock類的tryAcquireAsync方法中,未顯式設(shè)置最大持鎖時間 則會啟動一個定時任務(wù)用于進行自動續(xù)期。即RedissonLock類的tryAcquireAsync方法中會調(diào)用scheduleExpirationRenewal()以啟動一個定時任務(wù)用于進行自動續(xù)期。具體的續(xù)期邏輯在RedissonBaseLock類的renewExpiration方法中,其中自動續(xù)期定時任務(wù)的執(zhí)行周期 是RedissonBaseLock類的internalLockLeaseTime字段值的1/3。然后通過renewExpirationAsync方法每次利用Lua腳本向Redis發(fā)送續(xù)期命令,具體地。每次續(xù)期時會將RedissonBaseLock類的internalLockLeaseTime字段值設(shè)置為新的最大持鎖時間。同樣地,RedissonBaseLock類的 internalLockLeaseTime 字段值也是來自于Config類的lockWatchdogTimeout字段,即默認為30秒
public?abstract?class?RedissonBaseLock?extends?RedissonExpirable?implements?RLock?{
?...
????protected?long?internalLockLeaseTime;
?public?RedissonBaseLock(CommandAsyncExecutor?commandExecutor,?String?name)?{
????????super(commandExecutor,?name);
????????this.commandExecutor?=?commandExecutor;
????????this.id?=?commandExecutor.getConnectionManager().getId();
????????//?internalLockLeaseTime?參數(shù)的值來自于Config類的lockWatchdogTimeout變量
????????this.internalLockLeaseTime?=?commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
????????this.entryName?=?id?+?":"?+?name;
????}
?
?protected?void?scheduleExpirationRenewal(long?threadId)?{
????????ExpirationEntry?entry?=?new?ExpirationEntry();
????????ExpirationEntry?oldEntry?=?EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),?entry);
????????if?(oldEntry?!=?null)?{
????????????oldEntry.addThreadId(threadId);
????????}?else?{
????????????entry.addThreadId(threadId);
????????????try?{
?????????????//?自動續(xù)期
????????????????renewExpiration();
????????????}?finally?{
????????????????if?(Thread.currentThread().isInterrupted())?{
????????????????????cancelExpirationRenewal(threadId);
????????????????}
????????????}
????????}
????}
????private?void?renewExpiration()?{
????????ExpirationEntry?ee?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());
????????if?(ee?==?null)?{
????????????return;
????????}
????????
????????Timeout?task?=?commandExecutor.getConnectionManager().newTimeout(new?TimerTask()?{
????????????@Override
????????????public?void?run(Timeout?timeout)?throws?Exception?{
????????????????ExpirationEntry?ent?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());
????????????????if?(ent?==?null)?{
????????????????????return;
????????????????}
????????????????Long?threadId?=?ent.getFirstThreadId();
????????????????if?(threadId?==?null)?{
????????????????????return;
????????????????}
????????????????
????????????????//?通過Lua腳本向Redis發(fā)送續(xù)期命令
????????????????RFuture?future?=?renewExpirationAsync(threadId);
????????????????future.whenComplete((res,?e)?->?{
????????????????????if?(e?!=?null)?{
????????????????????????log.error("Can't?update?lock?"?+?getRawName()?+?"?expiration",?e);
????????????????????????EXPIRATION_RENEWAL_MAP.remove(getEntryName());
????????????????????????return;
????????????????????}
????????????????????
????????????????????if?(res)?{
????????????????????????//?reschedule?itself
????????????????????????renewExpiration();
????????????????????}?else?{
????????????????????????cancelExpirationRenewal(null);
????????????????????}
????????????????});
????????????}
????????????//?定時任務(wù)的執(zhí)行周期?是?internalLockLeaseTime字段值的?1/3
????????????//?即,?定時任務(wù)的執(zhí)行周期?是?lockWatchdogTimeout字段值的?1/3
????????},?internalLockLeaseTime?/?3,?TimeUnit.MILLISECONDS);
????????
????????ee.setTimeout(task);
????}
????protected?RFuture?renewExpirationAsync(long?threadId)? {
????????return?evalWriteAsync(getRawName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
????????????????????????"return?1;?"?+
????????????????????????"end;?"?+
????????????????????????"return?0;",
????????????????//?每次續(xù)期時會將internalLockLeaseTime字段值作為新的最大持鎖時間
????????????????Collections.singletonList(getRawName()),?internalLockLeaseTime,?getLockName(threadId));
????}
????...
}
至此我們應(yīng)該比較清楚Redisson的看門狗機制了:
Redisson的Watch Dog看門狗機制只會在未顯式設(shè)置最大持鎖時間才會生效。換言之,一旦調(diào)用lock方法時指定了leaseTime參數(shù)值,則該鎖到期后即會自動釋放。Redisson的Watch Dog看門狗不會對該鎖進行自動續(xù)期 當我們未顯式設(shè)置Config類的lockWatchdogTimeout字段值時,使用默認的30秒。此時如果加鎖時未顯式設(shè)置最大持鎖時間,即Watch Dog看門狗機制會生效的場景中。該鎖實際上一開始也會設(shè)置一個默認的最大持鎖時間,即30秒。然后看門狗每隔10秒(30秒 * 1/3 = 10秒)會將該鎖的最大持鎖時間再次設(shè)置為30秒,以達到自動續(xù)期的目的。這樣只要持鎖線程的業(yè)務(wù)還未執(zhí)行完,則該鎖就一直有效、不會被自動釋放。當然一旦持鎖的服務(wù)實例發(fā)生宕機后,看門狗的定時任務(wù)自然也無法續(xù)期。這樣鎖到期后也就釋放掉了,避免了死鎖的發(fā)生
RedissonFairLock 分布式公平可重入互斥鎖
由于RedissonLock是非公平的,故Redisson提供了一個分布式公平可重入互斥鎖——RedissonFairLock。示例代碼如下所示
/**
?*?RedissonFairLock?Demo?:?分布式公平可重入互斥鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonFairLockDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????/**
?????*?測試:?可重入性
?????*/
????@Test
????public?void?test1()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????RedissonClient?redissonClient?=?Redisson.create(?config?);
????????final?String?lockName?=?"sellWatch";
????????Runnable?task?=?()?->?{
????????????//?設(shè)置分布式鎖
????????????RLock?lock?=?redissonClient.getFairLock(lockName);
????????????try{
????????????????//?阻塞式獲取鎖
????????????????lock.lock();
????????????????info("成功獲取鎖?#1");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????lock.lock();
????????????????info("成功獲取鎖?#2");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????}?catch?(Exception?e)?{
????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("釋放鎖?#2");
????????????????lock.unlock();
????????????????info("釋放鎖?#1\n");
????????????????lock.unlock();
????????????}
????????};
????????RLock?tempLock?=?redissonClient.getFairLock(lockName);
????????if(?tempLock?instanceof?RedissonFairLock?)?{
????????????System.out.println("鎖類型:?RedissonFairLock");
????????}
????????try{?Thread.sleep(?4*1000?);?}?catch?(Exception?e)?{}
????????for?(int?i=1;?i<=3;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
????????redissonClient.shutdown();
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
}
測試結(jié)果如下所示,符合預(yù)期

RReadWriteLock 分布式讀寫鎖
讀寫測試
RReadWriteLock是一個分布式可重入讀寫鎖。其中讀鎖為可重入的共享鎖、寫鎖為可重入的互斥鎖,且讀寫互斥。示例代碼如下所示
/**
?*?RReadWriteLock?Demo?:?分布式可重入讀寫鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RReadWriteLockDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????private?static?RedissonClient?redissonClient;
????@BeforeClass
????public?static?void?init()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????redissonClient?=?Redisson.create(?config?);
????}
????/**
?????*?測試:?讀鎖為共享鎖,?讀鎖具有可重入性
?????*/
????@Test
????public?void?test1Read()?{
????????System.out.println("\n----------------------?Test?1?:?Read?----------------------");
????????String?lockName?=?"sellCat";
????????for(int?i=1;?i<=3;?i++)?{
????????????String?taskName?=?"讀任務(wù)#"+i;
????????????Runnable?task?=?new?ReadTask(taskName,?redissonClient,?lockName);
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
????}
????/**
?????*?測試:?寫鎖為互斥鎖,?寫鎖具有可重入性
?????*/
????@Test
????public?void?test2Write()?{
????????System.out.println("\n----------------------?Test?2?:?Write?----------------------");
????????String?lockName?=?"sellDog";
????????for(int?i=1;?i<=3;?i++)?{
????????????String?taskName?=?"寫任務(wù)#"+i;
????????????Runnable?task?=?new?WriteTask(taskName,?redissonClient,?lockName);
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
????}
????/**
?????*?測試:?讀寫互斥
?????*/
????@Test
????public?void?test3ReadWrite()?{
????????System.out.println("\n----------------------?Test?3?:?Read?Write?----------------------");
????????String?lockName?=?"sellLion";
????????for(int?i=1;?i<=5;?i++)?{
????????????Runnable?task?=?null;
????????????Boolean?isReadTask?=?RandomUtils.nextBoolean();
????????????if(?isReadTask?)?{
????????????????task?=?new?ReadTask(?"讀任務(wù)#"+i,?redissonClient,?lockName);
????????????}?else?{
????????????????task?=?new?WriteTask(?"寫任務(wù)#"+i,?redissonClient,?lockName);
????????????}
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
????}
????@AfterClass
????public?static?void?close()?{
????????redissonClient.shutdown();
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
????/**
?????*?讀任務(wù)
?????*/
????private?static?class?ReadTask?implements?Runnable?{
????????private?String?taskName;
????????private?RedissonReadLock?readLock;
????????public?ReadTask(String?taskName,?RedissonClient?redissonClient,?String?lockName)?{
????????????this.taskName?=?taskName;
????????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
????????????this.readLock?=?(RedissonReadLock)?readWriteLock.readLock();
????????}
????????@Override
????????public?void?run()?{
????????????try{
????????????????readLock.lock();
????????????????info(taskName?+?":?成功獲取讀鎖?#1");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????readLock.lock();
????????????????info(taskName?+?":?成功獲取讀鎖?#2");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????}?catch?(Exception?e)?{
????????????????System.out.println(?taskName?+?":?Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info(taskName?+?":?釋放讀鎖?#2");
????????????????readLock.unlock();
????????????????info(taskName?+?":?釋放讀鎖?#1");
????????????????readLock.unlock();
????????????}
????????}
????}
????/**
?????*?寫任務(wù)
?????*/
????private?static?class?WriteTask?implements?Runnable?{
????????private?String?taskName;
????????private?RedissonWriteLock?writeLock;
????????public?WriteTask(String?taskName,?RedissonClient?redissonClient,?String?lockName)?{
????????????this.taskName?=?taskName;
????????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
????????????this.writeLock?=?(RedissonWriteLock)?readWriteLock.writeLock();
????????}
????????@Override
????????public?void?run()?{
????????????try{
????????????????writeLock.lock();
????????????????info(taskName?+?":?成功獲取寫鎖?#1");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????writeLock.lock();
????????????????info(taskName?+?":?成功獲取寫鎖?#2");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????}?catch?(Exception?e)?{
????????????????System.out.println(?taskName?+?":?Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info(taskName?+?":?釋放寫鎖?#2");
????????????????writeLock.unlock();
????????????????info(taskName?+?":?釋放寫鎖?#1\n");
????????????????writeLock.unlock();
????????????}
????????}
????}
}
讀鎖測試結(jié)果如下所示,符合預(yù)期

寫鎖測試結(jié)果如下所示,符合預(yù)期

讀寫測試結(jié)果如下所示,符合預(yù)期

鎖升級、鎖降級
所謂鎖升級指的是讀鎖升級為寫鎖。當一個線程先獲取到讀鎖再去申請寫鎖,顯然其是不支持的。理由也很簡單,讀鎖是可以多個服務(wù)實例同時持有的。若其中一個服務(wù)實例此鎖線程能夠進行鎖升級,成功獲得寫鎖。顯然與我們之前的所說的讀寫互斥相違背。因為其在獲得寫鎖的同時,其他服務(wù)實例依然持有讀鎖;反之,其是支持鎖降級的,即寫鎖降級為讀鎖。當一個服務(wù)實例的線程在獲得寫鎖后,該線程依然可以獲得讀鎖。這個時候當其釋放寫鎖,則將只持有讀鎖,即完成了鎖降級過程。鎖降級的使用價值也很大,其一方面保證了安全,讀鎖在寫鎖釋放前獲取;另一方面保證了高效,因為讀鎖是共享的。
鎖升級示例代碼如下所示
/**
?*?RReadWriteLock?Demo?:?分布式可重入讀寫鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RReadWriteLockDemo?{
????
????...
????/**
?????*?測試:?鎖升級
?????*/
????@Test
????public?void?test4Read2Write()?{
????????System.out.println("----------------------?Test?4?:?Read?->?Write?----------------------\n");
????????String?lockName?=?"sellTiger";
????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
????????RedissonReadLock?readLock?=?(RedissonReadLock)?readWriteLock.readLock();
????????RedissonWriteLock??writeLock?=?(RedissonWriteLock)?readWriteLock.writeLock();
????????try?{
????????????readLock.lock();
????????????info("成功獲取讀鎖");
????????????//?模擬業(yè)務(wù)耗時
????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????writeLock.lock();
????????????info("成功獲取寫鎖");
????????????//?模擬業(yè)務(wù)耗時
????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????readLock.unlock();
????????????info("成功釋放讀鎖");
????????????//?模擬業(yè)務(wù)耗時
????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????writeLock.unlock();
????????????info("成功釋放寫鎖");
????????}?catch?(Exception?e)?{
????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}????
}
測試結(jié)果如下所示,在持有讀鎖的情況下,繼續(xù)嘗試獲取寫鎖會被一直阻塞
鎖降級示例代碼如下所示
/**
?*?RReadWriteLock?Demo?:?分布式可重入讀寫鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RReadWriteLockDemo?{
????
????...
????/**
?????*?測試:?鎖降級
?????*/
????@Test
????public?void?test5Write2Read()?{
????????System.out.println("----------------------?Test?2?:?Write?->?Read?----------------------");
????????String?lockName?=?"sellChicken";
????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
????????RedissonReadLock?readLock?=?(RedissonReadLock)?readWriteLock.readLock();
????????RedissonWriteLock??writeLock?=?(RedissonWriteLock)?readWriteLock.writeLock();
????????try?{
????????????writeLock.lock();
????????????info("成功獲取寫鎖");
????????????//?模擬業(yè)務(wù)耗時
????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????readLock.lock();
????????????info("成功獲取讀鎖");
????????????//?模擬業(yè)務(wù)耗時
????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????writeLock.unlock();
????????????info("成功釋放寫鎖");
????????????//?模擬業(yè)務(wù)耗時
????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????readLock.unlock();
????????????info("成功釋放讀鎖");
????????}?catch?(Exception?e)?{
????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
}
測試結(jié)果如下所示,符合預(yù)期

RedissonSpinLock 分布式非公平可重入自旋互斥鎖
RedissonSpinLock則是一個分布式非公平可重入自旋互斥鎖。示例代碼如下所示
/**
?*?RedissonSpinLock?Demo?:?分布式非公平可重入自旋互斥鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonSpinLockDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????/**
?????*?測試:?可重入性、互斥鎖
?????*/
????@Test
????public?void?test1()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????RedissonClient?redissonClient?=?Redisson.create(?config?);
????????final?String?lockName?=?"sellKeyword";
????????Runnable?task?=?()?->?{
????????????//?設(shè)置分布式鎖
????????????RLock?lock?=?redissonClient.getSpinLock(lockName);
????????????try{
????????????????//?阻塞式獲取鎖
????????????????lock.lock();
????????????????info("成功獲取鎖?#1");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????lock.lock();
????????????????info("成功獲取鎖?#2");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????}?catch?(Exception?e)?{
????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("釋放鎖?#2");
????????????????lock.unlock();
????????????????info("釋放鎖?#1\n");
????????????????lock.unlock();
????????????}
????????};
????????RLock?tempLock?=?redissonClient.getSpinLock(lockName);
????????if(?tempLock?instanceof?RedissonSpinLock)?{
????????????System.out.println("鎖類型:?RedissonSpinLock");
????????}
????????try{?Thread.sleep(?2*1000?);?}?catch?(Exception?e)?{}
????????for?(int?i=1;?i<=3;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
????????redissonClient.shutdown();
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
}
測試結(jié)果如下所示,符合預(yù)期

RedissonCountDownLatch 分布式閂鎖
RedissonCountDownLatch是一個分布式的CountDownLatch閂鎖。比如我們的業(yè)務(wù)系統(tǒng)會依賴很多其他基礎(chǔ)服務(wù),這樣在業(yè)務(wù)系統(tǒng)啟動過程中,需要等待其他基礎(chǔ)服務(wù)全部啟動完畢。示例代碼如下所示
/**
?*?RedissonCountDownLatch?Demo?:?分布式閂鎖
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonCountDownLatchDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????private?static?RedissonClient?redissonClient;
????@BeforeClass
????public?static?void?init()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????redissonClient?=?Redisson.create(?config?);
????}
????@Test
????public?void?test1()?throws?InterruptedException?{
????????final?String?countDownLatchName?=?"systemInit";
????????int?count?=?5;
????????for?(int?i=1;?i<=count;?i++)?{
????????????String?serviceName?=?"基礎(chǔ)服務(wù)?#"+i;
????????????BasicService?basicService?=?new?BasicService(serviceName,?redissonClient,?countDownLatchName,?count);
????????????threadPool.execute(?basicService?);
????????}
????????RedissonCountDownLatch?countDownLatch?=?(RedissonCountDownLatch)?redissonClient.getCountDownLatch(countDownLatchName);
????????countDownLatch.trySetCount(count);
????????//?阻塞等待基礎(chǔ)服務(wù)全部啟動完成
????????countDownLatch.await();
????????info("系統(tǒng)初始化已完成,?業(yè)務(wù)系統(tǒng)啟動?...");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
????/**
?????*?基礎(chǔ)服務(wù)
?????*/
????private?static?class?BasicService?implements?Runnable?{
????????private?String?serviceName;
????????private?RedissonCountDownLatch?countDownLatch;
????????public?BasicService(String?serviceName,?RedissonClient?redissonClient,?String?countDownLatchName,?Integer?count)?{
????????????this.serviceName?=?serviceName;
????????????this.countDownLatch?=?(RedissonCountDownLatch)?redissonClient.getCountDownLatch(countDownLatchName);
????????????this.countDownLatch.trySetCount(?count?);
????????}
????????@Override
????????public?void?run()?{
????????????try{
????????????????info(serviceName?+?":?啟動中");
????????????????//?模擬?基礎(chǔ)服務(wù)啟動?耗時
????????????????Thread.sleep(?RandomUtils.nextLong(1,?5)?*?1000?);
????????????????countDownLatch.countDown();
????????????????info(serviceName?+?":?啟動完成");
????????????}?catch?(Exception?e)?{
????????????????System.out.println(?serviceName?+?":?Happen?Exception:?"?+?e.getMessage());
????????????}
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期

RedissonSemaphore 分布式信號量
RedissonSemaphore是一個分布式的信號量,示例代碼如下所示
/**
?*?RedissonSemaphore?Demo?:?分布式信號量
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonSemaphoreDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????@Test
????public?void?test1()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????RedissonClient?redissonClient?=?Redisson.create(?config?);
????????final?String?lockName?=?"sellAnimal";
????????//?系統(tǒng)最大并發(fā)處理量
????????int?maxLimit?=?3;
????????IntStream.rangeClosed(1,8)
????????????.mapToObj(?num?->?new?UserReq("用戶#"+num,?redissonClient,?lockName,?maxLimit)?)
????????????.forEach(?threadPool::execute?);
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
????????redissonClient.shutdown();
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
????private?static?class?UserReq?implements?Runnable?{
????????private?String?name;
????????private?RedissonSemaphore?semaphore;
????????public?UserReq(String?name,?RedissonClient?redissonClient,?String?lockName,?Integer?maxLimit)?{
????????????this.name?=?name;
????????????this.semaphore?=?(RedissonSemaphore)?redissonClient.getSemaphore(lockName);
????????????//?設(shè)置信號量的許可數(shù)
????????????semaphore.trySetPermits(?maxLimit?);
????????}
????????@Override
????????public?void?run()?{
????????????try?{
????????????????//?模擬用戶不定時發(fā)起請求
????????????????Thread.sleep(RandomUtils.nextLong(500,?2000));
????????????????info(?name?+?":?發(fā)起請求"?);
????????????????//?阻塞等待,直到獲取許可
????????????????semaphore.acquire();
????????????????info(name?+?":?系統(tǒng)開始處理請求");
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextInt(5,?20)*1000);
????????????????//?用戶請求處理完畢,釋放許可
????????????????semaphore.release();
????????????????info(name?+?":?系統(tǒng)處理完畢");
????????????}catch?(Exception?e)?{
????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????}
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期
RedissonPermitExpirableSemaphore 分布式支持有效期的信號量
相比較于RedissonSemaphore而言,RedissonPermitExpirableSemaphore在獲取許可的acquire方法中,增加了一個支持leaseTime參數(shù)的重載版本。以實現(xiàn)指定許可的最大持有時間。一旦業(yè)務(wù)持許可時間超過leaseTime參數(shù)值,則其持有的許可會被自動釋放。但需要注意的是某個線程的許可一旦被自動釋放后,此時再調(diào)用release方法來釋放許可時,即會拋出異常。原因也很簡單,因為此時線程實際上并未持有許可,示例代碼如下所示。示例代碼如下所示
/**
?*?RedissonPermitExpirableSemaphore?Demo?:?分布式支持有效期的信號量
?*?@author?Aaron?Zhu
?*?@date?2022-04-04
?*/
public?class?RedissonPermitExpirableSemaphoreDemo?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????@Test
????public?void?test1()?{
????????Config?config?=?new?Config();
????????config.useSingleServer()
????????????.setAddress("redis://127.0.0.1:6379")
????????????.setPassword("123456");
????????RedissonClient?redissonClient?=?Redisson.create(?config?);
????????final?String?lockName?=?"sellMilk";
????????//?系統(tǒng)最大并發(fā)處理量
????????int?maxLimit?=?3;
????????IntStream.rangeClosed(1,8)
????????????.mapToObj(?num?->?new?UserReq("用戶?#"+num,?redissonClient,?lockName,?maxLimit)?)
????????????.forEach(?threadPool::execute?);
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
????????redissonClient.shutdown();
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
????private?static?class?UserReq?implements?Runnable?{
????????private?String?name;
????????private?RedissonPermitExpirableSemaphore?semaphore;
????????public?UserReq(String?name,?RedissonClient?redissonClient,?String?lockName,?Integer?maxLimit)?{
????????????this.name?=?name;
????????????this.semaphore?=?(RedissonPermitExpirableSemaphore)?redissonClient.getPermitExpirableSemaphore(lockName);
????????????//?設(shè)置信號量的許可數(shù)
????????????semaphore.trySetPermits(?maxLimit?);
????????}
????????@Override
????????public?void?run()?{
????????????try?{
????????????????//?模擬用戶不定時發(fā)起請求
????????????????if(?!name.equals("用戶?#1")?)?{
????????????????????Thread.sleep(?RandomUtils.nextLong(1000,?2000)?);
????????????????}
????????????????info(?name?+?":?發(fā)起請求"?);
????????????????//?阻塞等待直到獲取許可,?指定信號量的最大持有時間為2秒
????????????????String?permitId?=?semaphore.acquire(2,?TimeUnit.SECONDS);
????????????????info(name?+?":?系統(tǒng)開始處理請求");
????????????????//?模擬業(yè)務(wù)耗時
????????????????if(?name.equals("用戶?#1")?)?{
????????????????????Thread.sleep(?5?*?1000?);
????????????????}?else?{
????????????????????Thread.sleep(RandomUtils.nextInt(500,?1000));
????????????????}
????????????????//?用戶請求處理完畢,釋放許可
????????????????semaphore.release(permitId);
????????????????info(name?+?":?系統(tǒng)處理完畢");
????????????}catch?(Exception?e)?{
????????????????info(?name?+?":?Happen?Exception:?"?+?e.getCause().getMessage());
????????????}
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期

