<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          電商防超賣的 N+1 個坑!

          共 22014字,需瀏覽 45分鐘

           ·

          2021-04-07 00:10

          點擊關注公眾號,Java干貨及時送達

          今天和同事討論庫存防超賣問題,發(fā)現(xiàn)雖然只是簡單的庫存扣減場景,卻隱藏著很多坑,一不小心就容易翻車,讓西瓜推土機來填平這些坑。

          單實例環(huán)境

          一般電商體系防止庫存超賣,主要有以下幾種方式:防止庫存超賣,最先想到的可能就是「鎖」,如果是一些單實例部署的庫存服務,大部分情況下我們可以使用以下鎖或并發(fā)工具類:這三個任何一個都可以保證同一單位時間只有一個線程能夠進行庫存扣減,廢話不多說,上碼!

          /**
               * 庫存扣減(偽代碼 ReentrantLock )
               * @param stockRequestDTO
               * @return Boolean
               */

              public Boolean stockHandle(StockRequestDTO stockRequestDTO) {
                  // 日志打印...校驗...前置處理等...
                  int stock = stockMapper.getStock(stockRequestDTO.getGoodsId());
                  reentrantLock.lock();
                  try {
                      int result = stock > 0 ? 
                              stockMapper.updateStock(stockRequestDTO.getGoodsId(), --stock) : 0;
                      return result > 0 ? true : false;
                  } catch (SQLException e) {
                      // 異常日志打印及處理...
                      return false;
                  } finally {
                      reentrantLock.unlock();
                  }
              }
              
             /**
               * 庫存扣減(偽代碼 synchronized )
               * @param stockRequestDTO
               * @return Boolean
               */

              public synchronized Boolean stockHandle(StockRequestDTO stockRequestDTO){
                  // 執(zhí)行業(yè)務邏輯...
              }

              /**
               * 庫存扣減(偽代碼 Semaphore )
               * @param stockRequestDTO
               * @return Boolean
               */

              public Boolean stockHandle(StockRequestDTO stockRequestDTO) {
                try{
                    semaphore.acquire();
                    // 執(zhí)行業(yè)務邏輯...
                } finally {
                    semaphore.release();
                  }
              }

          如果你的項目是單實例部署,那么使用以上鎖或并發(fā)工具中的一種,都可以有效的防止超賣出現(xiàn)。

          分布式環(huán)境

          但現(xiàn)在的互聯(lián)網公司,基本都是負載均衡的方式,訪問集群中多個實例的,所以基于JVM級別的鎖無法發(fā)揮作用,需要引入第三方組件來解決,分布式鎖登場。46張PPT弄懂JVM,這個分享給你。

          如果想實現(xiàn)分布式環(huán)境下的鎖機制,最簡單的莫過于利用MySQL的鎖機制:

          *** 使用悲觀鎖實現(xiàn) *** 

          begin-- 開啟事務
          select  stock_num from t_stock t_stock where goodsId = '12345' for update-- 獲取并設置排他鎖
          update t_stock set stock_num = stock_num - 1  where goodsId = '12345' ;-- 更新資源
          commit-- 提交事務并解鎖


          *** 樂觀鎖實現(xiàn) *** 

          update t_stock set stock_num = stock_num - 1 , version = version + 1 where goodsId = '12345' and version = 7;
          -- 1.更新資源時先判斷當前數(shù)據(jù)版本號和之前獲取時是否一致
          -- 2.如果版本號一致,更新資源并版本號+1
          -- 3.若版本號不一致,返回錯誤并由業(yè)務系統(tǒng)進行自旋重試


          *** 唯一索引實現(xiàn) *** 
          較簡單,此方式實際應用幾乎沒有,不再贅述

          有一點要注意,樂觀鎖的自旋是需要在自己的業(yè)務邏輯中實現(xiàn)的。

          使用數(shù)據(jù)庫作為分布式鎖,優(yōu)點是實現(xiàn)簡單、不需要引入其他中間件,缺點是可能存在磁盤IO,性能一般。

          那有沒有性能夠用、實現(xiàn)簡單、且在分布式環(huán)境下能保證資源并發(fā)安全的方案呢?常規(guī)有三,Redis、Zookeeper、MQ,其中MQ的解決方案不能算分布式鎖。

          今天我們介紹第一種,使用Redis實現(xiàn)分布式鎖,Redis分布式鎖的特點是輕松保證可重入、互斥。

          Redis中提供了SetNX+Expire兩個命令,可以對指定的Key加鎖:

          // redis原生命令
          redis-cli 127.0.0.1:6379> SETNX KEY_NAME VALUE
          redis-cli 127.0.0.1:6379> EXPlRE <key> <ttl> 

          spring-boot-starter-data-redis 也提供了操作Redis的模板類:

               /**
               * 庫存扣減 (偽代碼 spring-boot-starter-data-redis中提供的模板方法)
               * @param stockRequestDTO
               * @return Boolean
               */

              @Transactional(rollbackFor = {RuntimeException.classError.class})
              public Boolean stockHandle(StockRequestDTO stockRequestDTO
          {
                  // 省略日志打印...校驗...前置處理等...
                  try {
                      Boolean redisLock = redisTemplate
                              .opsForValue()
                              .setIfAbsent(stockLockKey, true);
                      if (redisLock) {
                          redisTemplate.expire((stockLockKey,1,TimeUnit.SECONDS);
                          Object stock = redisTemplate
                                  .opsForValue()
                                  .get(stockKeyPrefix.concat(stockRequestDTO.getGoodsId()));
                          if (null == stock || Integer.parseInt(stock.toString()) <= 0) {
                              // 庫存異常
                              return false;
                          } else {
                              // 扣減庫存
                              stock = Integer.parseInt(stock.toString()) - 1;
                              // 更新數(shù)據(jù)庫、緩存等...
                              return true;
                          }
                      } else {
                          return false;
                      }
                  } finally {
                      // 釋放鎖等等后置處理...
                      redisTemplate.delete(stockLockKey);
                  }
              }

          原子性問題

          以上代碼存在一個問題,假設

          1.)A線程獲取鎖成功
          2.)A線程所在實例掛掉了
          3.)A線程還沒來得及給鎖設置過期時間,Redis中資源會被永久鎖住
          Spring Boot 教程推薦看這個
          https://github.com/javastacks/spring-boot-best-practice

          所以我們需要使用原子指令:

          // redis原生命令
          redis-cli 127.0.0.1:6379> set key value [EX seconds] [PX milliseconds] [NX|XX]
          // 方式一:省略其他代碼......
          boolean redisLock = redisTemplate
                  .opsForValue()
                  .setIfAbsent(stockLockKey,true,30, TimeUnit.SECONDS);
           // 方式二:使用Lua腳本進行加鎖保證原子性(偽代碼)
            static {
                  StringBuilder sb = new StringBuilder();
                  sb.append("if redis.call(\"setnx\", KEYS[1], KEYS[2]) == 1 then");
                  sb.append("return redis.call(\"pexpire\", KEYS[1], KEYS[3])");
                  sb.append("else");
                  sb.append("return 0");
                  LUA_SCRIPT = sb.toString();
              }

              private Long redisLockByLua(String key, int num) {
                  // 腳本里的KEYS參數(shù),忽略類型轉換等......
                  List<Object> keys = new ArrayList<>();
                  keys.add(stockLockKey);
                  keys.add(true);
                  keys.add(30);
                  return (long) redisTemplate.execute(new DefaultRedisScript(LUA_SCRIPT), keys);
              }

          超時&誤刪鎖問題

          雖然我們優(yōu)化了,但還是有BUG,假設現(xiàn)在A 和 B 兩個線程同時訪問以上代碼:

          1.)A先獲得鎖
          2.)A執(zhí)行扣減庫存過程中因為不可控的原因超過了設定的10秒,此時分布式鎖失效
          3.)B線程將競爭獲得鎖開始執(zhí)行任務
          4.)此時A和B并行對加鎖資源操作,可能造成錯誤
          5.)A執(zhí)行比B執(zhí)行任務快,A恰好執(zhí)行到最后一步刪除鎖
          6.)B的任務還在執(zhí)行,B的鎖卻被A刪除,C線程也將競爭到鎖,與B同時并行執(zhí)行,循環(huán)往復,業(yè)務語義發(fā)生錯誤,可能導致各種臟數(shù)據(jù)產生

          這種現(xiàn)象還是比較容易發(fā)生的,對于鎖超時問題,我們加以優(yōu)化:

          1.)假如鎖的超時時間是10秒
          2.)獲取鎖后使用ScheduledExecutorService開啟守護線程
          3.)守護線程每隔5秒去查看分布式鎖是否還存在,如果存在就進行續(xù)期10秒

          對于誤刪鎖的問題,我們也可以加強優(yōu)化一下:

          1.)A線程獲取到鎖之前先生成一個隨機串
          2.)A線程將鎖的value設置為該隨機串
          3.)A線程刪除鎖時先判斷鎖的value是否為A線程剛才隨機生成的隨機串
           /**
               * 庫存扣減 (偽代碼 spring-boot-starter-data-redis中提供的模板方法)
               * @param stockRequestDTO
               * @return Boolean
               */

              @Transactional(rollbackFor = {RuntimeException.classError.class})
              public Boolean stockHandle(StockRequestDTO stockRequestDTO
          {
                  // 省略日志打印...校驗...前置處理等...
                  String nonceStr = UUID.randomUUID().toString();
                  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
                  try {
                      //競爭鎖
                      boolean redisLock = redisTemplate
                              .opsForValue()
                              .setIfAbsent(stockLockKey, true30, TimeUnit.SECONDS);
                      if (redisLock) {
                          // 開啟守護線程定時對鎖續(xù)命
                          scheduledExecutorService.scheduleWithFixedDelay(() -> {
                              if (redisTemplate.hasKey(stockLockKey)) {
                                  redisTemplate.expire(stockLockKey, 30, TimeUnit.SECONDS);
                              }
                          }, 1515, TimeUnit.SECONDS);
                          // 獲取庫存
                          String stockKey =  stockKeyPrefix.concat(stockRequestDTO.getGoodsId());
                          Object stock = redisTemplate
                                  .opsForValue()
                                  .get(stockKey);
                          if (null == stock || Integer.parseInt(stock.toString()) <= 0) {
                              // 庫存異常
                              return false;
                          } else {
                              // 扣減庫存
                              redisTemplate.opsForValue()
                                      .set(stockKey,Integer.parseInt(stock.toString()) - 1);
                              // 更新數(shù)據(jù)庫等...
                              return true;
                          }
                      } else {
                          return false;
                      }
                  } finally {
                      // 釋放鎖等等后置處理... 也可使用lua腳本保證原子性判斷和刪除鎖
                      if (redisTemplate.opsForValue().get(stockLockKey).equals(nonceStr)) {
                          redisTemplate.delete(stockLockKey);
                      }
                      scheduledExecutorService.shutdownNow();
                  }
              }

          釋放鎖也需要原子性執(zhí)行,我們依然使用Lua腳本來保證原子:

           // 解鎖 (偽代碼)
              static {
                  StringBuilder sb = new StringBuilder();
                  sb.append("if redis.call(\"get\",KEYS[1]) == KEYS[2] then");
                  sb.append("return redis.call(\"del\",KEYS[1])");
                  sb.append("else");
                  sb.append("return -1");
                  LUA_SCRIPT = sb.toString();
              }

              private Long redisUnlockByLua(String key, int num) {
                  // 腳本里的KEYS參數(shù),忽略類型轉換等...
                  List<Object> keys = new ArrayList<>();
                  keys.add(stockLockKey);
                  keys.add("線程加鎖時生成的隨機數(shù)");
                  return (long) redisTemplate.execute(new DefaultRedisScript(LUA_SCRIPT), keys);
              }

          Redisson

          對于鎖超時問題,我們還可以使用現(xiàn)成的工具Redisson,Redisson提供了WatchDog(看門狗)機制,內置了鎖續(xù)命機制,無需手動實現(xiàn)。

          注意,要想使用開門狗機制Redisson加鎖時不要指定超時時間,默認鎖超時時間30秒,看門狗每隔30秒的1/3時間也就是10秒去檢查一次鎖狀態(tài),鎖還在就進行續(xù)命。

          // 構造Redisson Config
          Config config = new Config();
          config.useClusterServers().addNodeAddress(
                  "redis://ip1:port1","redis://ip2:port2""redis://ip3:port3",
                  "redis://ip4:port4","redis://ip5:port5""redis://ip6:port6")
                  .setPassword("a123456").setScanInterval(5000);
          // 構造RedissonClient
          RedissonClient redissonClient = Redisson.create(config);
          // 設置鎖定資源名稱
          RLock rLock = redissonClient.getLock("lock_key");
          // boolean isLock;
          try {
              // 嘗試獲取分布式鎖
              // isLock = rLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
              rLock.lock();
              // 日志...業(yè)務處理等...
          catch (Exception e) {
              // 日志等...
          finally {
              // 解鎖
              rLock.unlock();
          }

          主從數(shù)據(jù)一致性問題

          不過此時還可能出現(xiàn)一種意外情況,假設Redis主從環(huán)境:

          1.)A線程在Redis Master節(jié)點獲得了鎖,還沒同步給Slave

          2.)Master節(jié)點掛掉

          3.)故障轉移后Slave節(jié)點升級為Master節(jié)點

          4.)此時B線程將競爭到鎖,至此A和B同時對加鎖任務并行執(zhí)行,業(yè)務語義發(fā)生錯誤,可能導致各種臟數(shù)據(jù)產生

          要解決這個問題,可以使用Redis官方提供的RedLock算法。
          簡單來說RedLock 的思想是使用多臺Redis Master,節(jié)點完全獨立,節(jié)點間不需要進行數(shù)據(jù)同步。

          假設N個節(jié)點,在有效時間內當獲得鎖的數(shù)量大于 (N/2+1) 代表成功,失敗后需要向所有節(jié)點發(fā)送釋放鎖的消息。

          RedLock的方式也有缺點,因為需要對多個節(jié)點操作加鎖解鎖,高并發(fā)情況下,耗時較長響應延遲,對性能有影響。

          后面我們找個時間專門講一下RedLock源碼以及存在的問題。

          Config config1 = new Config();
          config1.useSingleServer().setAddress("redis://ip1:port1").setPassword("...").setDatabase(0);
          RedissonClient redissonClient1 = Redisson.create(config1);

          Config config2 = new Config();
          config2.useSingleServer().setAddress("redis://ip2:port2").setPassword("...").setDatabase(0);
          RedissonClient redissonClient2 = Redisson.create(config2);

          Config config3 = new Config();
          config3.useSingleServer().setAddress("redis://ip3:port3").setPassword("...").setDatabase(0);
                  
          RedissonClient redissonClient3 = Redisson.create(config3);

          String lockKey = "lock_key";
          RedissonRedLock redLock = new RedissonRedLock(
                          redissonClient1.getLock(lockKey), 
                          redissonClient2.getLock(lockKey), 
                          redissonClient3.getLock(lockKey)
                          );
          // boolean isLock;
          try {
              // 嘗試獲取分布式鎖
              // isLock = redLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
              redLock.lock();
              // 日志...業(yè)務處理等...
          catch (Exception e) {
              // 日志等...
          finally {
              // 解鎖
              redLock.unlock();
          }

          事務問題

          至此,是不是萬無一失了呢?其實還沒完,還有一個隱藏問題。
          我們知道,MySQL默認的事務隔離級別是Repeatable-Read可重復讀。

          通過上述表格可以看出,Repeatable-Read這種隔離級別在同一個事務中多次讀取的數(shù)據(jù)是一致的;
          另一方面,Spring聲明式事務默認的傳播特性是Required,在調用聲明式事務修飾的方法stockHandle之前就已經開啟了事務;

          以上兩點會導致:

          1.)線程Thread-A和Thread-B都執(zhí)行到該方法

          2.)各自開啟了事務Transation-A和Transation-B

          3.)Transation-A先執(zhí)行加鎖、執(zhí)行、解鎖

          4.)Transation-B后執(zhí)行加鎖、執(zhí)行、解鎖

          5.)由于Transation-B事務的開啟,是在Transation-A事務提交之前

          6.)此時默認隔離級別Repeatable-Read,事務Transation-B事務讀取不到Transation-A已經提交的數(shù)據(jù)

          7.)就會出現(xiàn)Transation-A和Transation-B事務開啟后讀取到的值是一樣的,即Transation-B讀取的是Transation-A更新前的數(shù)據(jù)

          • 要解決這種隱藏BUG,可以將庫存信息放入Redis,利用Redis的decr方法在分布式環(huán)境下原子性的扣減庫存:

          // 省略其他代碼
          // Redis原子扣減庫存
             Long stock = redisTemplate.opsForValue().decrement(1);
             if (null == stock || Integer.parseInt(stock.toString()) < 0){
                // 庫存異常
             return false;
          }
          // 更新數(shù)據(jù)庫等...

          至于MQ和Zookeeper方式今天不在此介紹啦,大家感興趣的話我后面專門開篇來講。

          其實沒有最完美無缺的方案,以上方案還是會存在某些特定場景下的特定問題,具體場景具體分析,逐步優(yōu)化,一步步思考,加固項目城墻之余,也夯實自身的技術壁壘。






          關注Java技術棧看更多干貨



          獲取 Spring Boot 實戰(zhàn)筆記!
          瀏覽 74
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  被错误羁押6千天男子申请国赔1911万 | 成年人的免费视频 | 日韩三级视频在线观看 | 豆花a在线观看 | 国产美女精品久久久 |