<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          分布式鎖(三):基于Redisson的分布式鎖實踐

          共 16414字,需瀏覽 33分鐘

           ·

          2022-05-10 15:11

          Redisson是基于Redis的Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid),底層使用Netty進行實現(xiàn)。其提供了相應(yīng)的分布式鎖實現(xiàn)

          abstract.png

          RedissonLock 分布式非公平可重入互斥鎖

          RedissonLock是一個分布式非公平可重入互斥鎖,其在獲取鎖的過程中,支持lock阻塞式、tryLock非阻塞式兩種形式。其中,這兩個方法還有多個重載版本,以支持設(shè)置鎖的最大持有時間、設(shè)置獲取鎖的最大等待時間。具體方法如下所示

          #?阻塞式獲取鎖
          void?lock();

          #?阻塞式獲取鎖
          #?支持通過leaseTime參數(shù)設(shè)置鎖的最大持有時間
          void?lock(long?leaseTime,?TimeUnit?unit);

          #?非阻塞式獲取鎖
          boolean?tryLock();

          #?非阻塞式獲取鎖
          #?支持通過time參數(shù)設(shè)置獲取鎖的最大等待時間
          boolean?tryLock(long?time,?TimeUnit?unit);

          #?非阻塞式獲取鎖
          #?支持通過waitTime參數(shù)設(shè)置獲取鎖的最大等待時間
          #?支持通過leaseTime參數(shù)設(shè)置鎖的最大持有時間
          boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit);

          可重入性

          現(xiàn)在我們驗證下RedissonLock是一個互斥鎖并且支持可重入。示例代碼如下所示

          /**
          ?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonLockDemo?{
          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????private?static?RedissonClient?redissonClient;

          ????@BeforeClass
          ????public?static?void?init()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????redissonClient?=?Redisson.create(?config?);
          ????}

          ????/**
          ?????*?測試:?阻塞式獲取鎖、可重入性
          ?????*/

          ????@Test
          ????public?void?testLock1()?{
          ????????final?String?lockName?=?"sellPc";
          ????????Runnable?task?=?()?->?{
          ????????????//?設(shè)置分布式鎖
          ????????????RLock?lock?=?redissonClient.getLock(lockName);
          ????????????try{
          ????????????????//?阻塞式獲取鎖
          ????????????????lock.lock();
          ????????????????info("成功獲取鎖?#1");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????????lock.lock();
          ????????????????info("成功獲取鎖?#2");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info("釋放鎖?#2");
          ????????????????lock.unlock();

          ????????????????info("釋放鎖?#1\n");
          ????????????????lock.unlock();
          ????????????}
          ????????};

          ????????RLock?tempLock?=?redissonClient.getLock(lockName);
          ????????if(?tempLock?instanceof?RedissonLock)?{
          ????????????System.out.println("鎖類型:?RedissonLock");
          ????????}
          ????????try{?Thread.sleep(?1*1000?);?}?catch?(Exception?e)?{}
          ????????
          ????????for?(int?i=1;?i<=3;?i++)?{
          ????????????threadPool.execute(?task?);
          ????????}

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}


          ????@AfterClass
          ????public?static?void?close()?{
          ????????redissonClient.shutdown();
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 1.jpeg

          顯式指定鎖的最大持有時間

          前面提到支持通過leaseTime參數(shù)顯式設(shè)置鎖的最大持有時間,當業(yè)務(wù)持鎖時間超過leaseTime參數(shù)值,則其持有的鎖會被自動釋放。但需要注意的是某個線程的鎖一旦被自動釋放后,此時再調(diào)用unlock方法來釋放鎖時,即會拋出IllegalMonitorStateException異常。原因也很簡單,因為此時線程實際上并未持有鎖,示例代碼如下所示

          /**
          ?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonLockDemo?{
          ????...

          ????/**
          ?????*?測試:?指定鎖的最大持有時間
          ?????*/

          ????@Test
          ????public?void?testLock2()?{
          ????????final?String?lockName?=?"sellBooK";
          ????????Runnable?task?=?()?->?{
          ????????????//?設(shè)置分布式鎖
          ????????????RLock?lock?=?redissonClient.getLock(lockName);
          ????????????try{
          ????????????????//?阻塞式獲取鎖,?指定鎖的最大持有時間為1秒
          ????????????????lock.lock(1,?TimeUnit.SECONDS);
          ????????????????info("成功獲取鎖");
          ????????????????//?模擬業(yè)務(wù)耗時:?10s
          ????????????????Thread.sleep(?10?*?1000?);
          ????????????}?catch?(Exception?e)?{
          ????????????????info("Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info("釋放鎖");
          ????????????????try?{
          ????????????????????lock.unlock();
          ????????????????}?catch?(IllegalMonitorStateException?e)?{
          ????????????????????info("Happen?Exception:?"?+?e.getMessage());
          ????????????????}
          ????????????}
          ????????};

          ????????for?(int?i=1;?i<=5;?i++)?{
          ????????????threadPool.execute(?task?);
          ????????}

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????...
          }

          測試結(jié)果如下所示,符合預(yù)期。可以看到每隔1秒后,由于線程持鎖時間到期了。鎖被自動釋放了,進而使得下一個任務(wù)拿到了鎖。并且由于每個任務(wù)的鎖都是自動釋放的,故每次調(diào)用unlock方法均會拋出異常

          figure 2.jpeg

          非阻塞式獲取鎖

          下面展示如果通過tryLock方法進行非阻塞式獲取鎖,示例代碼如下所示

          /**
          ?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonLockDemo?{
          ????...

          ????/**
          ?????*?測試:?非阻塞式獲取鎖
          ?????*/

          ????@Test
          ????public?void?testTryLock()?{
          ????????final?String?lockName?=?"sellPhone";
          ????????Runnable?task?=?()?->?{
          ????????????//?設(shè)置分布式鎖
          ????????????RLock?lock?=?redissonClient.getLock(lockName);
          ????????????boolean?flag?=?false;
          ????????????try{
          ????????????????//?非阻塞式獲取鎖
          ????????????????flag?=?lock.tryLock();
          ????????????????if(?flag?)?{
          ????????????????????info("成功獲取鎖");
          ????????????????????//?模擬業(yè)務(wù)耗時
          ????????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
          ????????????????}?else?{
          ????????????????????info("未獲取到鎖\n");
          ????????????????}
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????if(?flag?)?{
          ????????????????????info("釋放鎖\n");
          ????????????????????lock.unlock();
          ????????????????}
          ????????????}
          ????????};

          ????????for?(int?i=1;?i<=5;?i++)?{
          ????????????threadPool.execute(?task?);
          ????????}

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????...
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 3.jpeg

          Watch Dog看門狗機制

          在介紹Redisson的Watch Dog看門狗機制之前,我們先來做個測試。如果某個線程一直持有鎖執(zhí)行業(yè)務(wù)邏輯,則鎖是否會被自動釋放呢?示例代碼如下所示

          /**
          ?*?RedissonLock?Demo?:?分布式非公平可重入互斥鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonLockDemo?{
          ????...

          ????/**
          ?????*?測試:?看門狗機制
          ?????*/

          ????@Test
          ????public?void?testLock3()?{
          ????????final?String?lockName?=?"sellPig";
          ????????Runnable?task?=?()?->?{
          ????????????//?設(shè)置分布式鎖
          ????????????RLock?lock?=?redissonClient.getLock(lockName);
          ????????????try{
          ????????????????info("嘗試獲取鎖");
          ????????????????//?阻塞式獲取鎖
          ????????????????lock.lock();
          ????????????????info("成功獲取鎖");
          ????????????????//?未顯式指定鎖的持有時間,?則看門狗會在斷開連接前一直進行續(xù)期
          ????????????????while?(true)?{
          ????????????????}
          ????????????}?catch?(Exception?e)?{
          ????????????????info("Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info("成功釋放鎖");
          ????????????????lock.unlock();
          ????????????}
          ????????};

          ????????for?(int?i=1;?i<5;?i++)?{
          ????????????threadPool.execute(?task?);
          ????????}

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
          ????????info("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????...
          }

          測試結(jié)果如下所示,該鎖被持有后,一直未被釋放。其他任務(wù)都被阻塞住了

          figure 4.jpeg

          當我們調(diào)用不含leaseTime參數(shù)版本的lock()方法時,即未顯式設(shè)置最大持鎖時間。則其在RedissonLock類內(nèi)部會將 特殊值-1 傳給leaseTime參數(shù)。然后在tryAcquireAsync方法中會通過RedissonLock類的internalLockLeaseTime字段設(shè)置一個默認的最大持鎖時間。最后通過RedissonLock構(gòu)造器我們不難發(fā)現(xiàn) internalLockLeaseTime 字段的值來自于Config類的lockWatchdogTimeout字段。其中l(wèi)ockWatchdogTimeout字段的默認值為30秒。換言之即使我們調(diào)用lock方法時,未顯式設(shè)置最大持鎖時間。但RedissonLock內(nèi)部也會通過lockWatchdogTimeout字段給該鎖設(shè)置一個最大持有時間,默認值為30秒

          public?class?RedissonLock?extends?RedissonBaseLock?{

          ????protected?long?internalLockLeaseTime;
          ????
          ????public?RedissonLock(CommandAsyncExecutor?commandExecutor,?String?name)?{
          ????????super(commandExecutor,?name);
          ????????this.commandExecutor?=?commandExecutor;
          ????????//?internalLockLeaseTime?字段的值來自于Config類的lockWatchdogTimeout字段
          ????????this.internalLockLeaseTime?=?commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
          ????????this.pubSub?=?commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
          ????}
          ????
          ????/**
          ?????*?未顯式設(shè)置最大持鎖時間的lock方法
          ?????*/

          ????@Override
          ????public?void?lock()?{
          ????????try?{
          ????????????//?則其會將?特殊值-1?傳給leaseTime參數(shù)
          ????????????lock(-1,?null,?false);
          ????????}?catch?(InterruptedException?e)?{
          ????????????throw?new?IllegalStateException();
          ????????}
          ????}
          ????
          ????private?void?lock(long?leaseTime,?TimeUnit?unit,?boolean?interruptibly)?throws?InterruptedException?{
          ????????...
          ????????Long?ttl?=?tryAcquire(-1,?leaseTime,?unit,?threadId);
          ????????...
          ????}
          ????
          ????private?Long?tryAcquire(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          ????????return?get(tryAcquireAsync(waitTime,?leaseTime,?unit,?threadId));
          ????}

          ????private??RFuture?tryAcquireAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit,?long?threadId)?{
          ????????RFuture?ttlRemainingFuture;
          ????????if?(leaseTime?!=?-1)?{
          ????????????ttlRemainingFuture?=?tryLockInnerAsync(waitTime,?leaseTime,?unit,?threadId,?RedisCommands.EVAL_LONG);
          ????????}?else?{
          ????????????//?未顯式設(shè)置最大持鎖時間?則會?通過?internalLockLeaseTime?字段設(shè)置一個默認的最大持鎖時間
          ????????????ttlRemainingFuture?=?tryLockInnerAsync(waitTime,?internalLockLeaseTime,
          ????????????????????TimeUnit.MILLISECONDS,?threadId,?RedisCommands.EVAL_LONG);
          ????????}
          ????????CompletionStage?f?=?ttlRemainingFuture.thenApply(ttlRemaining?->?{
          ????????????//?lock?acquired
          ????????????if?(ttlRemaining?==?null)?{
          ????????????????if?(leaseTime?!=?-1)?{
          ????????????????????internalLockLeaseTime?=?unit.toMillis(leaseTime);
          ????????????????}?else?{
          ????????????????????//?未顯式設(shè)置最大持鎖時間?則會啟動一個定時任務(wù)用于進行自動續(xù)期
          ????????????????????scheduleExpirationRenewal(threadId);
          ????????????????}
          ????????????}
          ????????????return?ttlRemaining;
          ????????});
          ????????return?new?CompletableFutureWrapper<>(f);
          ????}
          }

          ...

          public?class?Config?{

          ????//?時間:?30秒
          ????private?long?lockWatchdogTimeout?=?30?*?1000;

          ????public?long?getLockWatchdogTimeout()?{
          ????????return?lockWatchdogTimeout;
          ????}

          ????...
          }

          那問題來了,為啥在我們剛剛的測試代碼中即使持鎖時間超過了30秒,鎖也沒有被自動釋放呢?原因就在于Redisson的看門狗機制。在RedissonLock類的tryAcquireAsync方法中,未顯式設(shè)置最大持鎖時間 則會啟動一個定時任務(wù)用于進行自動續(xù)期。即RedissonLock類的tryAcquireAsync方法中會調(diào)用scheduleExpirationRenewal()以啟動一個定時任務(wù)用于進行自動續(xù)期。具體的續(xù)期邏輯在RedissonBaseLock類的renewExpiration方法中,其中自動續(xù)期定時任務(wù)的執(zhí)行周期 是RedissonBaseLock類的internalLockLeaseTime字段值的1/3。然后通過renewExpirationAsync方法每次利用Lua腳本向Redis發(fā)送續(xù)期命令,具體地。每次續(xù)期時會將RedissonBaseLock類的internalLockLeaseTime字段值設(shè)置為新的最大持鎖時間。同樣地,RedissonBaseLock類的 internalLockLeaseTime 字段值也是來自于Config類的lockWatchdogTimeout字段,即默認為30秒

          public?abstract?class?RedissonBaseLock?extends?RedissonExpirable?implements?RLock?{

          ?...

          ????protected?long?internalLockLeaseTime;

          ?public?RedissonBaseLock(CommandAsyncExecutor?commandExecutor,?String?name)?{
          ????????super(commandExecutor,?name);
          ????????this.commandExecutor?=?commandExecutor;
          ????????this.id?=?commandExecutor.getConnectionManager().getId();
          ????????//?internalLockLeaseTime?參數(shù)的值來自于Config類的lockWatchdogTimeout變量
          ????????this.internalLockLeaseTime?=?commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
          ????????this.entryName?=?id?+?":"?+?name;
          ????}
          ?
          ?protected?void?scheduleExpirationRenewal(long?threadId)?{
          ????????ExpirationEntry?entry?=?new?ExpirationEntry();
          ????????ExpirationEntry?oldEntry?=?EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),?entry);
          ????????if?(oldEntry?!=?null)?{
          ????????????oldEntry.addThreadId(threadId);
          ????????}?else?{
          ????????????entry.addThreadId(threadId);
          ????????????try?{
          ?????????????//?自動續(xù)期
          ????????????????renewExpiration();
          ????????????}?finally?{
          ????????????????if?(Thread.currentThread().isInterrupted())?{
          ????????????????????cancelExpirationRenewal(threadId);
          ????????????????}
          ????????????}
          ????????}
          ????}

          ????private?void?renewExpiration()?{
          ????????ExpirationEntry?ee?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());
          ????????if?(ee?==?null)?{
          ????????????return;
          ????????}
          ????????
          ????????Timeout?task?=?commandExecutor.getConnectionManager().newTimeout(new?TimerTask()?{
          ????????????@Override
          ????????????public?void?run(Timeout?timeout)?throws?Exception?{
          ????????????????ExpirationEntry?ent?=?EXPIRATION_RENEWAL_MAP.get(getEntryName());
          ????????????????if?(ent?==?null)?{
          ????????????????????return;
          ????????????????}
          ????????????????Long?threadId?=?ent.getFirstThreadId();
          ????????????????if?(threadId?==?null)?{
          ????????????????????return;
          ????????????????}
          ????????????????
          ????????????????//?通過Lua腳本向Redis發(fā)送續(xù)期命令
          ????????????????RFuture?future?=?renewExpirationAsync(threadId);

          ????????????????future.whenComplete((res,?e)?->?{
          ????????????????????if?(e?!=?null)?{
          ????????????????????????log.error("Can't?update?lock?"?+?getRawName()?+?"?expiration",?e);
          ????????????????????????EXPIRATION_RENEWAL_MAP.remove(getEntryName());
          ????????????????????????return;
          ????????????????????}
          ????????????????????
          ????????????????????if?(res)?{
          ????????????????????????//?reschedule?itself
          ????????????????????????renewExpiration();
          ????????????????????}?else?{
          ????????????????????????cancelExpirationRenewal(null);
          ????????????????????}
          ????????????????});
          ????????????}
          ????????????//?定時任務(wù)的執(zhí)行周期?是?internalLockLeaseTime字段值的?1/3
          ????????????//?即,?定時任務(wù)的執(zhí)行周期?是?lockWatchdogTimeout字段值的?1/3
          ????????},?internalLockLeaseTime?/?3,?TimeUnit.MILLISECONDS);
          ????????
          ????????ee.setTimeout(task);
          ????}

          ????protected?RFuture?renewExpirationAsync(long?threadId)?{
          ????????return?evalWriteAsync(getRawName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
          ????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
          ????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
          ????????????????????????"return?1;?"?+
          ????????????????????????"end;?"?+
          ????????????????????????"return?0;",
          ????????????????//?每次續(xù)期時會將internalLockLeaseTime字段值作為新的最大持鎖時間
          ????????????????Collections.singletonList(getRawName()),?internalLockLeaseTime,?getLockName(threadId));
          ????}

          ????...

          }

          至此我們應(yīng)該比較清楚Redisson的看門狗機制了:

          1. Redisson的Watch Dog看門狗機制只會在未顯式設(shè)置最大持鎖時間才會生效。換言之,一旦調(diào)用lock方法時指定了leaseTime參數(shù)值,則該鎖到期后即會自動釋放。Redisson的Watch Dog看門狗不會對該鎖進行自動續(xù)期
          2. 當我們未顯式設(shè)置Config類的lockWatchdogTimeout字段值時,使用默認的30秒。此時如果加鎖時未顯式設(shè)置最大持鎖時間,即Watch Dog看門狗機制會生效的場景中。該鎖實際上一開始也會設(shè)置一個默認的最大持鎖時間,即30秒。然后看門狗每隔10秒(30秒 * 1/3 = 10秒)會將該鎖的最大持鎖時間再次設(shè)置為30秒,以達到自動續(xù)期的目的。這樣只要持鎖線程的業(yè)務(wù)還未執(zhí)行完,則該鎖就一直有效、不會被自動釋放。當然一旦持鎖的服務(wù)實例發(fā)生宕機后,看門狗的定時任務(wù)自然也無法續(xù)期。這樣鎖到期后也就釋放掉了,避免了死鎖的發(fā)生

          RedissonFairLock 分布式公平可重入互斥鎖

          由于RedissonLock是非公平的,故Redisson提供了一個分布式公平可重入互斥鎖——RedissonFairLock。示例代碼如下所示

          /**
          ?*?RedissonFairLock?Demo?:?分布式公平可重入互斥鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonFairLockDemo?{

          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????/**
          ?????*?測試:?可重入性
          ?????*/

          ????@Test
          ????public?void?test1()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????RedissonClient?redissonClient?=?Redisson.create(?config?);

          ????????final?String?lockName?=?"sellWatch";
          ????????Runnable?task?=?()?->?{
          ????????????//?設(shè)置分布式鎖
          ????????????RLock?lock?=?redissonClient.getFairLock(lockName);
          ????????????try{
          ????????????????//?阻塞式獲取鎖
          ????????????????lock.lock();
          ????????????????info("成功獲取鎖?#1");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????????lock.lock();
          ????????????????info("成功獲取鎖?#2");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info("釋放鎖?#2");
          ????????????????lock.unlock();

          ????????????????info("釋放鎖?#1\n");
          ????????????????lock.unlock();
          ????????????}
          ????????};

          ????????RLock?tempLock?=?redissonClient.getFairLock(lockName);
          ????????if(?tempLock?instanceof?RedissonFairLock?)?{
          ????????????System.out.println("鎖類型:?RedissonFairLock");
          ????????}
          ????????try{?Thread.sleep(?4*1000?);?}?catch?(Exception?e)?{}

          ????????for?(int?i=1;?i<=3;?i++)?{
          ????????????threadPool.execute(?task?);
          ????????}

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
          ????????redissonClient.shutdown();
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 5.jpeg

          RReadWriteLock 分布式讀寫鎖

          讀寫測試

          RReadWriteLock是一個分布式可重入讀寫鎖。其中讀鎖為可重入的共享鎖、寫鎖為可重入的互斥鎖,且讀寫互斥。示例代碼如下所示

          /**
          ?*?RReadWriteLock?Demo?:?分布式可重入讀寫鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RReadWriteLockDemo?{
          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????private?static?RedissonClient?redissonClient;

          ????@BeforeClass
          ????public?static?void?init()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????redissonClient?=?Redisson.create(?config?);
          ????}

          ????/**
          ?????*?測試:?讀鎖為共享鎖,?讀鎖具有可重入性
          ?????*/

          ????@Test
          ????public?void?test1Read()?{
          ????????System.out.println("\n----------------------?Test?1?:?Read?----------------------");
          ????????String?lockName?=?"sellCat";
          ????????for(int?i=1;?i<=3;?i++)?{
          ????????????String?taskName?=?"讀任務(wù)#"+i;
          ????????????Runnable?task?=?new?ReadTask(taskName,?redissonClient,?lockName);
          ????????????threadPool.execute(?task?);
          ????????}
          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
          ????}

          ????/**
          ?????*?測試:?寫鎖為互斥鎖,?寫鎖具有可重入性
          ?????*/

          ????@Test
          ????public?void?test2Write()?{
          ????????System.out.println("\n----------------------?Test?2?:?Write?----------------------");
          ????????String?lockName?=?"sellDog";
          ????????for(int?i=1;?i<=3;?i++)?{
          ????????????String?taskName?=?"寫任務(wù)#"+i;
          ????????????Runnable?task?=?new?WriteTask(taskName,?redissonClient,?lockName);
          ????????????threadPool.execute(?task?);
          ????????}
          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
          ????}

          ????/**
          ?????*?測試:?讀寫互斥
          ?????*/

          ????@Test
          ????public?void?test3ReadWrite()?{
          ????????System.out.println("\n----------------------?Test?3?:?Read?Write?----------------------");
          ????????String?lockName?=?"sellLion";
          ????????for(int?i=1;?i<=5;?i++)?{
          ????????????Runnable?task?=?null;
          ????????????Boolean?isReadTask?=?RandomUtils.nextBoolean();
          ????????????if(?isReadTask?)?{
          ????????????????task?=?new?ReadTask(?"讀任務(wù)#"+i,?redissonClient,?lockName);
          ????????????}?else?{
          ????????????????task?=?new?WriteTask(?"寫任務(wù)#"+i,?redissonClient,?lockName);
          ????????????}
          ????????????threadPool.execute(?task?);
          ????????}
          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?10*1000?);?}?catch?(Exception?e)?{}
          ????}

          ????@AfterClass
          ????public?static?void?close()?{
          ????????redissonClient.shutdown();
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}

          ????/**
          ?????*?讀任務(wù)
          ?????*/

          ????private?static?class?ReadTask?implements?Runnable?{
          ????????private?String?taskName;

          ????????private?RedissonReadLock?readLock;

          ????????public?ReadTask(String?taskName,?RedissonClient?redissonClient,?String?lockName)?{
          ????????????this.taskName?=?taskName;
          ????????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
          ????????????this.readLock?=?(RedissonReadLock)?readWriteLock.readLock();
          ????????}

          ????????@Override
          ????????public?void?run()?{
          ????????????try{
          ????????????????readLock.lock();
          ????????????????info(taskName?+?":?成功獲取讀鎖?#1");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????????readLock.lock();
          ????????????????info(taskName?+?":?成功獲取讀鎖?#2");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println(?taskName?+?":?Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info(taskName?+?":?釋放讀鎖?#2");
          ????????????????readLock.unlock();

          ????????????????info(taskName?+?":?釋放讀鎖?#1");
          ????????????????readLock.unlock();
          ????????????}
          ????????}
          ????}

          ????/**
          ?????*?寫任務(wù)
          ?????*/

          ????private?static?class?WriteTask?implements?Runnable?{
          ????????private?String?taskName;

          ????????private?RedissonWriteLock?writeLock;

          ????????public?WriteTask(String?taskName,?RedissonClient?redissonClient,?String?lockName)?{
          ????????????this.taskName?=?taskName;
          ????????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
          ????????????this.writeLock?=?(RedissonWriteLock)?readWriteLock.writeLock();
          ????????}

          ????????@Override
          ????????public?void?run()?{
          ????????????try{
          ????????????????writeLock.lock();
          ????????????????info(taskName?+?":?成功獲取寫鎖?#1");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????????writeLock.lock();
          ????????????????info(taskName?+?":?成功獲取寫鎖?#2");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println(?taskName?+?":?Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info(taskName?+?":?釋放寫鎖?#2");
          ????????????????writeLock.unlock();

          ????????????????info(taskName?+?":?釋放寫鎖?#1\n");
          ????????????????writeLock.unlock();
          ????????????}
          ????????}
          ????}
          }

          讀鎖測試結(jié)果如下所示,符合預(yù)期

          figure 6.jpeg

          寫鎖測試結(jié)果如下所示,符合預(yù)期

          figure 7.jpeg

          讀寫測試結(jié)果如下所示,符合預(yù)期

          figure 8.jpeg

          鎖升級、鎖降級


          所謂鎖升級指的是讀鎖升級為寫鎖。當一個線程先獲取到讀鎖再去申請寫鎖,顯然其是不支持的。理由也很簡單,讀鎖是可以多個服務(wù)實例同時持有的。若其中一個服務(wù)實例此鎖線程能夠進行鎖升級,成功獲得寫鎖。顯然與我們之前的所說的讀寫互斥相違背。因為其在獲得寫鎖的同時,其他服務(wù)實例依然持有讀鎖;反之,其是支持鎖降級的,即寫鎖降級為讀鎖。當一個服務(wù)實例的線程在獲得寫鎖后,該線程依然可以獲得讀鎖。這個時候當其釋放寫鎖,則將只持有讀鎖,即完成了鎖降級過程。鎖降級的使用價值也很大,其一方面保證了安全,讀鎖在寫鎖釋放前獲取;另一方面保證了高效,因為讀鎖是共享的。

          鎖升級示例代碼如下所示

          /**
          ?*?RReadWriteLock?Demo?:?分布式可重入讀寫鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RReadWriteLockDemo?{
          ????
          ????...

          ????/**
          ?????*?測試:?鎖升級
          ?????*/

          ????@Test
          ????public?void?test4Read2Write()?{
          ????????System.out.println("----------------------?Test?4?:?Read?->?Write?----------------------\n");
          ????????String?lockName?=?"sellTiger";
          ????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
          ????????RedissonReadLock?readLock?=?(RedissonReadLock)?readWriteLock.readLock();
          ????????RedissonWriteLock??writeLock?=?(RedissonWriteLock)?readWriteLock.writeLock();

          ????????try?{
          ????????????readLock.lock();
          ????????????info("成功獲取讀鎖");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????writeLock.lock();
          ????????????info("成功獲取寫鎖");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????readLock.unlock();
          ????????????info("成功釋放讀鎖");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????writeLock.unlock();
          ????????????info("成功釋放寫鎖");
          ????????}?catch?(Exception?e)?{
          ????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????}
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}????
          }

          測試結(jié)果如下所示,在持有讀鎖的情況下,繼續(xù)嘗試獲取寫鎖會被一直阻塞

          鎖降級示例代碼如下所示

          /**
          ?*?RReadWriteLock?Demo?:?分布式可重入讀寫鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RReadWriteLockDemo?{
          ????
          ????...

          ????/**
          ?????*?測試:?鎖降級
          ?????*/

          ????@Test
          ????public?void?test5Write2Read()?{
          ????????System.out.println("----------------------?Test?2?:?Write?->?Read?----------------------");
          ????????String?lockName?=?"sellChicken";
          ????????RReadWriteLock?readWriteLock?=?redissonClient.getReadWriteLock(lockName);
          ????????RedissonReadLock?readLock?=?(RedissonReadLock)?readWriteLock.readLock();
          ????????RedissonWriteLock??writeLock?=?(RedissonWriteLock)?readWriteLock.writeLock();

          ????????try?{
          ????????????writeLock.lock();
          ????????????info("成功獲取寫鎖");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????readLock.lock();
          ????????????info("成功獲取讀鎖");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????writeLock.unlock();
          ????????????info("成功釋放寫鎖");
          ????????????//?模擬業(yè)務(wù)耗時
          ????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????readLock.unlock();
          ????????????info("成功釋放讀鎖");
          ????????}?catch?(Exception?e)?{
          ????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????}
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 9.jpeg

          RedissonSpinLock 分布式非公平可重入自旋互斥鎖

          RedissonSpinLock則是一個分布式非公平可重入自旋互斥鎖。示例代碼如下所示

          /**
          ?*?RedissonSpinLock?Demo?:?分布式非公平可重入自旋互斥鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonSpinLockDemo?{
          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????/**
          ?????*?測試:?可重入性、互斥鎖
          ?????*/

          ????@Test
          ????public?void?test1()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????RedissonClient?redissonClient?=?Redisson.create(?config?);

          ????????final?String?lockName?=?"sellKeyword";
          ????????Runnable?task?=?()?->?{
          ????????????//?設(shè)置分布式鎖
          ????????????RLock?lock?=?redissonClient.getSpinLock(lockName);
          ????????????try{
          ????????????????//?阻塞式獲取鎖
          ????????????????lock.lock();
          ????????????????info("成功獲取鎖?#1");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));

          ????????????????lock.lock();
          ????????????????info("成功獲取鎖?#2");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????????}?finally?{
          ????????????????info("釋放鎖?#2");
          ????????????????lock.unlock();

          ????????????????info("釋放鎖?#1\n");
          ????????????????lock.unlock();
          ????????????}
          ????????};

          ????????RLock?tempLock?=?redissonClient.getSpinLock(lockName);
          ????????if(?tempLock?instanceof?RedissonSpinLock)?{
          ????????????System.out.println("鎖類型:?RedissonSpinLock");
          ????????}
          ????????try{?Thread.sleep(?2*1000?);?}?catch?(Exception?e)?{}

          ????????for?(int?i=1;?i<=3;?i++)?{
          ????????????threadPool.execute(?task?);
          ????????}

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
          ????????redissonClient.shutdown();
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 10.jpeg

          RedissonCountDownLatch 分布式閂鎖

          RedissonCountDownLatch是一個分布式的CountDownLatch閂鎖。比如我們的業(yè)務(wù)系統(tǒng)會依賴很多其他基礎(chǔ)服務(wù),這樣在業(yè)務(wù)系統(tǒng)啟動過程中,需要等待其他基礎(chǔ)服務(wù)全部啟動完畢。示例代碼如下所示

          /**
          ?*?RedissonCountDownLatch?Demo?:?分布式閂鎖
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonCountDownLatchDemo?{
          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????private?static?RedissonClient?redissonClient;

          ????@BeforeClass
          ????public?static?void?init()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????redissonClient?=?Redisson.create(?config?);
          ????}

          ????@Test
          ????public?void?test1()?throws?InterruptedException?{
          ????????final?String?countDownLatchName?=?"systemInit";
          ????????int?count?=?5;

          ????????for?(int?i=1;?i<=count;?i++)?{
          ????????????String?serviceName?=?"基礎(chǔ)服務(wù)?#"+i;
          ????????????BasicService?basicService?=?new?BasicService(serviceName,?redissonClient,?countDownLatchName,?count);
          ????????????threadPool.execute(?basicService?);
          ????????}

          ????????RedissonCountDownLatch?countDownLatch?=?(RedissonCountDownLatch)?redissonClient.getCountDownLatch(countDownLatchName);
          ????????countDownLatch.trySetCount(count);
          ????????//?阻塞等待基礎(chǔ)服務(wù)全部啟動完成
          ????????countDownLatch.await();

          ????????info("系統(tǒng)初始化已完成,?業(yè)務(wù)系統(tǒng)啟動?...");
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}

          ????/**
          ?????*?基礎(chǔ)服務(wù)
          ?????*/

          ????private?static?class?BasicService?implements?Runnable?{
          ????????private?String?serviceName;

          ????????private?RedissonCountDownLatch?countDownLatch;

          ????????public?BasicService(String?serviceName,?RedissonClient?redissonClient,?String?countDownLatchName,?Integer?count)?{
          ????????????this.serviceName?=?serviceName;
          ????????????this.countDownLatch?=?(RedissonCountDownLatch)?redissonClient.getCountDownLatch(countDownLatchName);
          ????????????this.countDownLatch.trySetCount(?count?);
          ????????}

          ????????@Override
          ????????public?void?run()?{
          ????????????try{
          ????????????????info(serviceName?+?":?啟動中");
          ????????????????//?模擬?基礎(chǔ)服務(wù)啟動?耗時
          ????????????????Thread.sleep(?RandomUtils.nextLong(1,?5)?*?1000?);
          ????????????????countDownLatch.countDown();
          ????????????????info(serviceName?+?":?啟動完成");
          ????????????}?catch?(Exception?e)?{
          ????????????????System.out.println(?serviceName?+?":?Happen?Exception:?"?+?e.getMessage());
          ????????????}
          ????????}
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 11.jpeg

          RedissonSemaphore 分布式信號量

          RedissonSemaphore是一個分布式的信號量,示例代碼如下所示

          /**
          ?*?RedissonSemaphore?Demo?:?分布式信號量
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonSemaphoreDemo?{
          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????@Test
          ????public?void?test1()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????RedissonClient?redissonClient?=?Redisson.create(?config?);

          ????????final?String?lockName?=?"sellAnimal";
          ????????//?系統(tǒng)最大并發(fā)處理量
          ????????int?maxLimit?=?3;
          ????????IntStream.rangeClosed(1,8)
          ????????????.mapToObj(?num?->?new?UserReq("用戶#"+num,?redissonClient,?lockName,?maxLimit)?)
          ????????????.forEach(?threadPool::execute?);

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
          ????????redissonClient.shutdown();
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}

          ????private?static?class?UserReq?implements?Runnable?{
          ????????private?String?name;

          ????????private?RedissonSemaphore?semaphore;

          ????????public?UserReq(String?name,?RedissonClient?redissonClient,?String?lockName,?Integer?maxLimit)?{
          ????????????this.name?=?name;
          ????????????this.semaphore?=?(RedissonSemaphore)?redissonClient.getSemaphore(lockName);
          ????????????//?設(shè)置信號量的許可數(shù)
          ????????????semaphore.trySetPermits(?maxLimit?);
          ????????}

          ????????@Override
          ????????public?void?run()?{
          ????????????try?{
          ????????????????//?模擬用戶不定時發(fā)起請求
          ????????????????Thread.sleep(RandomUtils.nextLong(500,?2000));
          ????????????????info(?name?+?":?發(fā)起請求"?);

          ????????????????//?阻塞等待,直到獲取許可
          ????????????????semaphore.acquire();
          ????????????????info(name?+?":?系統(tǒng)開始處理請求");
          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????Thread.sleep(RandomUtils.nextInt(5,?20)*1000);

          ????????????????//?用戶請求處理完畢,釋放許可
          ????????????????semaphore.release();
          ????????????????info(name?+?":?系統(tǒng)處理完畢");
          ????????????}catch?(Exception?e)?{
          ????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
          ????????????}
          ????????}
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 12.jpeg

          RedissonPermitExpirableSemaphore 分布式支持有效期的信號量

          相比較于RedissonSemaphore而言,RedissonPermitExpirableSemaphore在獲取許可的acquire方法中,增加了一個支持leaseTime參數(shù)的重載版本。以實現(xiàn)指定許可的最大持有時間。一旦業(yè)務(wù)持許可時間超過leaseTime參數(shù)值,則其持有的許可會被自動釋放。但需要注意的是某個線程的許可一旦被自動釋放后,此時再調(diào)用release方法來釋放許可時,即會拋出異常。原因也很簡單,因為此時線程實際上并未持有許可,示例代碼如下所示。示例代碼如下所示

          /**
          ?*?RedissonPermitExpirableSemaphore?Demo?:?分布式支持有效期的信號量
          ?*?@author?Aaron?Zhu
          ?*?@date?2022-04-04
          ?*/

          public?class?RedissonPermitExpirableSemaphoreDemo?{
          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

          ????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????@Test
          ????public?void?test1()?{
          ????????Config?config?=?new?Config();
          ????????config.useSingleServer()
          ????????????.setAddress("redis://127.0.0.1:6379")
          ????????????.setPassword("123456");
          ????????RedissonClient?redissonClient?=?Redisson.create(?config?);

          ????????final?String?lockName?=?"sellMilk";
          ????????//?系統(tǒng)最大并發(fā)處理量
          ????????int?maxLimit?=?3;
          ????????IntStream.rangeClosed(1,8)
          ????????????.mapToObj(?num?->?new?UserReq("用戶?#"+num,?redissonClient,?lockName,?maxLimit)?)
          ????????????.forEach(?threadPool::execute?);

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?40*1000?);?}?catch?(Exception?e)?{}
          ????????redissonClient.shutdown();
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?thread?=?Thread.currentThread().getName();
          ????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
          ????????System.out.println(log);
          ????}

          ????private?static?class?UserReq?implements?Runnable?{
          ????????private?String?name;

          ????????private?RedissonPermitExpirableSemaphore?semaphore;

          ????????public?UserReq(String?name,?RedissonClient?redissonClient,?String?lockName,?Integer?maxLimit)?{
          ????????????this.name?=?name;
          ????????????this.semaphore?=?(RedissonPermitExpirableSemaphore)?redissonClient.getPermitExpirableSemaphore(lockName);
          ????????????//?設(shè)置信號量的許可數(shù)
          ????????????semaphore.trySetPermits(?maxLimit?);
          ????????}

          ????????@Override
          ????????public?void?run()?{
          ????????????try?{
          ????????????????//?模擬用戶不定時發(fā)起請求
          ????????????????if(?!name.equals("用戶?#1")?)?{
          ????????????????????Thread.sleep(?RandomUtils.nextLong(1000,?2000)?);
          ????????????????}
          ????????????????info(?name?+?":?發(fā)起請求"?);

          ????????????????//?阻塞等待直到獲取許可,?指定信號量的最大持有時間為2秒
          ????????????????String?permitId?=?semaphore.acquire(2,?TimeUnit.SECONDS);
          ????????????????info(name?+?":?系統(tǒng)開始處理請求");

          ????????????????//?模擬業(yè)務(wù)耗時
          ????????????????if(?name.equals("用戶?#1")?)?{
          ????????????????????Thread.sleep(?5?*?1000?);
          ????????????????}?else?{
          ????????????????????Thread.sleep(RandomUtils.nextInt(500,?1000));
          ????????????????}

          ????????????????//?用戶請求處理完畢,釋放許可
          ????????????????semaphore.release(permitId);
          ????????????????info(name?+?":?系統(tǒng)處理完畢");
          ????????????}catch?(Exception?e)?{
          ????????????????info(?name?+?":?Happen?Exception:?"?+?e.getCause().getMessage());
          ????????????}
          ????????}
          ????}
          }

          測試結(jié)果如下所示,符合預(yù)期

          figure 13.jpeg


          瀏覽 77
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大香蕉在线网站 | 久福利在线视频草 | 黄a免费视频网站 | 成人在线网址 | 91av资源网 |