<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          分布式鎖實(shí)現(xiàn)的正確打開(kāi)方式

          共 24639字,需瀏覽 50分鐘

           ·

          2021-03-15 09:26

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

            作者 |  LanceToBigData

          來(lái)源 |  urlify.cn/MVn263

          76套java從入門(mén)到精通實(shí)戰(zhàn)課程分享

          一、分布式鎖概述

          1.1、分布式鎖作用

          1)在分布式系統(tǒng)環(huán)境下,一個(gè)方法在同一時(shí)間只能被一個(gè)機(jī)器的一個(gè)線(xiàn)程執(zhí)行

          2)具備高可用、高性能的獲取鎖與釋放鎖

          3)具備鎖失效機(jī)制,防止死鎖

          4)具備非阻塞鎖(沒(méi)有獲取到鎖將直接返回獲取鎖失敗)或堵塞鎖特性(根據(jù)業(yè)務(wù)需求考慮)

          1.2、分布式鎖應(yīng)用場(chǎng)景

          1)庫(kù)存扣減與增加

          分布式鎖保證庫(kù)存扣減不會(huì)超賣(mài),庫(kù)存增加不會(huì)造成庫(kù)存數(shù)據(jù)不準(zhǔn)確

          2)積分抵現(xiàn)

          防止積分扣減出現(xiàn)溢出的情況

          3)會(huì)員禮品核銷(xiāo)

          防止禮品核銷(xiāo)多次

          1.3、實(shí)現(xiàn)方式

          1)使用Redis,基于setnx命令或其他。

          2)使用ZooKeeper,基于臨時(shí)有序節(jié)點(diǎn)。

          3)使用MySQL,基于唯一索引

          二、基于Zookeeper實(shí)現(xiàn)分布式鎖

          2.1、Zookeeper特性介紹

          1)有序節(jié)點(diǎn)

          假如當(dāng)前有一個(gè)父節(jié)點(diǎn)為/lock,我們可以在這個(gè)父節(jié)點(diǎn)下面創(chuàng)建子節(jié)點(diǎn);zookeeper提供了一個(gè)可選的有序特性,例如我們可以創(chuàng)建子節(jié)點(diǎn)“/lock/node-”并且指明有序,那么zookeeper在生成子節(jié)點(diǎn)時(shí)會(huì)根據(jù)當(dāng)前的子節(jié)點(diǎn)數(shù)量自動(dòng)添加整數(shù)序號(hào),也就是說(shuō)如果是第一個(gè)創(chuàng)建的子節(jié)點(diǎn),那么生成的子節(jié)點(diǎn)為/lock/node-0000000000,下一個(gè)節(jié)點(diǎn)則為/lock/node-0000000001,依次類(lèi)推。

          2)臨時(shí)節(jié)點(diǎn)

          客戶(hù)端可以建立一個(gè)臨時(shí)節(jié)點(diǎn),在會(huì)話(huà)結(jié)束或者會(huì)話(huà)超時(shí)后,zookeeper會(huì)自動(dòng)刪除該節(jié)點(diǎn)。

          3)事件監(jiān)聽(tīng)

          在讀取數(shù)據(jù)時(shí),我們可以同時(shí)對(duì)節(jié)點(diǎn)設(shè)置事件監(jiān)聽(tīng),當(dāng)節(jié)點(diǎn)數(shù)據(jù)或結(jié)構(gòu)變化時(shí),zookeeper會(huì)通知客戶(hù)端。當(dāng)前zookeeper有如下四種事件:節(jié)點(diǎn)創(chuàng)建、節(jié)點(diǎn)刪除、節(jié)點(diǎn)數(shù)據(jù)修改、子節(jié)點(diǎn)變更

          2.2、Zookeeper分布式鎖實(shí)現(xiàn)(方式一)

          2.2.1、實(shí)現(xiàn)原理

          1)客戶(hù)端連接zookeeper,并在父節(jié)點(diǎn)(/lock)下創(chuàng)建臨時(shí)的且有序的子節(jié)點(diǎn),第一個(gè)客戶(hù)端對(duì)應(yīng)的子節(jié)點(diǎn)為/lock/lock-1,第二個(gè)為/lock/lock-2,以此類(lèi)推。
          2)客戶(hù)端獲取/lock下的子節(jié)點(diǎn)列表,判斷自己創(chuàng)建的子節(jié)點(diǎn)是否為當(dāng)前子節(jié)點(diǎn)列表中序號(hào)最小的子節(jié)點(diǎn),如果是則認(rèn)為獲得鎖,否則監(jiān)聽(tīng)/lock的子節(jié)點(diǎn)變更消息,獲得子節(jié)點(diǎn)變更通知后重復(fù)此步驟直至獲得鎖
          3)執(zhí)行業(yè)務(wù)代碼;
          4)完成業(yè)務(wù)流程后,刪除對(duì)應(yīng)的子節(jié)點(diǎn)釋放鎖。

          2.2.2、實(shí)現(xiàn)代碼

          1.基于curator的zookeeper分布式鎖實(shí)現(xiàn)

          public static void main(String[] args) throws Exception {
                  //創(chuàng)建zookeeper的客戶(hù)端
                  RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

                  CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);

                  client.start();

                  //創(chuàng)建分布式鎖, 鎖空間的根節(jié)點(diǎn)路徑為/curator/lock
                  InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");

                  mutex.acquire();

                  //獲得了鎖, 進(jìn)行業(yè)務(wù)流程
                  System.out.println("Enter mutex");

                  //完成業(yè)務(wù)流程, 釋放鎖
                  mutex.release();

                  //關(guān)閉客戶(hù)端
                  client.close();

              }

          2.實(shí)現(xiàn)方式二

          1)定義變量

          /**
              * Zookeeper客戶(hù)端
              */
          private ZooKeeper zookeeper;

          /**
              * 鎖的唯一標(biāo)識(shí)
              */
          private String lockId;

          /**
              * 與Zookeeper建立會(huì)話(huà)的信號(hào)量
              */
          private CountDownLatch connectedLatch;

          /**
              * 創(chuàng)建分布式鎖的過(guò)程中,開(kāi)始和等待請(qǐng)求創(chuàng)建分布式鎖的信號(hào)標(biāo)志
              */
          private CountDownLatch creatingLatch;

          /**
              * 分布式鎖路徑前綴
              */
          private String locksRootPath = "/locks";

          /**
              * 排在當(dāng)前節(jié)點(diǎn)前面一位的節(jié)點(diǎn)的路徑
              */
          private String waitNodeLockPath;

          /**
              * 為了獲得鎖,本次創(chuàng)建的節(jié)點(diǎn)的路徑
              */
          private String currentNodeLockPath;

          2)構(gòu)造函數(shù)

          public ZookeeperTempOrderLock(String lockId) {
              this.lockId = lockId;
              try {
                  // 會(huì)話(huà)超時(shí)時(shí)間
                  int sessionTimeout = 30000;
                  //
                  zookeeper = new ZooKeeper("192.168.0.93:2181", sessionTimeout, this);
                  connectedLatch.await();
              } catch (IOException ioe) {
                  log.error("與Zookeeper建立連接時(shí)出現(xiàn)異常", ioe);
              } catch (InterruptedException ite) {
                  log.error("等待與Zookeeper會(huì)話(huà)建立完成時(shí)出現(xiàn)異常", ite);
              }
          }

          3)實(shí)現(xiàn)Zookeeper的watcher

          @Override
          public void process(WatchedEvent event) {
              if (Event.KeeperState.SyncConnected == event.getState()) {
                  connectedLatch.countDown();
              }

              if (creatingLatch != null) {
                  creatingLatch.countDown();
              }
          }

          4)獲取分布式鎖

          /**
              * 獲取鎖
              */
          public void acquireDistributedLock() {
              try {
                  while(!tryLock()) {
                      // 等待前一項(xiàng)服務(wù)釋放鎖的等待時(shí)間 不能超過(guò)一次Zookeeper會(huì)話(huà)的時(shí)間
                      long waitForPreviousLockRelease = 30000;
                      waitForLock(waitNodeLockPath, waitForPreviousLockRelease);
                  }
              } catch (InterruptedException | KeeperException e) {
                  log.error("等待上鎖的過(guò)程中出現(xiàn)異常", e);
              }
          }

          public boolean tryLock() {
              try {
                  // 創(chuàng)建順序臨時(shí)節(jié)點(diǎn)
                  currentNodeLockPath = zookeeper.create(locksRootPath + "/" + lockId,
                          "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                  // 查看剛剛創(chuàng)建的節(jié)點(diǎn)是不是最小節(jié)點(diǎn)
                  // 比如針對(duì)于這個(gè)同名節(jié)點(diǎn),之前有其它服務(wù)曾申請(qǐng)創(chuàng)建過(guò),因此Zookeeper中臨時(shí)順序節(jié)點(diǎn)形如:
                  // /locks/10000000000, /locks/10000000001, /locks/10000000002
                  List<String> nodePaths = zookeeper.getChildren(locksRootPath, false);
                  Collections.sort(nodePaths);
                  if(currentNodeLockPath.equals(locksRootPath + "/" + nodePaths.get(0))) {
                      // 如果是最小節(jié)點(diǎn),則代表獲取到鎖
                      return true;
                  }
                  // 如果不是最小節(jié)點(diǎn),則找到比自己小1的節(jié)點(diǎn) (緊挨著自己)
                  int previousLockNodeIndex = -1;
                  for (int i = 0; i < nodePaths.size(); i++) {
                      if(currentNodeLockPath.equals(locksRootPath + "/" + nodePaths.get(i))) {
                          previousLockNodeIndex = i-1;
                          break;
                      }
                  }
                  this.waitNodeLockPath = nodePaths.get(previousLockNodeIndex);

              } catch (KeeperException | InterruptedException e) {
                  log.error("創(chuàng)建臨時(shí)順序節(jié)點(diǎn)失敗", e);
              }
              return false;
          }

          5)等待其他服務(wù)釋放鎖

          /**
              * 等待其他服務(wù)釋放鎖
              * 實(shí)際上就是在等待前一個(gè)臨時(shí)節(jié)點(diǎn)被刪除
              *
              * @param nodePath 希望被刪除的節(jié)點(diǎn)的相對(duì)路徑
              * @param waitTime 等待時(shí)長(zhǎng) 單位:毫秒
              */
          private boolean waitForLock(String nodePath, long waitTime) throws KeeperException, InterruptedException {
              Stat stat = zookeeper.exists(locksRootPath + "/" + nodePath, true);
              if (stat != null) {
                  this.creatingLatch = new CountDownLatch(1);
                  this.creatingLatch.await(waitTime, TimeUnit.MILLISECONDS);
                  this.creatingLatch = null;
              }
              return true;
          }

          6)釋放分布式鎖

          /**
              * 釋放鎖
              * 實(shí)際上就是刪除當(dāng)前創(chuàng)建的臨時(shí)節(jié)點(diǎn)
              */
          public void releaseLock() {
              log.info("準(zhǔn)備刪除的節(jié)點(diǎn)路徑: " + currentNodeLockPath);
              try {
                  zookeeper.delete(currentNodeLockPath, -1);
                  currentNodeLockPath = null;
                  zookeeper.close();
              } catch (Exception e) {
                  log.error("刪除節(jié)點(diǎn)失敗", e);
              }
          }

          2.3、Zookeeper分布式鎖實(shí)現(xiàn)(方式二)

          2.3.1、實(shí)現(xiàn)原理

          假設(shè)有兩個(gè)服務(wù)A、B希望獲得同一把鎖,執(zhí)行過(guò)程大致如下:

          1)服務(wù)A向zookeeper申請(qǐng)獲得鎖,該請(qǐng)求將嘗試在zookeeper內(nèi)創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)(ephemeral znode),如果沒(méi)有同名的臨時(shí)節(jié)點(diǎn)存在,則znode創(chuàng)建成功,標(biāo)志著服務(wù)A成功的獲得了鎖。

          2) 服務(wù)B向zookeeper申請(qǐng)獲得鎖,同樣嘗試在zookeeper內(nèi)創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)(名稱(chēng)必須與服務(wù)A的相同),由于同名znode已經(jīng)存在,因此請(qǐng)求被拒絕。接著,服務(wù)B會(huì)在zk中注冊(cè)一個(gè)監(jiān)聽(tīng)器,用于監(jiān)聽(tīng)臨時(shí)節(jié)點(diǎn)被刪除的事件

          3) 若服務(wù)A主動(dòng)向zk發(fā)起請(qǐng)求釋放鎖,或者服務(wù)A宕機(jī)、斷開(kāi)與zk的網(wǎng)絡(luò)連接,zk會(huì)將服務(wù)A(創(chuàng)建者)創(chuàng)建的臨時(shí)節(jié)點(diǎn)刪除。而刪除事件也將立刻被監(jiān)聽(tīng)器捕獲到,并反饋給服務(wù)B。最后,服務(wù)B再次向zookeeper申請(qǐng)獲得鎖。

          2.3.2、實(shí)現(xiàn)代碼

          基于臨時(shí)節(jié)點(diǎn)實(shí)現(xiàn)Zookeeper分布式鎖

          多個(gè)服務(wù)如果想競(jìng)爭(zhēng)同一把鎖,那就向Zookeeper發(fā)起創(chuàng)建臨時(shí)節(jié)點(diǎn)的請(qǐng)求,若能成功創(chuàng)建則獲得鎖,否則借助監(jiān)聽(tīng)器,當(dāng)監(jiān)聽(tīng)到鎖被其它服務(wù)釋放(臨時(shí)節(jié)點(diǎn)被刪除),則自己再請(qǐng)求創(chuàng)建臨時(shí)節(jié)點(diǎn),反復(fù)這幾個(gè)步驟直到成功創(chuàng)建臨時(shí)節(jié)點(diǎn)或者與zookeeper建立的會(huì)話(huà)超時(shí)。

          步驟:

          1)定義變量

              /**
               * 與Zookeeper成功建立連接的信號(hào)標(biāo)志
               */
              private CountDownLatch connectedSemaphore = new CountDownLatch(1);

              /**
               * 創(chuàng)建分布式鎖的過(guò)程中,開(kāi)始和等待請(qǐng)求創(chuàng)建分布式鎖的信號(hào)標(biāo)志
               */
              private CountDownLatch creatingSemaphore;

              /**
               * Zookeeper客戶(hù)端
               */
              private ZooKeeper zookeeper;

              /**
               * 分布式鎖的過(guò)期時(shí)間 單位:毫秒
               */
              private static final Long DISTRIBUTED_KEY_OVERDUE_TIME = 30000L;

          2)構(gòu)造函數(shù)

          public ZookeeperLock() {
              try {
                  this.zookeeper = new ZooKeeper("192.168.0.93:2181", 5000, new ZookeeperWatcher());
                  try {
                      connectedSemaphore.await();
                  } catch (InterruptedException ite) {
                      log.error("等待Zookeeper成功建立連接的過(guò)程中,線(xiàn)程拋出異常", ite);
                  }
                  log.info("與Zookeeper成功建立連接");
              } catch (Exception e) {
                  log.error("與Zookeeper建立連接時(shí)出現(xiàn)異常", e);
              }
          }

          3)獲取分布式鎖

          實(shí)際上就是在嘗試創(chuàng)建臨時(shí)節(jié)點(diǎn)znode
          create(final String path, byte data[], List acl,CreateMode createMod)
          path: 從根節(jié)點(diǎn)"/"到當(dāng)前節(jié)點(diǎn)的全路徑
          data: 當(dāng)前節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù) (由于這里只是借助臨時(shí)節(jié)點(diǎn)的創(chuàng)建來(lái)實(shí)現(xiàn)分布式鎖,因此無(wú)需存儲(chǔ)數(shù)據(jù))
          acl: Access Control list 訪(fǎng)問(wèn)控制列表 主要涵蓋權(quán)限模式(Scheme)、授權(quán)對(duì)象(ID)、授予的權(quán)限(Permission)這三個(gè)方面 OPEN_ACL_UNSAFE 完全開(kāi)放的訪(fǎng)問(wèn)控制 對(duì)當(dāng)前節(jié)點(diǎn)進(jìn)行操作時(shí),無(wú)需考慮ACL權(quán)限控制
          createMode: 節(jié)點(diǎn)創(chuàng)建的模式
          EPHEMERAL(臨時(shí)節(jié)點(diǎn)) 當(dāng)創(chuàng)建節(jié)點(diǎn)的客戶(hù)端與zk斷開(kāi)連接后,臨時(shí)節(jié)點(diǎn)將被刪除
          EPHEMERAL_SEQUENTIAL(臨時(shí)順序節(jié)點(diǎn))
          PERSISTENT(持久節(jié)點(diǎn))
          PERSISTENT_SEQUENTIAL(持久順序節(jié)點(diǎn))

          public boolean acquireDistributeLock(Long lockId) {
              String path = "/product-lock-" + lockId;

              try {
                  zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                  log.info("ThreadId=" + Thread.currentThread().getId() + "創(chuàng)建臨時(shí)節(jié)點(diǎn)成功");
                  return true;
              } catch (Exception e) {
                  // 若臨時(shí)節(jié)點(diǎn)已存在,則會(huì)拋出異常: NodeExistsException
                  while (true) {
                      // 相當(dāng)于給znode注冊(cè)了一個(gè)監(jiān)聽(tīng)器,查看監(jiān)聽(tīng)器是否存在
                      try {
                          Stat stat = zookeeper.exists(path, true);
                          if (stat != null) {
                              this.creatingSemaphore = new CountDownLatch(1);
                              this.creatingSemaphore.await(DISTRIBUTED_KEY_OVERDUE_TIME, TimeUnit.MILLISECONDS);
                              this.creatingSemaphore = null;
                          }
                          zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                          return true;
                      } catch (Exception ex) {
                          log.error("ThreadId=" + Thread.currentThread().getId() + ",查看臨時(shí)節(jié)點(diǎn)時(shí)出現(xiàn)異常", ex);
                      }
                  }
              }
          }

          4)釋放分布式鎖

          public void releaseDistributedLock(Long lockId) {
              String path = "/product-lock-" + lockId;
              try {
                  // 第二個(gè)參數(shù)version是數(shù)據(jù)版本 每次znode內(nèi)數(shù)據(jù)發(fā)生變化,都會(huì)使version自增,但由于分布式鎖創(chuàng)建的臨時(shí)znode沒(méi)有存數(shù)據(jù),因此version=-1
                  zookeeper.delete(path, -1);
                  log.info("成功釋放分布式鎖, lockId=" + lockId + ", ThreadId=" + Thread.currentThread().getId());
              } catch (Exception e) {
                  log.error("釋放分布式鎖失敗,lockId=" + lockId, e);
              }
          }

          5)建立Zookeeper的watcher

          不論是zk客戶(hù)端與服務(wù)器連接成功,還是刪除節(jié)點(diǎn),watcher監(jiān)聽(tīng)到的事件都是SyncConnected

          private class ZookeeperWatcher implements Watcher {
              @Override
              public void process(WatchedEvent event) {
                  log.info("接收到事件: " + event.getState() + ", ThreadId=" + Thread.currentThread().getId());

                  if (Event.KeeperState.SyncConnected == event.getState()) {
                      connectedSemaphore.countDown();
                  }

                  if (creatingSemaphore != null) {
                      creatingSemaphore.countDown();
                  }
              }
          }

          6)main方式運(yùn)用

          創(chuàng)建了兩個(gè)線(xiàn)程,其中第一個(gè)線(xiàn)程先執(zhí)行,且持有鎖5秒鐘才釋放鎖,第二個(gè)線(xiàn)程后執(zhí)行,當(dāng)且僅當(dāng)?shù)谝粋€(gè)線(xiàn)程釋放鎖(刪除臨時(shí)節(jié)點(diǎn))后,第二個(gè)線(xiàn)程才能成功獲取鎖。

          public static void main(String[] args) throws InterruptedException{
              long lockId = 20200730;

              new Thread(() ->{
                  ZookeeperLock zookeeperLock = new ZookeeperLock();
                  System.out.println("ThreadId1=" + Thread.currentThread().getId());
                  System.out.println("ThreadId=" + Thread.currentThread().getId() + "獲取到分布式鎖: " + zookeeperLock.acquireDistributeLock(lockId));
                  try {
                      TimeUnit.SECONDS.sleep(5);
                  } catch (InterruptedException e) {
                      log.error("ThreadId=" + Thread.currentThread().getId() + "暫停時(shí)出現(xiàn)異常", e);
                  }
                  zookeeperLock.releaseDistributedLock(lockId);
              }).start();

              TimeUnit.SECONDS.sleep(1);
              new Thread(() -> {
                  ZookeeperLock zookeeperLock = new ZookeeperLock();
                  System.out.println("ThreadId2=" + Thread.currentThread().getId());
                  System.out.println("ThreadId=" + Thread.currentThread().getId() + "獲取到分布式鎖: " + zookeeperLock.acquireDistributeLock(lockId));
              }).start();
          }

          三、基于Redis實(shí)現(xiàn)分布式鎖

          3.1、普通常見(jiàn)實(shí)現(xiàn)方式

          3.1.1、實(shí)現(xiàn)代碼

          public String deductStock() {
              String lockKey = "product_001";
              try {
                 /*Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa"); //jedis.setnx
                  stringRedisTemplate.expire(lockKey, 30, TimeUnit.SECONDS); //設(shè)置超時(shí)*/
                  //為解決原子性問(wèn)題將設(shè)置鎖和設(shè)置超時(shí)時(shí)間合并
                  Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa", 10, TimeUnit.SECONDS);

                  //未設(shè)置成功,當(dāng)前key已經(jīng)存在了,直接返回錯(cuò)誤
                  if (!result) {
                      return "error_code";
                  }
                  //業(yè)務(wù)邏輯實(shí)現(xiàn),扣減庫(kù)存
                  ....
              } catch (Exception e) {
                  e.printStackTrace();
              }finally {
                  stringRedisTemplate.delete(lockKey);
              }
              return "end";
          }

          3.2.2、問(wèn)題分析

          上述代碼可以看到,當(dāng)前鎖的失效時(shí)間為10s,如果當(dāng)前扣減庫(kù)存的業(yè)務(wù)邏輯執(zhí)行需要15s時(shí),高并發(fā)時(shí)會(huì)出現(xiàn)問(wèn)題:

          • 線(xiàn)程1,首先執(zhí)行到10s后,鎖(product_001)失效

          • 線(xiàn)程2,在第10s后同樣進(jìn)入當(dāng)前方法,此時(shí)加上鎖(product_001)

          • 當(dāng)執(zhí)行到15s時(shí),線(xiàn)程1刪除線(xiàn)程2加的鎖(product_001)

          • 線(xiàn)程3,可以加鎖 .... 如此循環(huán),實(shí)際鎖已經(jīng)沒(méi)有意義

          3.2.3、解決方案

          定義一個(gè)子線(xiàn)程,定時(shí)去查看是否存在主線(xiàn)程的持有當(dāng)前鎖,如果存在則為其延長(zhǎng)過(guò)期時(shí)間

          3.2、基于Redission實(shí)現(xiàn)方式

          3.2.1、Redission簡(jiǎn)介

          Jedis是Redis的Java實(shí)現(xiàn)的客戶(hù)端,其API提供了比較全面的Redis命令的支持。Redission也是Redis的客戶(hù)端,相比于Jedis功能簡(jiǎn)單。Jedis簡(jiǎn)單使用阻塞的I/O和redis交互,Redission通過(guò)Netty支持非阻塞I/O

          Redission封裝了鎖的實(shí)現(xiàn),其繼承了java.util.concurrent.locks.Lock的接口,讓我們像操作我們的本地Lock一樣去操作Redission的Lock。

          常用API:

          RLock redissonLock = redission.getLock();
          redissionLock.lock(30,TmieUnit.SECONDS);加鎖并設(shè)置鎖的存活時(shí)間
          redissionLock.unLock();解鎖

          3.2.2、實(shí)現(xiàn)原理

          • 多個(gè)線(xiàn)程去執(zhí)行l(wèi)ock操作,僅有一個(gè)線(xiàn)程能夠加鎖成功,其它線(xiàn)程循環(huán)阻塞。

          • 加鎖成功,鎖超時(shí)時(shí)間默認(rèn)30s,并開(kāi)啟后臺(tái)線(xiàn)程(子線(xiàn)程),加鎖的后臺(tái)會(huì)每隔10秒去檢測(cè)線(xiàn)程持有的鎖是否存在,還在的話(huà),就延遲鎖超時(shí)時(shí)間,重新設(shè)置為30s,即鎖延期

          • 對(duì)于原子性,Redis分布式鎖底層借助Lua腳本實(shí)現(xiàn)鎖的原子性。鎖延期是通過(guò)在底層用Lua進(jìn)行延時(shí),延時(shí)檢測(cè)時(shí)間是對(duì)超時(shí)時(shí)間timeout /3。

          1)簡(jiǎn)單實(shí)現(xiàn)代碼:

          public String deductStockRedission() {
              String lockKey = "product_001";
              RLock rlock = redission.getLock(lockKey);
              try {
                  rlock.lock();

                  //業(yè)務(wù)邏輯實(shí)現(xiàn),扣減庫(kù)存
                  ....
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  rlock.unlock();
              }
              return "end";
          }

          2)分析Redission適用原因:

          1)redisson所有指令都通過(guò)lua腳本執(zhí)行,redis支持lua腳本原子性執(zhí)行

          2)redisson設(shè)置一個(gè)key的默認(rèn)過(guò)期時(shí)間為30s,如果某個(gè)客戶(hù)端持有一個(gè)鎖超過(guò)了30s怎么辦?

          redisson中有一個(gè)watchdog的概念,翻譯過(guò)來(lái)就是看門(mén)狗,它會(huì)在你獲取鎖之后,每隔10秒幫你把key的超時(shí)時(shí)間設(shè)為30s

          這樣的話(huà),就算一直持有鎖也不會(huì)出現(xiàn)key過(guò)期了,其他線(xiàn)程獲取到鎖的問(wèn)題了。保證了沒(méi)有死鎖發(fā)生

          3)Redisson的可重入鎖

          Redis存儲(chǔ)鎖的數(shù)據(jù)類(lèi)型是 Hash類(lèi)型

          Hash數(shù)據(jù)類(lèi)型的key值包含了當(dāng)前線(xiàn)程信息

          3.2.3、問(wèn)題分析及對(duì)應(yīng)方案

          1)主從同步問(wèn)題

          問(wèn)題分析:

           當(dāng)主Redis加鎖了,開(kāi)始執(zhí)行線(xiàn)程,若還未將鎖通過(guò)異步同步的方式同步到從Redis節(jié)點(diǎn),主節(jié)點(diǎn)就掛了,此時(shí)會(huì)把某一臺(tái)從節(jié)點(diǎn)作為新的主節(jié)點(diǎn),此時(shí)別的線(xiàn)程就可以加鎖了,這樣就出錯(cuò)了,怎么辦?

          解決方案:

          1)采用zookeeper代替Redis

            由于zk集群的特點(diǎn),其支持的是CP。而Redis集群支持的則是AP。

          2)采用RedLock

          假設(shè)有3個(gè)redis節(jié)點(diǎn),這些節(jié)點(diǎn)之間既沒(méi)有主從,也沒(méi)有集群關(guān)系。客戶(hù)端用相同的key和隨機(jī)值在3個(gè)節(jié)點(diǎn)上請(qǐng)求鎖,請(qǐng)求鎖的超時(shí)時(shí)間應(yīng)小于鎖自動(dòng)釋放時(shí)間。當(dāng)在2個(gè)(超過(guò)半數(shù))redis上請(qǐng)求到鎖的時(shí)候,才算是真正獲取到了鎖。如果沒(méi)有獲取到鎖,則把部分已鎖的redis釋放掉。

          public String deductStockRedlock() {
              String lockKey = "product_001";
              //TODO 這里需要自己實(shí)例化不同redis實(shí)例的redission客戶(hù)端連接,這里只是偽代碼用一個(gè)redisson客戶(hù)端簡(jiǎn)化了
              RLock rLock1 = redisson.getLock(lockKey);
              RLock rLock2 = redisson.getLock(lockKey);
              RLock rLock3 = redisson.getLock(lockKey);

              // 向3個(gè)redis實(shí)例嘗試加鎖
              RedissonRedLock redLock = new RedissionRedLock(rLock1, rLock2, rLock3);
              boolean isLock;
              try {
                  // 500ms拿不到鎖, 就認(rèn)為獲取鎖失敗。10000ms即10s是鎖失效時(shí)間。
                  isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
                  System.out.println("isLock = " + isLock);
                  if (isLock) {
                      //業(yè)務(wù)邏輯處理
                      ...
                  }
              } catch (Exception e) {

              } finally {
                  // 無(wú)論如何, 最后都要解鎖
                  redLock.unlock();
              }
          }

          不太推薦使用。如果考慮高可用并發(fā)推薦使用Redisson,考慮一致性推薦使用zookeeper。

          2)提高并發(fā):分段鎖

          由于Redission實(shí)際上就是將并行的請(qǐng)求,轉(zhuǎn)化為串行請(qǐng)求。這樣就降低了并發(fā)的響應(yīng)速度,為了解決這一問(wèn)題,可以將鎖進(jìn)行分段處理:例如秒殺商品001,原本存在1000個(gè)商品,可以將其分為20段,為每段分配50個(gè)商品。

          比如:

          將庫(kù)存進(jìn)行分段,放入redis中,例如1000庫(kù)存,可分10段放入Redis

          key的設(shè)計(jì)可以為Product:10001:0 | Product:10001:1 ....

          Redis底層集群,將根據(jù)key,計(jì)算器槽位,放入不同節(jié)點(diǎn)中



          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長(zhǎng)按上方微信二維碼 2 秒





          感謝點(diǎn)贊支持下哈 

          瀏覽 83
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产肏屄视频在线免费看 | 欧美日韩网站 | 婷婷撸一撸 | 青青草在线免费 | 人妻日日爽夜夜爽一区二区 |