<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          可能要用心學(xué)高并發(fā)核心編程,限流原理與實戰(zhàn),分布式令牌桶限流

          共 5991字,需瀏覽 12分鐘

           ·

          2022-03-02 11:30

          實戰(zhàn):分布式令牌桶限流

          本節(jié)介紹的分布式令牌桶限流通過Lua+Java結(jié)合完成,首先在Lua腳本中完成限流的計算,然后在Java代碼中進(jìn)行組織和調(diào)用。


          分布式令牌桶限流Lua腳本

          分布式令牌桶限流Lua腳本的核心邏輯和Java令牌桶的執(zhí)行邏輯類似,只是限流計算相關(guān)的統(tǒng)計和時間數(shù)據(jù)存放于Redis中。

          這里將限流的腳本命名為rate_limiter.lua,該腳本既使用Redis存儲令牌桶信息,自身又執(zhí)行于Redis中,所以筆者將該腳本放置于base-redis基礎(chǔ)模塊中,它的代碼如下:

          ---此腳本的環(huán)境:redis內(nèi)部,不是運行在Nginx內(nèi)部
          ---方法:申請令牌
          ----1:failed
          ---1:success
          ---@param key:key限流關(guān)鍵字
          ---@param apply:申請的令牌數(shù)量
          local function acquire(key, apply)
          local times = redis.call('TIME');
          --times[1] 秒數(shù) --times[2] 微秒數(shù)
          local curr_mill_second = times[1] *1000000 + times[2];
          curr_mill_second = curr_mill_second / 1000;
          local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
          ---局部變量:上次申請的時間
          local last_mill_second = cacheInfo[1];
          ---局部變量:之前的令牌數(shù)
          local curr_permits = tonumber(cacheInfo[2]);
          ---局部變量:桶的容量
          local max_permits = tonumber(cacheInfo[3]);
          ---局部變量:令牌的發(fā)放速率
          local rate = cacheInfo[4];
          ---局部變量:本次的令牌數(shù)
          local local_curr_permits = max_permits;
          if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
          --計算時間段內(nèi)的令牌數(shù)
          local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) *rate);
          --令牌總數(shù)
          local expect_curr_permits = reverse_permits + curr_permits;
          --可以申請的令牌總數(shù)
          local_curr_permits = math.min(expect_curr_permits, max_permits);
          else
          --第一次獲取令牌
          redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
          end
          local result = -1;
          --有足夠的令牌可以申請
          if (local_curr_permits - apply >= 0) then
          --保存剩余的令牌
          redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply);
          --保存時間,下次令牌獲取時使用
          redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
          --返回令牌獲取成功
          result = 1;
          else
          --保存令牌總數(shù)
          redis.pcall("HSET", key, "curr_permits", local_curr_permits);
          --返回令牌獲取失敗
          result = -1;
          end
          return result
          end
          ---方法:初始化限流器
          ---1 success
          ---@param key key
          ---@param max_permits 桶的容量
          ---@param rate 令牌的發(fā)放速率
          local function init(key, max_permits, rate)
          local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
          local org_max_permits = tonumber(rate_limit_info[3])
          local org_rate = rate_limit_info[4]
          if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) then
          redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits)
          end
          return 1;
          end
          ---方法:刪除限流Key
          local function delete(key)
          redis.pcall("DEL", key) return 1;
          end
          local key = KEYS[1]
          local method = ARGV[1]
          if method == 'acquire' then
          return acquire(key, ARGV[2], ARGV[3])
          elseif method == 'init' then
          return init(key, ARGV[2], ARGV[3])
          elseif method == 'delete' then
          return delete(key)
          else
          --ignore
          end

          該腳本有3個方法,其中兩個方法比較重要,分別說明如下:

          (1)限流器初始化方法init(key,max_permits,rate),此方法在限流開始時被調(diào)用。

          (2)限流檢測的方法acquire(key,apply),此方法在請求到來時被調(diào)用。

          Java分布式令牌桶限流

          rate_limiter.lua腳本既可以在Java中調(diào)用,又可以在Nginx中調(diào)用。本小節(jié)先介紹其在Java中的使用,第10章再介紹其在Nginx中的使用。

          Java分布式令牌桶限流器的實現(xiàn)就是通過Java代碼向Redis加載rate_limiter.lua腳本,然后封裝其令牌桶初始化方法init(...)和限流監(jiān)測方法acquire(...),以供外部調(diào)用。它的代碼如下:

          package com.crazymaker.springcloud.standard.ratelimit;
          ...
          /**
          *實現(xiàn):令牌桶限流服務(wù)
          *create by尼恩 @ 瘋狂創(chuàng)客圈
          **/

          @Slf4j
          public class RedisRateLimitImpl implements RateLimitService, InitializingBean
          {
          /**
          *限流器的redis key前綴
          */

          private static final String RATE_LIMITER_KEY_PREFIX = "rate_limiter:";
          //private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
          private RedisRateLimitProperties redisRateLimitProperties;
          private RedisTemplate redisTemplate;
          //lua腳本的實例
          private static RedisScript rateLimiterScript = null;
          //lua腳本的類路徑
          private static String rateLimitLua = "script/rate_limiter.lua";
          static
          {
          //從類路徑文件中加載令牌桶l(fā)ua腳本
          String script = IOUtil.loadJarFile(RedisRateLimitImpl.class.getClassLoader(), rateLimitLua);
          if (StringUtils.isEmpty(script))
          {
          log.error("lua script load failed:" + rateLimitLua);
          } else
          {
          //創(chuàng)建Lua腳本實例
          rateLimiterScript = new DefaultRedisScript<>(script, Long.class);
          }
          }
          public RedisRateLimitImpl(
          RedisRateLimitProperties redisRateLimitProperties,
          RedisTemplate redisTemplate)
          {
          this.redisRateLimitProperties = redisRateLimitProperties;
          this.redisTemplate = redisTemplate;
          }
          private Map<String, LimiterInfo> limiterInfoMap = new HashMap<>();
          /**
          *限流器的信息
          */

          @Builder
          @Data
          public static class LimiterInfo
          {
          /**
          *限流器的key,如秒殺的id
          */
          private String key;
          /**
          *限流器的類型,如seckill
          */

          private String type = "default";
          /**
          *限流器的最大桶容量
          */

          private Integer maxPermits;
          /**
          *限流器的速率
          */

          private Integer rate;
          /**
          *限流器的redis key
          */

          public String fullKey()
          {
          return RATE_LIMITER_KEY_PREFIX + type + ":" + key;
          }
          /**
          *限流器在map中的緩存key
          */

          public String cashKey()
          {
          return type + ":" + key;
          }
          }
          /**
          *限流檢測:是否超過redis令牌桶限速器的限制
          *
          *@param cacheKey計數(shù)器的key
          *@return true or false
          */

          @Override
          public Boolean tryAcquire(String cacheKey)
          {
          if (cacheKey == null)
          {
          return true;
          }
          if (cacheKey.indexOf(":") <= 0)
          {
          cacheKey = "default:" + cacheKey;
          }
          LimiterInfo limiterInfo = limiterInfoMap.get(cacheKey);
          if (limiterInfo == null)
          {
          return true;
          }
          Long acquire = (Long) redisTemplate.execute(rateLimiterScript,
          ImmutableList.of(limiterInfo.fullKey()),
          "acquire",
          "1");
          if (acquire == 1)
          {
          return false;
          }
          return true;
          }
          /**
          *重載方法:限流器初始化
          *
          *@param limiterInfo限流的類型
          */

          public void initLimitKey(LimiterInfo limiterInfo)
          {
          if (null == rateLimiterScript)
          {
          return;
          }
          String maxPermits = limiterInfo.getMaxPermits().toString();
          String rate = limiterInfo.getRate().toString();
          //執(zhí)行redis腳本
          Long result = (Long) redisTemplate.execute(rateLimiterScript,
          ImmutableList.of(limiterInfo.fullKey()),
          "init",
          maxPermits,
          rate); limiterInfoMap.put(limiterInfo.cashKey(), limiterInfo);
          }
          /**
          *限流器初始化
          *
          *@param type類型
          *@param key id
          *@param maxPermits上限
          *@param rate 速度
          */

          public void initLimitKey(String type, String key,
          Integer maxPermits, Integer rate)
          {
          LimiterInfo limiterInfo = LimiterInfo.builder()
          .type(type)
          .key(key)
          .maxPermits(maxPermits)
          .rate(rate)
          .build();
          initLimitKey(limiterInfo);
          }
          /**
          *獲取redis lua腳本的sha1編碼,并緩存到redis
          */

          public String cacheSha1()
          {
          String sha1 = rateLimiterScript.getSha1();
          redisTemplate.opsForValue().set("lua:sha1:rate_limiter", sha1);
          return sha1;
          }
          }

          Java分布式令牌桶限流的自驗證

          自驗證的工作:首先初始化分布式令牌桶限流器,然后使用兩條

          線程不斷進(jìn)行限流的檢測。自驗證的代碼如下:

          package com.crazymaker.springcloud.ratelimit;
          ...
          @Slf4j
          @RunWith(SpringRunner.class)
          //指定啟動類
          @SpringBootTest(classes
          = {DemoCloudApplication.class})
          /**
          *redis分布式令牌桶測試類
          */
          public class RedisRateLimitTest
          {
          @Resource(name = "redisRateLimitImpl")
          RedisRateLimitImpl limitService;
          //線程池,用于多線程模擬測試
          private ExecutorService pool = Executors.newFixedThreadPool(10);
          @Test
          public void testRedisRateLimit()
          {
          //初始化分布式令牌桶限流器
          limitService.initLimitKey(
          "seckill", //redis key中的類型
          "10000", //redis key中的業(yè)務(wù)key,比如商品id
          2, //桶容量
          2); //每秒令牌數(shù)
          AtomicInteger count = new AtomicInteger();
          long start = System.currentTimeMillis();
          //線程數(shù)
          final int threads = 2;
          //每條線程的執(zhí)行輪數(shù)
          final int turns = 20;
          //同步器
          CountDownLatch countDownLatch = new CountDownLatch(threads);
          for (int i = 0; i < threads; i++)
          {
          pool.submit(() ->
          {
          try
          {
          //每個用戶訪問turns次
          for (int j = 0; j < turns; j++)
          {
          boolean limited = limitService.tryAcquire
          ("seckill:10000");
          if (limited)
          {
          count.getAndIncrement();
          }
          Thread.sleep(200);
          }
          } catch (Exception e)
          { e.printStackTrace();
          }
          countDownLatch.countDown();
          });
          }
          try
          {
          countDownLatch.await();
          } catch (InterruptedException e)
          {
          e.printStackTrace();
          }
          float time = (System.currentTimeMillis() - start) / 1000F;
          //輸出統(tǒng)計結(jié)果
          log.info("限制的次數(shù)為:" + count.get() + " 時長為:" + time);
          log.info("限制的次數(shù)為:" + count.get() +
          ",通過的次數(shù)為:" + (threads *turns - count.get()));
          log.info("限制的比例為:" +
          (float) count.get() / (float) (threads *turns));
          log.info("運行的時長為:" + time);
          try
          {
          Thread.sleep(Integer.MAX_VALUE);
          } catch (InterruptedException e)
          {
          e.printStackTrace();
          }
          }
          }

          兩條線程各運行20次,每一次運行休眠200毫秒,總計耗時4秒,

          運行40次,部分輸出結(jié)果如下:

          [main] INFO c.c.s.r.RedisRateLimitTest - 限制的次數(shù)為:32 時長為:4.015
          [main] INFO c.c.s.r.RedisRateLimitTest - 限制的次數(shù)為:32,通過的次數(shù)為:8
          [main] INFO c.c.s.r.RedisRateLimitTest - 限制的比例為:0.8
          [main] INFO c.c.s.r.RedisRateLimitTest - 運行的時長為:4.015

          大家可以自行調(diào)整參數(shù),運行以上自驗證程序并觀察實驗結(jié)果,體驗一下分布式令牌桶限流的效果。

          本文給大家講解的內(nèi)容是高并發(fā)核心編程,限流原理與實戰(zhàn),實戰(zhàn):分布式令牌桶限流

          1. 下篇文章給大家講解的是高并發(fā)核心編程,Spring Cloud+Nginx秒殺實戰(zhàn);

          2. 覺得文章不錯的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;

          3. 感謝大家的支持!


          本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號里找我,我等你哦。

          瀏覽 59
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色片免费观看视频 | 一级日逼片 | 男人的天堂黄色 | 狠狠穞A片一區二區三區 | 欧美高清中文字幕精品日韩不卡国产在线 |