<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          精講高并發(fā)核心編程,限流原理與實(shí)戰(zhàn),限流策略原理與參考實(shí)現(xiàn)

          共 2064字,需瀏覽 5分鐘

           ·

          2022-03-02 11:29


          限流原理與實(shí)戰(zhàn)

          在通信領(lǐng)域中,限流技術(shù)(Time Limiting)被用來控制網(wǎng)絡(luò)接口收發(fā)通信數(shù)據(jù)的速率,實(shí)現(xiàn)通信時(shí)的優(yōu)化性能、較少延遲和提高帶寬等。

          互聯(lián)網(wǎng)領(lǐng)域中借鑒了通信領(lǐng)域的限流概念,用來控制在高并發(fā)、大流量的場景中對服務(wù)接口請求的速率,比如雙十一秒殺、搶購、搶票、搶單等場景。

          舉一個(gè)具體的例子,假設(shè)某個(gè)接口能夠扛住的QPS為10 000,這時(shí)有20 000個(gè)請求進(jìn)來,經(jīng)過限流模塊,會先放10 000個(gè)請求,其余的請求會阻塞一段時(shí)間。不簡單粗暴地返回404,讓客戶端重試,同時(shí)又能起到流量削峰的作用。

          每個(gè)API接口都是有訪問上限的,當(dāng)訪問頻率或者并發(fā)量超過其承受范圍時(shí),就必須通過限流來保證接口的可用性或者降級可用性,給接口安裝上保險(xiǎn)絲,以防止非預(yù)期的請求對系統(tǒng)壓力過大而引起系統(tǒng)癱瘓。

          接口限流的算法主要有3種,分別是計(jì)數(shù)器限流、漏桶算法和令牌桶算法。接下來為大家一一介紹。

          限流策略原理與參考實(shí)現(xiàn)

          在高并發(fā)訪問的情況下,通常會通過限流的手段來控制流量問題,以保證服務(wù)器處于正常壓力下。

          首先給大家介紹3種常見的限流策略。

          ?3種限流策略:計(jì)數(shù)器、漏桶和令牌桶

          限流的手段通常有計(jì)數(shù)器、漏桶和令牌桶。注意限流和限速(所有請求都會處理)的差別,視業(yè)務(wù)場景而定。

          (1)計(jì)數(shù)器:在一段時(shí)間間隔內(nèi)(時(shí)間窗),處理請求的最大數(shù)量固定,超過部分不做處理。

          (2)漏桶:漏桶大小固定,處理速度固定,但請求進(jìn)入的速度不固定(在突發(fā)情況請求過多時(shí),會丟棄過多的請求)。

          (3)令牌桶:令牌桶的大小固定,令牌的產(chǎn)生速度固定,但是小耗令牌(請求)的速度不固定(可以應(yīng)對某些時(shí)間請求過多的情況)。每個(gè)請求都會從令牌桶中取出令牌,如果沒有令牌,就丟棄這次請求。

          ?計(jì)數(shù)器限流原理和Java參考實(shí)現(xiàn)

          計(jì)數(shù)器限流的原理非常簡單:在一個(gè)時(shí)間窗口(間隔)內(nèi),所處理的請求的最大數(shù)量是有限制的,對超過限制的部分請求不做處理。

          下面的代碼是計(jì)數(shù)器限流算法的一個(gè)簡單的演示實(shí)現(xiàn)和測試用例。

          package com.crazymaker.springcloud.ratelimit;
          ...
          //計(jì)速器,限速
          @Slf4j
          public class CounterLimiter
          {
          //起始時(shí)間
          private static long startTime = System.currentTimeMillis();
          //時(shí)間區(qū)間的時(shí)間間隔毫秒
          private static long interval = 1000;
          //每秒限制數(shù)量
          private static long maxCount = 2;
          //累加器
          private static AtomicLong accumulator = new AtomicLong();
          //計(jì)數(shù)判斷,是否超出限制
          private static long tryAcquire(long taskId, int turn)
          {
          long nowTime = System.currentTimeMillis();
          //在時(shí)間區(qū)間之內(nèi)
          if (nowTime < startTime + interval)
          {
          long count = accumulator.incrementAndGet();
          if (count <= maxCount)
          {
          return count;
          } else
          {
          return -count;
          }
          } else
          {
          //在時(shí)間區(qū)間之外
          synchronized (CounterLimiter.class)
          {
          log.info("新時(shí)間區(qū)到了,taskId{}, turn {}..", taskId, turn);
          //再一次判斷,防止重復(fù)初始化
          if (nowTime > startTime + interval)
          {
          accumulator.set(0);
          startTime = nowTime;
          }
          }
          return 0;
          }
          }
          //線程池,用于多線程模擬測試
          private ExecutorService pool = Executors.newFixedThreadPool(10);
          @Test
          public void testLimit()
          { //被限制的次數(shù)
          AtomicInteger limited = new AtomicInteger(0);
          //線程數(shù)
          final int threads = 2;
          //每條線程的執(zhí)行輪數(shù)
          final int turns = 20;
          //同步器
          CountDownLatch countDownLatch = new CountDownLatch(threads);
          long start = System.currentTimeMillis();
          for (int i = 0; i < threads; i++)
          {
          pool.submit(() ->
          {
          try
          {
          for (int j = 0; j < turns; j++)
          {
          long taskId = Thread.currentThread().getId();
          long index = tryAcquire(taskId, j);
          if (index <= 0)
          {
          //被限制的次數(shù)累積
          limited.getAndIncrement();
          }
          Thread.sleep(200);
          }
          } catch (Exception e)
          {
          e.printStackTrace();
          }
          //等待所有線程結(jié)束
          countDownLatch.countDown();
          });
          }
          try
          {
          countDownLatch.await();
          } catch (InterruptedException e)
          {
          e.printStackTrace();
          }
          float time = (System.currentTimeMillis() - start) / 1000F;
          //輸出統(tǒng)計(jì)結(jié)果
          log.info("限制的次數(shù)為:" + limited.get() +
          ",通過的次數(shù)為:" + (threads *turns - limited.get()));
          log.info("限制的比例為:" +
          (float) limited.get() / (float) (threads *turns));
          log.info("運(yùn)行的時(shí)長為:" + time);
          }
          }

          以上代碼使用兩條線程,每條線程各運(yùn)行20次,每一次運(yùn)行休眠200毫秒,總計(jì)耗時(shí)4秒,運(yùn)行40次,限流的輸出結(jié)果具體如下:

          [pool-2-thread-2] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId16, turn 5..
          [pool-2-thread-1] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId15, turn 5..
          [pool-2-thread-2] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId16, turn 10..
          [pool-2-thread-2] INFO c.c.s.ratelimit.CounterLimiter - 新時(shí)間區(qū)到了,taskId16, turn 15..
          [main] INFO c.c.s.ratelimit.CounterLimiter - 限制的次數(shù)為:32,通過的次數(shù)為:8
          [main] INFO c.c.s.ratelimit.CounterLimiter - 限制的比例為:0.8
          [main] INFO c.c.s.ratelimit.CounterLimiter - 運(yùn)行的時(shí)長為:4.104

          大家可以自行調(diào)整參數(shù),運(yùn)行以上自驗(yàn)證程序并觀察實(shí)驗(yàn)結(jié)果,體驗(yàn)一下計(jì)數(shù)器限流的效果。

          漏桶限流原理和Java參考實(shí)現(xiàn)

          漏桶限流的基本原理:水(對應(yīng)請求)從進(jìn)水口進(jìn)入漏桶里,漏桶以一定的速度出水(請求放行),當(dāng)水流入的速度過大時(shí),桶內(nèi)的總水量大于桶容量會直接溢出,請求被拒絕,如圖9-1所示。

          大致的漏桶限流規(guī)則如下:

          (1)水通過進(jìn)水口(對應(yīng)客戶端請求)以任意速率流入漏桶。

          (2)漏桶的容量是固定的,出水(放行)速率也是固定的。

          (3)漏桶容量是不變的,如果處理速度太慢,桶內(nèi)水量會超出桶的容量,后面流入的水就會溢出,表示請求拒絕。

          圖9-1 漏桶原理示意圖

          漏桶的Java參考實(shí)現(xiàn)代碼如下:

          package com.crazymaker.springcloud.ratelimit;
          //省略import
          //漏桶限流
          @Slf4j
          public class LeakBucketLimiter
          {
          //計(jì)算的起始時(shí)間
          private static long lastOutTime = System.currentTimeMillis();
          //流出速率每秒2次
          private static long rate = 2; //剩余的水量
          private static long water = 0;
          //返回值說明
          //false:沒有被限制到
          //true:被限流
          public static synchronized boolean tryAcquire(long taskId, int turn)
          {
          long nowTime = System.currentTimeMillis();
          //過去的時(shí)間
          long pastTime = nowTime - lastOutTime;
          //漏出水量,按照恒定的速度不斷流出
          //漏出的水 = 過去的時(shí)間 *預(yù)設(shè)速率
          long outWater = pastTime *rate / 1000;
          //剩余的水量 = 上次遺留的水量 - 漏出去的水
          water = water - outWater;
          log.info("water {} pastTime {} outWater {} ",
          water, pastTime, outWater);
          //糾正剩余的水量
          if (water < 0)
          {
          water = 0;
          }
          //若剩余的水量小于等于1,則放行
          if (water <= 1)
          {
          //更新起始時(shí)間,為了下次使用
          lastOutTime = nowTime;
          //增加遺留的水量
          water ++;
          return false;
          } else
          {
          //剩余的水量太大,被限流
          return true;
          }
          }
          //線程池,用于多線程模擬測試
          private ExecutorService pool = Executors.newFixedThreadPool(10);
          //測試用例
          @Test
          public void testLimit()
          {
          //測試用例太長,這里省略
          //90%的測試用例代碼與前面的計(jì)算器限流測試代碼相同
          //具體的源碼可參見瘋狂創(chuàng)客圈社群的開源庫
          }
          }

          以下是使用兩條線程,每條線程各運(yùn)行20次,每一次運(yùn)行休眠200毫秒,總計(jì)耗時(shí)4秒,運(yùn)行40次,部分輸出結(jié)果如下:

          [pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 0 pastTime 75 outWater 0
          ...
          [pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 1 pastTime 601 outWater 1
          ...
          [pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 416 outWater 0
          [pool-2-thread-2] INFO c.c.s.r.LeakBucketLimiter - water 1 pastTime 601 outWater 1
          [pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 15 outWater 0
          [pool-2-thread-2] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 201 outWater 0 [pool-2-thread-1] INFO c.c.s.r.LeakBucketLimiter - water 2 pastTime 216 outWater 0
          [main] INFO c.c.s.r.LeakBucketLimiter - 限制的次數(shù)為:32,通過的次數(shù)為:8
          [main] INFO c.c.s.r.LeakBucketLimiter - 限制的比例為:0.8
          [main] INFO c.c.s.r.LeakBucketLimiter - 運(yùn)行的時(shí)長為:4.107

          漏桶的出水速度固定,也就是請求放行速度是固定的。故漏桶不能有效應(yīng)對突發(fā)流量,但是能起到平滑突發(fā)流量(整流)的作用。

          令牌桶限流原理和Java參考實(shí)現(xiàn)

          令牌桶算法以一個(gè)設(shè)定的速率產(chǎn)生令牌并放入令牌桶,每次用戶請求都得申請令牌,如果令牌不足,就會拒絕請求。

          在令牌桶算法中,新請求到來時(shí)會從桶里拿走一個(gè)令牌,如果桶內(nèi)沒有令牌可拿,就拒絕服務(wù)。當(dāng)然,令牌的數(shù)量是有上限的。令牌的數(shù)量與時(shí)間和發(fā)放速率強(qiáng)相關(guān),流逝的時(shí)間越長,會不斷往桶里加入越多令牌,如果令牌發(fā)放的速度比申請速度快,令牌桶就會放滿令牌,直到令牌占滿整個(gè)令牌桶,如圖9-2所示。

          另外,令牌的發(fā)送速率可以設(shè)置,從而可以對突發(fā)流量進(jìn)行有效的應(yīng)對。

          令牌桶限流大致的規(guī)則如下:

          (1)進(jìn)水口按照某個(gè)速度向桶中放入令牌。

          (2)令牌的容量是固定的,但是放行的速度是不固定的,只要桶中還有剩余令牌,一旦請求過來就能申請成功,然后放行。

          (3)如果令牌的發(fā)放速度慢于請求到來的速度,桶內(nèi)就無令牌可領(lǐng),請求就會被拒絕。

          圖9-2 令牌桶

          令牌桶的Java參考實(shí)現(xiàn)代碼如下:

          package com.crazymaker.springcloud.ratelimit;
          ...
          //令牌桶,限速
          @Slf4j
          public class TokenBucketLimiter
          {
          //上一次令牌發(fā)放的時(shí)間
          public long lastTime = System.currentTimeMillis();
          //桶的容量
          public int capacity = 2;
          //令牌生成速度個(gè)/秒
          public int rate = 2;
          //當(dāng)前令牌的數(shù)量
          public int tokens;
          //返回值說明
          //false:沒有被限制到
          //true:被限流
          public synchronized boolean tryAcquire(long taskId, int applyCount)
          {
          long now = System.currentTimeMillis();
          時(shí)間間隔
          單位為毫秒 //時(shí)間間隔,單位為毫秒
          long gap = now - lastTime;
          //當(dāng)前令牌數(shù)
          tokens = Math.min(capacity, (int) (tokens + gap *rate/ 1000));
          log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
          if (tokens < applyCount)
          {
          //若拿不到令牌,則拒絕
          //log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
          return true;
          } else
          {
          //還有令牌,領(lǐng)取令牌
          tokens -= applyCount;
          lastTime = now;
          //log.info("剩余令牌.." + tokens);
          return false;
          }
          }
          //線程池,用于多線程模擬測試
          private ExecutorService pool = Executors.newFixedThreadPool(10);
          @Test
          public void testLimit()
          {
          //被限制的次數(shù)
          AtomicInteger limited = new AtomicInteger(0);
          //線程數(shù)
          final int threads = 2;
          //每條線程的執(zhí)行輪數(shù)
          final int turns = 20;
          //同步器
          CountDownLatch countDownLatch = new CountDownLatch(threads);
          long start = System.currentTimeMillis();
          for (int i = 0; i < threads; i++)
          {
          pool.submit(() ->
          {
          try
          {
          for (int j = 0; j < turns; j++)
          {
          long taskId = Thread.currentThread().getId();
          boolean intercepted = tryAcquire(taskId, 1);
          if (intercepted)
          {
          //被限制的次數(shù)累積
          limited.getAndIncrement();
          }
          Thread.sleep(200);
          }
          } catch (Exception e)
          {
          e.printStackTrace();
          }
          //等待所有線程結(jié)束
          countDownLatch.countDown();
          });
          }
          try
          {
          countDownLatch.await();
          } catch (InterruptedException e)
          {
          e.printStackTrace();
          }
          float time = (System.currentTimeMillis() - start) / 1000F;
          //輸出統(tǒng)計(jì)結(jié)果
          log.info("限制的次數(shù)為:" + limited.get() +
          ",通過的次數(shù)為:" + (threads *turns - limited.get()));
          限制的比例為 log.info("限制的比例為:" +
          (float) limited.get() / (float) (threads *turns));
          log.info("運(yùn)行的時(shí)長為:" + time);
          }
          }

          運(yùn)行這個(gè)示例程序,部分結(jié)果如下:

          [pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 104
          [pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 114
          [pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 314
          [pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 314
          [pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 1 capacity 2 gap 515
          [pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 0
          ...
          [pool-2-thread-2] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 401
          [pool-2-thread-1] INFO c.c.s.r.TokenBucketLimiter - tokens 0 capacity 2 gap 402
          [main] INFO c.c.s.r.TokenBucketLimiter - 限制的次數(shù)為:34,通過的次數(shù)為:6
          [main] INFO c.c.s.r.TokenBucketLimiter - 限制的比例為:0.85
          [main] INFO c.c.s.r.TokenBucketLimiter - 運(yùn)行的時(shí)長為:4.119

          令牌桶的好處之一就是可以方便地應(yīng)對突發(fā)流量。比如,可以改變令牌的發(fā)放速度,算法能按照新的發(fā)送速率調(diào)大令牌的發(fā)放數(shù)量,使得突發(fā)流量能被處理。

          本文給大家講解的內(nèi)容是高并發(fā)核心編程,限流原理與實(shí)戰(zhàn),限流策略原理與參考實(shí)現(xiàn)

          1. 下篇文章給大家講解的是高并發(fā)核心編程,限流原理與實(shí)戰(zhàn),分布式計(jì)數(shù)器限流;

          2. 覺得文章不錯的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;

          3. 感謝大家的支持!


          本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號里找我,我等你哦。

          瀏覽 46
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人人摸人人看人人 | 荫蒂添出高潮A片视频 | 日日三级网| 成人先锋影音AV黄色电影网 | 国产肏逼视频 |