<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          分布式鎖的三種實現(xiàn)方式

          共 7691字,需瀏覽 16分鐘

           ·

          2023-08-10 07:06

          分布式鎖的三種實現(xiàn)方式

          為什么要使用分布式鎖

          使用分布式鎖的目的,無外乎就是保證同一時間只有一個客戶端可以對共享資源進行操作。

          zookeeper可靠性比redis強太多,只是效率低了點,如果并發(fā)量不是特別大,追求可靠性,首選zookeeper。為了效率,則首選redis實現(xiàn)。

          1.數(shù)據(jù)庫方式

          /** * 該類實現(xiàn)Lock類,用于分布式鎖(也可以不實現(xiàn)該類,自己添加lock和unlock方法) * 實現(xiàn)思路: * 1.在數(shù)據(jù)庫中建立一張表,創(chuàng)建一個屬性唯一的字段 * 2.當有用戶訪問時,先查詢該字段是否有指定的值, * 如果有則說明該數(shù)據(jù)正在被訪問,拒絕其他用戶進行寫操作 * 如果沒有則向數(shù)據(jù)庫添加指定的值(上鎖) * 3.該用戶用完需要的數(shù)據(jù)后刪除表中添加的指定數(shù)據(jù)(解鎖) */public class MysqlLock implements Lock {    @Autowired    TestLockMapper testLockMapper;    private static final String FLAG = "isLock";
          /** * 上鎖就是向數(shù)據(jù)庫中加上一條數(shù)據(jù),該字段設置為唯一 * 當其他人想上鎖時發(fā)現(xiàn)該數(shù)據(jù)有人占用就會等待 */ @Override public void lock() { while(true){ //循環(huán)直到獲取到鎖 boolean b = tryLock(); //如果鎖被占用則獲取不到 if(b){ TestLock testLock = new TestLock(); testLock.setFlag(FLAG); testLockMapper.insert(testLock); break; }else { System.out.println("該數(shù)據(jù)被占用...請等待。。。"); } } }
          @Override public void lockInterruptibly() throws InterruptedException {
          }
          /** * 嘗試獲取鎖,如果數(shù)據(jù)庫中有該值,則獲取不到鎖 * @return */ @Override public boolean tryLock() { QueryWrapper<TestLock> wrapper = new QueryWrapper<>(); wrapper.eq("flag",FLAG); TestLock testLock = testLockMapper.selectOne(wrapper); if(testLock==null){ return true; } return false; }
          @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; }
          /** * 釋放鎖,將上鎖時設置的數(shù)據(jù)刪除,其他請求可以設置數(shù)據(jù)來得到鎖 */ @Override public void unlock() { QueryWrapper<TestLock> wrapper = new QueryWrapper<>(); wrapper.eq("flag",FLAG); testLockMapper.delete(wrapper); }
          @Override public Condition newCondition() { return null; }}

          2.redis方式

          死鎖問題:當進行l(wèi)ock方法上鎖以后,在執(zhí)行操作時有異常而無法執(zhí)行unlock釋放鎖

          解決死鎖:在redis中可以通過expire設置數(shù)據(jù)的過期時間,指定的時間內即使我們不釋放鎖也會自動刪除

          redis分布式鎖實現(xiàn)基于setnt(set if not exists),設置成功返回1,失敗返回0,釋放鎖通過del指令完成

          /** * 實現(xiàn)思路 * 基本和mysql一致 * 在redis中使用命令判斷該值是否存在,存在則等待,不存在則設置值,獲取鎖 * 當操作完成后刪除值并且釋放鎖 */public class RedisLock {    @Autowired    RedisTemplate redisTemplate;    private static final String NAME = "lock";    private static final String FLAG = "isLock";    public void lock(){        while (true){        //向redis中設置指定的key/value,       //該方法有可能導致死鎖        //    Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG);       //設置過期時間為 1 分鐘 ,即使1分鐘后沒有人釋放鎖他也會自動消失            Boolean b = redisTemplate.opsForValue().setIfAbsent(NAME, FLAG,1, TimeUnit.MINUTES);            if (b){                return ;            }else{                System.out.println(" 該數(shù)據(jù)被占用...請等待。。。");            }        }    }
          public void unlock(){ redisTemplate.delete(NAME); }
          }

          3.zookeeper方式

          zookeeper實現(xiàn)分布式鎖:原理:有序臨時節(jié)點+watch監(jiān)聽來實現(xiàn)

          實現(xiàn)思路:為每一個執(zhí)行的線程創(chuàng)建一個有序的臨時節(jié)點,為了確保有序性,在創(chuàng)建完節(jié)點,會再獲取全部節(jié)點,再重新進行—次排序,排序過程中,每個線程要判斷自己剩下的臨時節(jié)點的序號是否是顯小的, 如果是最小的,將會獲取到鎖,執(zhí)行相關操作,釋放鎖 如果不是最小的,會監(jiān)聽它的前一個節(jié)點,當它的前一個節(jié)點被刪除時,它就會獲得鎖,依次類推

          public class ZkLock {    //ZooKeeper客戶端    private ZooKeeper zk;    //zk的一個目錄結構locks,代表根目錄    private String root ="/locks";    //鎖的名稱    private String lockName;    //當前線程創(chuàng)建的序列node    private ThreadLocal<String> nodeId = new ThreadLocal<>();    //用來同步等待zkclient鏈接到了服務端   //CountDownLatch 一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待    private CountDownLatch countDownLatch = new CountDownLatch(1);    //超時時間    private static final int sessionTimeout = 3000;    private final static byte[] data = new byte[0];    //在構造方法中連接zookeeper,創(chuàng)建根節(jié)點lock    public  ZkLock(String config,String lockName){        this.lockName = lockName;        try {            zk = new ZooKeeper(config, sessionTimeout, new Watcher() {//watcher用于監(jiān)聽                @Override                public void process(WatchedEvent event) {                    if(event.getState()==Event.KeeperState.SyncConnected){                        //遞減鎖存器的計數(shù),如果計數(shù)到達零,則釋放所有等待的線程。                        countDownLatch.countDown();                    }                }            });            //使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷。            countDownLatch.await();            Stat stat = zk.exists(root,false);            if(null==stat){                //創(chuàng)建根節(jié)點                zk.create(root,data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            }        } catch (Exception e) {            e.printStackTrace();        }    }
          /** * 添加watch監(jiān)聽臨時順序節(jié)點的刪除 */ class LockWatcher implements Watcher { private CountDownLatch latch = null; public LockWatcher(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { if(event.getType()== Event.EventType.NodeDeleted){ latch.countDown(); } } }
          public void lock(){ try{ //創(chuàng)建臨時子節(jié)點 String myNode = zk.create(root+"/"+lockName,data,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+myNode+"created"); //取出所有子節(jié)點,并排序 List<String> subNode = zk.getChildren(root,false); TreeSet<String> sortedNode = new TreeSet<>(); for (String node: subNode) { sortedNode.add(root+"/"+node); } String smallNode = sortedNode.first(); if(myNode.equals(smallNode)){ //如果是最小節(jié)點,則表示獲得鎖 this.nodeId.set(myNode); return; } String preNode = sortedNode.lower(myNode); CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode,new LockWatcher(latch));//同時注冊監(jiān)聽 //判斷比自己小一個數(shù)的節(jié)點是否存在,如果不存在則不等待鎖,同時注冊監(jiān)聽 if(stat != null){ latch.await();//等待其他線程釋放鎖 nodeId.set(myNode); latch = null; }
          } catch (Exception e) { e.printStackTrace(); } }
          public void unlock(){ try { if(null!= nodeId){ zk.delete(nodeId.get(),-1); } nodeId.remove();//釋放鎖 } catch (Exception e) { e.printStackTrace(); }
          }
          }

          關于Redundant declaration:@SpringBootApplication already applies given @ComponentScan異常

          1.@ComponentScan默認掃描使用該注解的類所在的包,包括這個包下的類和子包,所以如果沒有配置basepackages,并且類都放在子包中,是可以正常訪問的2.如果配置了@ComponentScn中的basepackages,那么就要把所有需要掃描的包都配置.這種情況下,@ComponentScan是不會再去掃描當前類所在的包的.之前我之所以以為@ComponentScan對啟動類之外的包無能為力,就是因為配置了domain包,但是沒有配controller類的包,導致程序無法訪問.


          瀏覽 66
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月色在线视频 | 天堂精品在线 | 日本A黄色大片在线看免费在线看 | 午夜精品视频在线 | 在线观看小黄片 |