<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          加鎖了還有并發(fā)問題?Redis分布式鎖,真的用對了?

          共 24592字,需瀏覽 50分鐘

           ·

          2021-10-30 01:58

          新接手的項目,偶爾會出現(xiàn)賬不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……

          既然項目交付到手中,這樣的問題是必須要解決的。梳理了所有賬務處理邏輯,最終找到了原因:數(shù)據(jù)庫并發(fā)操作熱點賬戶導致。就這這個問題,來聊一聊分布式系統(tǒng)下基于Redis的分布式鎖。順便也分解一下問題形成原因及解決方案。

          原因分析

          系統(tǒng)并發(fā)量并不高,存在熱點賬戶,但也不至于那么嚴重。問題的根源在于系統(tǒng)架構設計,人為的制造了并發(fā)。場景是這樣的:商戶批量導入一批數(shù)據(jù),系統(tǒng)會進行前置處理,并對賬戶余額進行增減。

          此時,另外一個定時任務,也會對賬戶進行掃描更新。而且對同一賬戶的操作分布到各個系統(tǒng)當中,熱點賬戶也就出現(xiàn)了。

          針對此問題的解決方案,從架構層面可以考慮將賬務系統(tǒng)進行抽離,集中在一個系統(tǒng)中進行處理,所有的數(shù)據(jù)庫事務及執(zhí)行順序由賬務系統(tǒng)來統(tǒng)籌處理。從技術方面來講,則可以通過鎖機制來對熱點賬戶進行加鎖。

          本篇文章就針對熱點賬戶基于分布式鎖的實現(xiàn)方式進行詳細的講解。

          鎖的分析

          在Java的多線程環(huán)境下,通常有幾類鎖可以使用:

          • JVM內存模型級別的鎖,常用的有:synchronized、Lock等;
          • 數(shù)據(jù)庫鎖,比如樂觀鎖,悲觀鎖等;
          • 分布式鎖;

          JVM內存級別的鎖,可以保證單體服務下線程的安全性,比如多個線程訪問/修改一個全局變量。但當系統(tǒng)進行集群部署時,JVM級別的本地鎖就無能為力了。

          悲觀鎖與樂觀鎖

          像上述案例中,熱點賬戶就屬于分布式系統(tǒng)中的共享資源,我們通常會采用數(shù)據(jù)庫鎖分布式鎖來進行解決。

          數(shù)據(jù)庫鎖,又分為樂觀鎖悲觀鎖。

          悲觀鎖是基于數(shù)據(jù)庫(Mysql的InnoDB)提供的排他鎖來實現(xiàn)的。在進行事務操作時,通過select ... for update語句,MySQL會對查詢結果集中每行數(shù)據(jù)都添加排他鎖,其他線程對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執(zhí)行(修改);

          樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設數(shù)據(jù)一般情況不會造成沖突,所以在數(shù)據(jù)進行提交更新的時候,才會正式對數(shù)據(jù)的沖突與否進行檢測。如果沖突則返回給用戶異常信息,讓用戶決定如何去做。樂觀鎖適用于讀多寫少的場景,這樣可以提高程序的吞吐量。在樂觀鎖實現(xiàn)時通常會基于記錄狀態(tài)或添加version版本來進行實現(xiàn)。

          悲觀鎖失效場景

          項目中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區(qū),下面來分析一下。

          正常使用悲觀鎖的流程:

          • 通過select ... for update鎖定記錄;
          • 計算新余額,修改金額并存儲;
          • 執(zhí)行完成釋放鎖;

          經(jīng)常犯錯的處理流程:

          • 查詢賬戶余額,計算新余額;
          • 通過select ... for update鎖定記錄;
          • 修改金額并存儲;
          • 執(zhí)行完成釋放鎖;

          錯誤的流程中,比如A和B服務查詢到的余額都是100,A扣減50,B扣減40,然后A鎖定記錄,更新數(shù)據(jù)庫為50;A釋放鎖之后,B鎖定記錄,更新數(shù)據(jù)庫為60。顯然,后者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的范圍,將鎖提前到計算新余額之前。

          通常悲觀鎖對數(shù)據(jù)庫的壓力是非常大的,在實踐中通常會根據(jù)場景使用樂觀鎖或分布式鎖等方式來實現(xiàn)。

          下面進入正題,講講基于Redis的分布式鎖實現(xiàn)。

          Redis分布式鎖實戰(zhàn)演習

          這里以Spring Boot、Redis、Lua腳本為例來演示分布式鎖的實現(xiàn)。為了簡化處理,示例中Redis既承擔了分布式鎖的功能,也承擔了數(shù)據(jù)庫的功能。

          場景構建

          集群環(huán)境下,對同一個賬戶的金額進行操作,基本步驟:

          • 從數(shù)據(jù)庫讀取用戶金額;
          • 程序修改金額;
          • 再將最新金額存儲到數(shù)據(jù)庫;

          下面從最初不加鎖,不同步處理,逐步推演出最終的分布式鎖。

          基礎集成及類構建

          準備一個不加鎖處理的基礎業(yè)務環(huán)境。

          首先在Spring Boot項目中引入相關依賴:

          <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-data-redis</artifactId>
          </dependency>
          <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
          </dependency>

          賬戶對應實體類UserAccount:

          public class UserAccount {

           //用戶ID
           private String userId;
           //賬戶內金額
           private int amount;

           //添加賬戶金額
           public void addAmount(int amount) {
            this.amount = this.amount + amount;
           }
           // 省略構造方法和getter/setter 
          }

          創(chuàng)建一個線程實現(xiàn)類AccountOperationThread:

          public class AccountOperationThread implements Runnable {

           private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

           private static final Long RELEASE_SUCCESS = 1L;

           private String userId;

           private RedisTemplate<Object, Object> redisTemplate;

           public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
            this.userId = userId;
            this.redisTemplate = redisTemplate;
           }

           @Override
           public void run() {
            noLock();
           }

           /**
            * 不加鎖
            */
           private void noLock() {
            try {
             Random random = new Random();
             // 模擬線程進行業(yè)務處理
             TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
            } catch (InterruptedException e) {
             e.printStackTrace();
            }
            //模擬數(shù)據(jù)庫中獲取用戶賬號
            UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
            // 金額+1
            userAccount.addAmount(1);
            logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
            //模擬存回數(shù)據(jù)庫
            redisTemplate.opsForValue().set(userId, userAccount);
           }
          }

          其中RedisTemplate的實例化交給了Spring Boot:

          @Configuration
          public class RedisConfig {

           @Bean
           public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
              new Jackson2JsonRedisSerializer<>(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
            // 設置value的序列化規(guī)則和 key的序列化規(guī)則
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
           }
          }

          最后,再準備一個TestController來進行觸發(fā)多線程的運行:

          @RestController
          public class TestController {

           private final static Logger logger = LoggerFactory.getLogger(TestController.class);

           private static ExecutorService executorService = Executors.newFixedThreadPool(10);

           @Autowired
           private RedisTemplate<Object, Object> redisTemplate;

           @GetMapping("/test")
           public String test() throws InterruptedException {
            // 初始化用戶user_001到Redis,賬戶金額為0
            redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
            // 開啟10個線程進行同步測試,每個線程為賬戶增加1元
            for (int i = 0; i < 10; i++) {
             logger.info("創(chuàng)建線程i=" + i);
             executorService.execute(new AccountOperationThread("user_001", redisTemplate));
            }

            // 主線程休眠1秒等待線程跑完
            TimeUnit.MILLISECONDS.sleep(1000);
            // 查詢Redis中的user_001賬戶
            UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
            logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
            return "success";
           }
          }

          執(zhí)行上述程序,正常來說10個線程,每個線程加1,結果應該是10。但多執(zhí)行幾次,會發(fā)現(xiàn),結果變化很大,基本上都要比10小。

          [pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1
          [pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1
          [pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1
          [pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1
          [pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2
          [pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2
          [pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2
          [pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3
          [pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4
          [pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5
          [nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5

          以上述日志為例,前四個線程都將值改為1,也就是后面三個線程都將前面的修改進行了覆蓋,導致最終結果不是10,只有5。這顯然是有問題的。

          Redis同步鎖實現(xiàn)

          針對上面的情況,在同一個JVM當中,我們可以通過線程加鎖來完成。但在分布式環(huán)境下,JVM級別的鎖是沒辦法實現(xiàn)的,這里可以采用Redis同步鎖實現(xiàn)。

          基本思路:第一個線程進入時,在Redis中進記錄,當后續(xù)線程過來請求時,判斷Redis是否存在該記錄,如果存在則說明處于鎖定狀態(tài),進行等待或返回。如果不存在,則進行后續(xù)業(yè)務處理。

            /**
            * 1.搶占資源時判斷是否被鎖。
            * 2.如未鎖則搶占成功且加鎖,否則等待鎖釋放。
            * 3.業(yè)務完成后釋放鎖,讓給其它線程。
            * <p>
            * 該方案并未解決同步問題,原因:線程獲得鎖和加鎖的過程,并非原子性操作,可能會導致線程A獲得鎖,還未加鎖時,線程B也獲得了鎖。
            */
           private void redisLock() {
            Random random = new Random();
            try {
             TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
            } catch (InterruptedException e) {
             e.printStackTrace();
            }
            while (true) {
             Object lock = redisTemplate.opsForValue().get(userId + ":syn");
             if (lock == null) {
              // 獲得鎖 -> 加鎖 -> 跳出循環(huán)
              logger.info(Thread.currentThread().getName() + ":獲得鎖");
              redisTemplate.opsForValue().set(userId + ":syn""lock");
              break;
             }
             try {
              // 等待500毫秒重試獲得鎖
              TimeUnit.MILLISECONDS.sleep(500);
             } catch (InterruptedException e) {
              e.printStackTrace();
             }
            }
            try {
             //模擬數(shù)據(jù)庫中獲取用戶賬號
             UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
             if (userAccount != null) {
              //設置金額
              userAccount.addAmount(1);
              logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
              //模擬存回數(shù)據(jù)庫
              redisTemplate.opsForValue().set(userId, userAccount);
             }
            } finally {
             //釋放鎖
             redisTemplate.delete(userId + ":syn");
             logger.info(Thread.currentThread().getName() + ":釋放鎖");
            }
           }

          在while代碼塊中,先判斷對應用戶ID是否在Redis中存在,如果不存在,則進行set加鎖,如果存在,則跳出循環(huán)繼續(xù)等待。

          上述代碼,看起來實現(xiàn)了加鎖的功能,但當執(zhí)行程序時,會發(fā)現(xiàn)與未加鎖一樣,依舊存在并發(fā)問題。原因是:獲取鎖和加鎖的操作并不是原子的。比如兩個線程發(fā)現(xiàn)lock都是null,都進行了加鎖,此時并發(fā)問題依舊存在。

          Redis原子性同步鎖

          針對上述問題,可將獲取鎖和加鎖的過程原子化處理?;趕pring-boot-data-redis提供的原子化API可以實現(xiàn):

          // 該方法使用了redis的指令:SETNX key value
          // 1.key不存在,設置成功返回value,setIfAbsent返回true;
          // 2.key存在,則設置失敗返回null,setIfAbsent返回false;
          // 3.原子性操作;
          Boolean setIfAbsent(K var1, V var2);

          上述方法的原子化操作是對Redis的setnx命令的封裝,在Redis中setnx的使用如下實例:

          redis> SETNX mykey "Hello"
          (integer) 1
          redis> SETNX mykey "World"
          (integer) 0
          redis> GET mykey
          "Hello"

          第一次,設置mykey時,并不存在,則返回1,表示設置成功;第二次設置mykey時,已經(jīng)存在,則返回0,表示設置失敗。再次查詢mykey對應的值,會發(fā)現(xiàn)依舊是第一次設置的值。也就是說redis的setnx保證了唯一的key只能被一個服務設置成功。

          理解了上述API及底層原理,來看看線程中的實現(xiàn)方法代碼如下:

           /**
            * 1.原子操作加鎖
            * 2.競爭線程循環(huán)重試獲得鎖
            * 3.業(yè)務完成釋放鎖
            */
           private void atomicityRedisLock() {
            //Spring data redis 支持的原子性操作
            while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn""lock")) {
             try {
              // 等待100毫秒重試獲得鎖
              TimeUnit.MILLISECONDS.sleep(100);
             } catch (InterruptedException e) {
              e.printStackTrace();
             }
            }
            logger.info(Thread.currentThread().getName() + ":獲得鎖");
            try {
             //模擬數(shù)據(jù)庫中獲取用戶賬號
             UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
             if (userAccount != null) {
              //設置金額
              userAccount.addAmount(1);
              logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
              //模擬存回數(shù)據(jù)庫
              redisTemplate.opsForValue().set(userId, userAccount);
             }
            } finally {
             //釋放鎖
             redisTemplate.delete(userId + ":syn");
             logger.info(Thread.currentThread().getName() + ":釋放鎖");
            }
           }

          再次執(zhí)行代碼,會發(fā)現(xiàn)結果正確了,也就是說可以成功的對分布式線程進行了加鎖。

          Redis分布式鎖的死鎖

          雖然上述代碼執(zhí)行結果沒問題,但如果應用異常宕機,沒來得及執(zhí)行finally中釋放鎖的方法,那么其他線程則永遠無法獲得這個鎖。

          此時可采用setIfAbsent的重載方法:

          Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);

          基于該方法,可以設置鎖的過期時間。這樣即便獲得鎖的線程宕機,在Redis中數(shù)據(jù)過期之后,其他線程可正常獲得該鎖。

          示例代碼如下:

          private void atomicityAndExRedisLock() {
            try {
             //Spring data redis 支持的原子性操作,并設置5秒過期時間
             while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
               System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
              // 等待100毫秒重試獲得鎖
              logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖");
              TimeUnit.MILLISECONDS.sleep(1000);
             }
             logger.info(Thread.currentThread().getName() + ":獲得鎖--------");
             // 應用在這里宕機,進程退出,無法執(zhí)行 finally;
             Thread.currentThread().interrupt();
             // 業(yè)務邏輯...
            } catch (InterruptedException e) {
             e.printStackTrace();
            } finally {
             //釋放鎖
             if (!Thread.currentThread().isInterrupted()) {
              redisTemplate.delete(userId + ":syn");
              logger.info(Thread.currentThread().getName() + ":釋放鎖");
             }
            }
           }

          業(yè)務超時及守護線程

          上面添加了Redis所的超時時間,看似解決了問題,但又引入了新的問題。

          比如,正常情況下線程A在5秒內可正常處理完業(yè)務,但偶發(fā)會出現(xiàn)超過5秒的情況。如果將超時時間設置為5秒,線程A獲得了鎖,但業(yè)務邏輯處理需要6秒。此時,線程A還在正常業(yè)務邏輯,線程B已經(jīng)獲得了鎖。當線程A處理完時,有可能將線程B的鎖給釋放掉。

          在上述場景中有兩個問題點:

          • 第一,線程A和線程B可能會同時在執(zhí)行,存在并發(fā)問題。
          • 第二,線程A可能會把線程B的鎖給釋放掉,導致一系列的惡性循環(huán)。

          當然,可以通過在Redis中設置value值來判斷鎖是屬于線程A還是線程B。但仔細分析會發(fā)現(xiàn),這個問題的本質是因為線程A執(zhí)行業(yè)務邏輯耗時超出了鎖超時的時間。

          那么就有兩個解決方案了:

          • 第一,將超時時間設置的足夠長,確保業(yè)務代碼能夠在鎖釋放之前執(zhí)行完成;
          • 第二,為鎖添加守護線程,為將要過期釋放但未釋放的鎖增加時間;

          第一種方式需要全行大多數(shù)情況下業(yè)務邏輯的耗時,進行超時時間的設定。

          第二種方式,可通過如下守護線程的方式來動態(tài)增加鎖超時時間。

          public class DaemonThread implements Runnable {
           private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

           // 是否需要守護 主線程關閉則結束守護線程
           private volatile boolean daemon = true;
           // 守護鎖
           private String lockKey;

           private RedisTemplate<Object, Object> redisTemplate;

           public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
            this.lockKey = lockKey;
            this.redisTemplate = redisTemplate;
           }

           @Override
           public void run() {
            try {
             while (daemon) {
              long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
              // 剩余有效期小于1秒則續(xù)命
              if (time < 1000) {
               logger.info("守護進程: " + Thread.currentThread().getName() + " 延長鎖時間 5000 毫秒");
               redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
              }
              TimeUnit.MILLISECONDS.sleep(300);
             }
             logger.info(" 守護進程: " + Thread.currentThread().getName() + "關閉 ");
            } catch (InterruptedException e) {
             e.printStackTrace();
            }
           }

           // 主線程主動調用結束
           public void stop() {
            daemon = false;
           }
          }

          上述線程每隔300毫秒獲取一下Redis中鎖的超時時間,如果小于1秒,則延長5秒。當主線程調用關閉時,守護線程也隨之關閉。

          主線程中相關代碼實現(xiàn):

          private void deamonRedisLock() {
            //守護線程
            DaemonThread daemonThread = null;
            //Spring data redis 支持的原子性操作,并設置5秒過期時間
            String uuid = UUID.randomUUID().toString();
            String value = Thread.currentThread().getId() + ":" + uuid;
            try {
             while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
              // 等待100毫秒重試獲得鎖
              logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖");
              TimeUnit.MILLISECONDS.sleep(1000);
             }
             logger.info(Thread.currentThread().getName() + ":獲得鎖----");
             // 開啟守護線程
             daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
             Thread thread = new Thread(daemonThread);
             thread.start();
             // 業(yè)務邏輯執(zhí)行10秒...
             TimeUnit.MILLISECONDS.sleep(10000);
            } catch (InterruptedException e) {
             e.printStackTrace();
            } finally {
             //釋放鎖 這里也需要原子操作,今后通過 Redis + Lua 講
             String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
             if (value.equals(result)) {
              redisTemplate.delete(userId + ":syn");
              logger.info(Thread.currentThread().getName() + ":釋放鎖-----");
             }
             //關閉守護線程
             if (daemonThread != null) {
              daemonThread.stop();
             }
            }
           }

          其中在獲得鎖之后,開啟守護線程,在finally中將守護線程關閉。

          基于Lua腳本的實現(xiàn)

          在上述邏輯中,我們是基于spring-boot-data-redis提供的原子化操作來保證鎖判斷和執(zhí)行的原子化的。在非Spring Boot項目中,則可以基于Lua腳本來實現(xiàn)。

          首先定義加鎖和解鎖的Lua腳本及對應的DefaultRedisScript對象,在RedisConfig配置類中添加如下實例化代碼:

          @Configuration
          public class RedisConfig {

           //lock script
           private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
             " then redis.call('expire',KEYS[1],ARGV[2]) " +
             " return 1 " +
             " else return 0 end ";
           private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" +
             "('del', KEYS[1]) else return 0 end";

           // ... 省略部分代碼
           
           @Bean
           public DefaultRedisScript<Boolean> lockRedisScript() {
            DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
            defaultRedisScript.setResultType(Boolean.class);
            defaultRedisScript.setScriptText(LOCK_SCRIPT);
            return defaultRedisScript;
           }

           @Bean
           public DefaultRedisScript<Long> unlockRedisScript() {
            DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
            defaultRedisScript.setResultType(Long.class);
            defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
            return defaultRedisScript;
           }
          }

          再通過在AccountOperationThread類中新建構造方法,將上述兩個對象傳入類中(省略此部分演示)。然后,就可以基于RedisTemplate來調用了,改造之后的代碼實現(xiàn)如下:

           private void deamonRedisLockWithLua() {
            //守護線程
            DaemonThread daemonThread = null;
            //Spring data redis 支持的原子性操作,并設置5秒過期時間
            String uuid = UUID.randomUUID().toString();
            String value = Thread.currentThread().getId() + ":" + uuid;
            try {
             while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
              // 等待1000毫秒重試獲得鎖
              logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖");
              TimeUnit.MILLISECONDS.sleep(1000);
             }
             logger.info(Thread.currentThread().getName() + ":獲得鎖----");
             // 開啟守護線程
             daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
             Thread thread = new Thread(daemonThread);
             thread.start();
             // 業(yè)務邏輯執(zhí)行10秒...
             TimeUnit.MILLISECONDS.sleep(10000);
            } catch (InterruptedException e) {
             logger.error("異常", e);
            } finally {
             //使用Lua腳本:先判斷是否是自己設置的鎖,再執(zhí)行刪除
             // key存在,當前值=期望值時,刪除key;key存在,當前值!=期望值時,返回0;
             Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
             logger.info("redis解鎖:{}", RELEASE_SUCCESS.equals(result));
             if (RELEASE_SUCCESS.equals(result)) {
              if (daemonThread != null) {
               //關閉守護線程
               daemonThread.stop();
               logger.info(Thread.currentThread().getName() + ":釋放鎖---");
              }
             }
            }
           }

          其中while循環(huán)中加鎖和finally中的釋放鎖都是基于Lua腳本來實現(xiàn)了。

          Redis鎖的其他因素

          除了上述實例,在使用Redis分布式鎖時,還可以考慮以下情況及方案。

          Redis鎖的不可重入

          當線程在持有鎖的情況下再次請求加鎖,如果一個鎖支持一個線程多次加鎖,那么這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由于該鎖已經(jīng)被持有,再次加鎖會失敗。Redis可通過對鎖進行重入計數(shù),加鎖時加 1,解鎖時減 1,當計數(shù)歸 0時釋放鎖。

          可重入鎖雖然高效但會增加代碼的復雜性,這里就不舉例說明了。

          等待鎖釋放

          有的業(yè)務場景,發(fā)現(xiàn)被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然后去搶鎖。上述示例就屬于后者。針對等待鎖釋放也有兩種方案:

          • 客戶端輪訓:當未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基于這種方式實現(xiàn)的。這種方式的缺點也很明顯,比較耗費服務器資源,當并發(fā)量大時會影響服務器的效率。
          • 使用Redis的訂閱發(fā)布功能:當獲取鎖失敗時,訂閱鎖釋放消息,獲取鎖成功后釋放時,發(fā)送釋放消息。

          集群中的主備切換和腦裂

          在Redis包含主從同步的集群部署方式中,如果主節(jié)點掛掉,從節(jié)點提升為主節(jié)點。如果客戶端A在主節(jié)點加鎖成功,指令還未同步到從節(jié)點,此時主節(jié)點掛掉,從節(jié)點升為主節(jié)點,新的主節(jié)點中沒有鎖的數(shù)據(jù)。這種情況下,客戶端B就可能加鎖成功,從而出現(xiàn)并發(fā)的場景。

          當集群發(fā)生腦裂時,Redis master節(jié)點跟slave 節(jié)點和 sentinel 集群處于不同的網(wǎng)絡分區(qū)。sentinel集群無法感知到master的存在,會將 slave 節(jié)點提升為 master 節(jié)點,此時就會存在兩個不同的 master 節(jié)點。從而也會導致并發(fā)問題的出現(xiàn)。Redis Cluster集群部署方式同理。

          小結

          通過生產環(huán)境中的一個問題,排查原因,尋找解決方案,到最終對基于Redis分布式的深入研究,這便是學習的過程。

          同時,每當面試或被問題如何解決分布式共享資源時,我們會脫口而出”基于Redis實現(xiàn)分布式鎖“,但通過本文的學習會發(fā)現(xiàn),Redis分布式鎖并不是萬能的,而且在使用的過程中還需要注意超時、死鎖、誤解鎖、集群選主/腦裂等問題。

          Redis以高性能著稱,但在實現(xiàn)分布式鎖的過程中還是存在一些問題。因此,基于Redis的分布式鎖可以極大的緩解并發(fā)問題,但要完全防止并發(fā),還是得從數(shù)據(jù)庫層面入手。


          源碼地址:

          https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

          參考文章:

          https://jinzhihong.github.io/2019/08/12/%E6%B7%B1%E5%85%A5%E6%B5%85%E5%87%BA-Redis-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%8E%9F%E7%90%86%E4%B8%8E%E5%AE%9E%E7%8E%B0-%E4%B8%80/

          https://xiaomi-info.github.io/2019/12/17/redis-distributed-lock/

          往期推薦

          99%的人沒弄懂volatile的設計原理,更別說靈活運用了

          Maven Jar包沖突?看看高手是怎么解決的

          從Jar包沖突搞到類加載機制,就是這么霸氣

          如何輕松給Spring Boot配置文件加個密?

          08篇 要給Nacos的UDP通信功能點個贊



          如果你覺得這篇文章不錯,那么,下篇通常會更好。添加微信好友,可備注“加群”(微信號:zhuan2quan)

          一篇文章就看透技術本質的人,
            和花一輩子都看不清的人,
            注定是截然不同的搬磚生涯。
          ▲ 按關注”程序新視界“,洞察技術內幕
          瀏覽 34
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天操天天爱2020 | www.婷婷 | 青娱在线视频 | 亚洲国产色婷婷 | 黄a免费看 |