分布式鎖的三種實現(xiàn)方式
分布式鎖的三種實現(xiàn)方式
為什么要使用分布式鎖
使用分布式鎖的目的,無外乎就是保證同一時間只有一個客戶端可以對共享資源進行操作。
zookeeper可靠性比redis強太多,只是效率低了點,如果并發(fā)量不是特別大,追求可靠性,首選zookeeper。為了效率,則首選redis實現(xiàn)。
1.數(shù)據(jù)庫方式
/*** 該類實現(xiàn)Lock類,用于分布式鎖(也可以不實現(xiàn)該類,自己添加lock和unlock方法)* 實現(xiàn)思路:* 1.在數(shù)據(jù)庫中建立一張表,創(chuàng)建一個屬性唯一的字段* 2.當有用戶訪問時,先查詢該字段是否有指定的值,* 如果有則說明該數(shù)據(jù)正在被訪問,拒絕其他用戶進行寫操作* 如果沒有則向數(shù)據(jù)庫添加指定的值(上鎖)* 3.該用戶用完需要的數(shù)據(jù)后刪除表中添加的指定數(shù)據(jù)(解鎖)*/public class MysqlLock implements Lock {TestLockMapper testLockMapper;private static final String FLAG = "isLock";/*** 上鎖就是向數(shù)據(jù)庫中加上一條數(shù)據(jù),該字段設置為唯一* 當其他人想上鎖時發(fā)現(xiàn)該數(shù)據(jù)有人占用就會等待*/public void lock() {while(true){ //循環(huán)直到獲取到鎖boolean b = tryLock(); //如果鎖被占用則獲取不到if(b){TestLock testLock = new TestLock();testLock.setFlag(FLAG);testLockMapper.insert(testLock);break;}else {System.out.println("該數(shù)據(jù)被占用...請等待。。。");}}}public void lockInterruptibly() throws InterruptedException {}/*** 嘗試獲取鎖,如果數(shù)據(jù)庫中有該值,則獲取不到鎖* @return*/public boolean tryLock() {QueryWrapper<TestLock> wrapper = new QueryWrapper<>();wrapper.eq("flag",FLAG);TestLock testLock = testLockMapper.selectOne(wrapper);if(testLock==null){return true;}return false;}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}/*** 釋放鎖,將上鎖時設置的數(shù)據(jù)刪除,其他請求可以設置數(shù)據(jù)來得到鎖*/public void unlock() {QueryWrapper<TestLock> wrapper = new QueryWrapper<>();wrapper.eq("flag",FLAG);testLockMapper.delete(wrapper);}public Condition newCondition() {return null;}}
2.redis方式
死鎖問題:當進行l(wèi)ock方法上鎖以后,在執(zhí)行操作時有異常而無法執(zhí)行unlock釋放鎖
解決死鎖:在redis中可以通過expire設置數(shù)據(jù)的過期時間,指定的時間內即使我們不釋放鎖也會自動刪除
redis分布式鎖實現(xiàn)基于setnt(set if not exists),設置成功返回1,失敗返回0,釋放鎖通過del指令完成
/*** 實現(xiàn)思路* 基本和mysql一致* 在redis中使用命令判斷該值是否存在,存在則等待,不存在則設置值,獲取鎖* 當操作完成后刪除值并且釋放鎖*/public class RedisLock {@AutowiredRedisTemplate redisTemplate;private static final String NAME = "lock";private static final String FLAG = "isLock";public void lock(){while (true){//向redis中設置指定的key/value,//該方法有可能導致死鎖// Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG);//設置過期時間為 1 分鐘 ,即使1分鐘后沒有人釋放鎖他也會自動消失Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG,1, TimeUnit.MINUTES);if (b){return ;}else{System.out.println(" 該數(shù)據(jù)被占用...請等待。。。");}}}public void unlock(){redisTemplate.delete(NAME);}}
3.zookeeper方式
zookeeper實現(xiàn)分布式鎖:原理:有序臨時節(jié)點+watch監(jiān)聽來實現(xiàn)
實現(xiàn)思路:為每一個執(zhí)行的線程創(chuàng)建一個有序的臨時節(jié)點,為了確保有序性,在創(chuàng)建完節(jié)點,會再獲取全部節(jié)點,再重新進行—次排序,排序過程中,每個線程要判斷自己剩下的臨時節(jié)點的序號是否是顯小的, 如果是最小的,將會獲取到鎖,執(zhí)行相關操作,釋放鎖 如果不是最小的,會監(jiān)聽它的前一個節(jié)點,當它的前一個節(jié)點被刪除時,它就會獲得鎖,依次類推
public class ZkLock {//ZooKeeper客戶端private ZooKeeper zk;//zk的一個目錄結構locks,代表根目錄private String root ="/locks";//鎖的名稱private String lockName;//當前線程創(chuàng)建的序列nodeprivate ThreadLocal<String> nodeId = new ThreadLocal<>();//用來同步等待zkclient鏈接到了服務端//CountDownLatch 一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待private CountDownLatch countDownLatch = new CountDownLatch(1);//超時時間private static final int sessionTimeout = 3000;private final static byte[] data = new byte[0];//在構造方法中連接zookeeper,創(chuàng)建根節(jié)點lockpublic ZkLock(String config,String lockName){this.lockName = lockName;try {zk = new ZooKeeper(config, sessionTimeout, new Watcher() {//watcher用于監(jiān)聽public void process(WatchedEvent event) {if(event.getState()==Event.KeeperState.SyncConnected){//遞減鎖存器的計數(shù),如果計數(shù)到達零,則釋放所有等待的線程。countDownLatch.countDown();}}});//使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷。countDownLatch.await();Stat stat = zk.exists(root,false);if(null==stat){//創(chuàng)建根節(jié)點zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {e.printStackTrace();}}/*** 添加watch監(jiān)聽臨時順序節(jié)點的刪除*/class LockWatcher implements Watcher {private CountDownLatch latch = null;public LockWatcher(CountDownLatch latch) {this.latch = latch;}public void process(WatchedEvent event) {if(event.getType()== Event.EventType.NodeDeleted){latch.countDown();}}}public void lock(){try{//創(chuàng)建臨時子節(jié)點String myNode = zk.create(root+"/"+lockName,data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(Thread.currentThread().getName()+myNode+"created");//取出所有子節(jié)點,并排序List<String> subNode = zk.getChildren(root,false);TreeSet<String> sortedNode = new TreeSet<>();for (String node: subNode) {sortedNode.add(root+"/"+node);}String smallNode = sortedNode.first();if(myNode.equals(smallNode)){//如果是最小節(jié)點,則表示獲得鎖this.nodeId.set(myNode);return;}String preNode = sortedNode.lower(myNode);CountDownLatch latch = new CountDownLatch(1);Stat stat = zk.exists(preNode,new LockWatcher(latch));//同時注冊監(jiān)聽//判斷比自己小一個數(shù)的節(jié)點是否存在,如果不存在則不等待鎖,同時注冊監(jiān)聽if(stat != null){latch.await();//等待其他線程釋放鎖nodeId.set(myNode);latch = null;}} catch (Exception e) {e.printStackTrace();}}public void unlock(){try {if(null!= nodeId){zk.delete(nodeId.get(),-1);}nodeId.remove();//釋放鎖} catch (Exception e) {e.printStackTrace();}}}
關于Redundant declaration:@SpringBootApplication already applies given @ComponentScan異常
1.@ComponentScan默認掃描使用該注解的類所在的包,包括這個包下的類和子包,所以如果沒有配置basepackages,并且類都放在子包中,是可以正常訪問的2.如果配置了@ComponentScn中的basepackages,那么就要把所有需要掃描的包都配置.這種情況下,@ComponentScan是不會再去掃描當前類所在的包的.之前我之所以以為@ComponentScan對啟動類之外的包無能為力,就是因為配置了domain包,但是沒有配controller類的包,導致程序無法訪問.
