<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一個(gè)公式看懂:為什么DUBBO線程池會(huì)打滿

          共 50591字,需瀏覽 102分鐘

           ·

          2021-04-25 10:12



          JAVA前線 


          歡迎大家關(guān)注公眾號(hào)「JAVA前線」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)


          0 文章概述

          大家可能都遇到過DUBBO線程池打滿這個(gè)問題,剛開始遇到這個(gè)問題可能會(huì)比較慌,常見方案可能就是重啟服務(wù),但也不知道重啟是否可以解決。我認(rèn)為重啟不僅不能解決問題,甚至有可能加劇問題,這是為什么呢?本文我們就一起分析DUBBO線程池打滿這個(gè)問題。



          1 基礎(chǔ)知識(shí)

          1.1 DUBBO線程模型

          1.1.1 基本概念

          DUBBO底層網(wǎng)絡(luò)通信采用Netty框架,我們編寫一個(gè)Netty服務(wù)端進(jìn)行觀察:

          public class NettyServer {
              public static void main(String[] args) throws Exception {
                  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                  EventLoopGroup workerGroup = new NioEventLoopGroup(8);
                  try {
                      ServerBootstrap bootstrap = new ServerBootstrap();
                      bootstrap.group(bossGroup, workerGroup)
                      .channel(NioServerSocketChannel.class)
                      .option(ChannelOption.SO_BACKLOG, 128)
                      .childOption(ChannelOption.SO_KEEPALIVEtrue)
                      .childHandler(new ChannelInitializer<SocketChannel>() 
          {
                          @Override
                          protected void initChannel(SocketChannel ch) throws Exception {
                              ch.pipeline().addLast(new NettyServerHandler());
                          }
                      });
                      ChannelFuture channelFuture = bootstrap.bind(7777).sync();
                      System.out.println("服務(wù)端準(zhǔn)備就緒");
                      channelFuture.channel().closeFuture().sync();
                  } catch (Exception ex) {
                      System.out.println(ex.getMessage());
                  } finally {
                      bossGroup.shutdownGracefully();
                      workerGroup.shutdownGracefully();
                  }
              }
          }

          BossGroup線程組只有一個(gè)線程處理客戶端連接請(qǐng)求,連接完成后將完成三次握手的SocketChannel連接分發(fā)給WorkerGroup處理讀寫請(qǐng)求,這兩個(gè)線程組被稱為「IO線程」。

          我們?cè)僖觥笜I(yè)務(wù)線程」這個(gè)概念。服務(wù)生產(chǎn)者接收到請(qǐng)求后,如果處理邏輯可以快速處理完成,那么可以直接放在IO線程處理,從而減少線程池調(diào)度與上下文切換。但是如果處理邏輯非常耗時(shí),或者會(huì)發(fā)起新IO請(qǐng)求例如查詢數(shù)據(jù)庫,那么必須派發(fā)到業(yè)務(wù)線程池處理。

          DUBBO提供了多種線程模型,選擇線程模型需要在配置文件指定dispatcher屬性:

          <dubbo:protocol name="dubbo" dispatcher="all" />
          <dubbo:protocol name="dubbo" dispatcher="direct" />
          <dubbo:protocol name="dubbo" dispatcher="message" />
          <dubbo:protocol name="dubbo" dispatcher="execution" />
          <dubbo:protocol name="dubbo" dispatcher="connection" />

          不同線程模型在選擇是使用IO線程還是業(yè)務(wù)線程,DUBBO官網(wǎng)文檔說明:

          all
          所有消息都派發(fā)到業(yè)務(wù)線程池,包括請(qǐng)求,響應(yīng),連接事件,斷開事件,心跳

          direct
          所有消息都不派發(fā)到業(yè)務(wù)線程池,全部在IO線程直接執(zhí)行

          message
          只有請(qǐng)求響應(yīng)消息派發(fā)到業(yè)務(wù)線程池,其它連接斷開事件,心跳等消息直接在IO線程執(zhí)行

          execution
          只有請(qǐng)求消息派發(fā)到業(yè)務(wù)線程池,響應(yīng)和其它連接斷開事件,心跳等消息直接在IO線程執(zhí)行

          connection
          在IO線程上將連接斷開事件放入隊(duì)列,有序逐個(gè)執(zhí)行,其它消息派發(fā)到業(yè)務(wù)線程池

          1.1.2 確定時(shí)機(jī)

          生產(chǎn)者和消費(fèi)者在初始化時(shí)確定線程模型:

          // 生產(chǎn)者
          public class NettyServer extends AbstractServer implements Server {
              public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
                  super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
              }
          }

          // 消費(fèi)者
          public class NettyClient extends AbstractClient {
              public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
               super(url, wrapChannelHandler(url, handler));
              }
          }

          生產(chǎn)者和消費(fèi)者默認(rèn)線程模型都會(huì)使用AllDispatcher,ChannelHandlers.wrap方法可以獲取Dispatch自適應(yīng)擴(kuò)展點(diǎn)。如果我們?cè)谂渲梦募兄付╠ispatcher,擴(kuò)展點(diǎn)加載器會(huì)從URL獲取屬性值加載對(duì)應(yīng)線程模型。本文以生產(chǎn)者為例進(jìn)行分析:

          public class NettyServer extends AbstractServer implements Server {
              public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
                  // ChannelHandlers.wrap確定線程策略
                  super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
              }
          }

          public class ChannelHandlers {
              protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
                  return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handlerurl)));
              }
          }

          @SPI(AllDispatcher.NAME)
          public interface Dispatcher {
              @Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
              ChannelHandler dispatch(ChannelHandler handler, URL url);
          }

          1.1.3 源碼分析

          我們分析其中兩個(gè)線程模型源碼,其它線程模型請(qǐng)閱讀DUBBO源碼。AllDispatcher模型所有消息都派發(fā)到業(yè)務(wù)線程池,包括請(qǐng)求,響應(yīng),連接事件,斷開事件,心跳:

          public class AllDispatcher implements Dispatcher {

              // 線程模型名稱
              public static final String NAME = "all";

              // 具體實(shí)現(xiàn)策略
              @Override
              public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                  return new AllChannelHandler(handler, url);
              }
          }


          public class AllChannelHandler extends WrappedChannelHandler {

              @Override
              public void connected(Channel channel) throws RemotingException {
                  // 連接完成事件交給業(yè)務(wù)線程池
                  ExecutorService cexecutor = getExecutorService();
                  try {
                      cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
                  } catch (Throwable t) {
                      throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
                  }
              }

              @Override
              public void disconnected(Channel channel) throws RemotingException {
                  // 斷開連接事件交給業(yè)務(wù)線程池
                  ExecutorService cexecutor = getExecutorService();
                  try {
                      cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
                  } catch (Throwable t) {
                      throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
                  }
              }

              @Override
              public void received(Channel channel, Object message) throws RemotingException {
                  // 請(qǐng)求響應(yīng)事件交給業(yè)務(wù)線程池
                  ExecutorService cexecutor = getExecutorService();
                  try {
                      cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
                  } catch (Throwable t) {
                      if(message instanceof Request && t instanceof RejectedExecutionException) {
                          Request request = (Request)message;
                          if(request.isTwoWay()) {
                              String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                              Response response = new Response(request.getId(), request.getVersion());
                              response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                              response.setErrorMessage(msg);
                              channel.send(response);
                              return;
                          }
                      }
                      throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
                  }
              }

              @Override
              public void caught(Channel channel, Throwable exception) throws RemotingException {
                  // 異常事件交給業(yè)務(wù)線程池
                  ExecutorService cexecutor = getExecutorService();
                  try {
                      cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
                  } catch (Throwable t) {
                      throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
                  }
              }
          }

          DirectDispatcher策略所有消息都不派發(fā)到業(yè)務(wù)線程池,全部在IO線程直接執(zhí)行:

          public class DirectDispatcher implements Dispatcher {

              // 線程模型名稱
              public static final String NAME = "direct";

              // 具體實(shí)現(xiàn)策略
              @Override
              public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                  // 直接返回handler表示所有事件都交給IO線程處理
                  return handler;
              }
          }

          1.2 DUBBO線程池策略

          1.2.1 基本概念

          上個(gè)章節(jié)分析了線程模型,我們知道不同的線程模型會(huì)選擇使用還是IO線程還是業(yè)務(wù)線程。如果使用業(yè)務(wù)線程池,那么使用什么線程池策略是本章節(jié)需要回答的問題。DUBBO官網(wǎng)線程派發(fā)模型圖展示了線程模型和線程池策略的關(guān)系:



          DUBBO提供了多種線程池策略,選擇線程池策略需要在配置文件指定threadpool屬性:

          <dubbo:protocol name="dubbo" threadpool="fixed" threads="100" />
          <dubbo:protocol name="dubbo" threadpool="cached" threads="100" />
          <dubbo:protocol name="dubbo" threadpool="limited" threads="100" />
          <dubbo:protocol name="dubbo" threadpool="eager" threads="100" />

          不同線程池策略會(huì)創(chuàng)建不同特性的線程池:

          fixed
          包含固定個(gè)數(shù)線程

          cached
          線程空閑一分鐘會(huì)被回收,當(dāng)新請(qǐng)求到來時(shí)會(huì)創(chuàng)建新線程

          limited
          線程個(gè)數(shù)隨著任務(wù)增加而增加,但不會(huì)超過最大閾值。空閑線程不會(huì)被回收

          eager
          當(dāng)所有核心線程數(shù)都處于忙碌狀態(tài)時(shí),優(yōu)先創(chuàng)建新線程執(zhí)行任務(wù),而不是立即放入隊(duì)列

          1.2.2 確定時(shí)機(jī)

          本文我們以AllDispatcher為例分析線程池策略在什么時(shí)候確定:

          public class AllDispatcher implements Dispatcher {
              public static final String NAME = "all";

              @Override
              public ChannelHandler dispatch(ChannelHandler handler, URL url) {
                  return new AllChannelHandler(handler, url);
              }
          }

          public class AllChannelHandler extends WrappedChannelHandler {
              public AllChannelHandler(ChannelHandler handler, URL url) {
                  super(handler, url);
              }
          }

          在WrappedChannelHandler構(gòu)造函數(shù)中如果配置指定了threadpool屬性,擴(kuò)展點(diǎn)加載器會(huì)從URL獲取屬性值加載對(duì)應(yīng)線程池策略,默認(rèn)策略為fixed:

          public class WrappedChannelHandler implements ChannelHandlerDelegate {

              public WrappedChannelHandler(ChannelHandler handler, URL url) {
                  this.handler = handler;
                  this.url = url;
                  // 獲取線程池自適應(yīng)擴(kuò)展點(diǎn)
                  executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
                  String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
                  if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
                      componentKey = Constants.CONSUMER_SIDE;
                  }
                  DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
                  dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
              }
          }

          @SPI("fixed")
          public interface ThreadPool {
              @Adaptive({Constants.THREADPOOL_KEY})
              Executor getExecutor(URL url);
          }

          1.2.3 源碼分析

          (1) FixedThreadPool

          public class FixedThreadPool implements ThreadPool {

              @Override
              public Executor getExecutor(URL url) {

                  // 線程名稱
                  String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

                  // 線程個(gè)數(shù)默認(rèn)200
                  int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

                  // 隊(duì)列容量默認(rèn)0
                  int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

                  // 隊(duì)列容量等于0使用阻塞隊(duì)列SynchronousQueue
                  // 隊(duì)列容量小于0使用無界阻塞隊(duì)列LinkedBlockingQueue
                  // 隊(duì)列容量大于0使用有界阻塞隊(duì)列LinkedBlockingQueue
                  return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                                                queues == 0 ? new SynchronousQueue<Runnable>()
                                                : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                                   : new LinkedBlockingQueue<Runnable>(queues)),
                                                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
              }
          }

          (2) CachedThreadPool

          public class CachedThreadPool implements ThreadPool {

              @Override
              public Executor getExecutor(URL url) {

                  // 獲取線程名稱
                  String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

                  // 核心線程數(shù)默認(rèn)0
                  int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

                  // 最大線程數(shù)默認(rèn)Int最大值
                  int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

                  // 隊(duì)列容量默認(rèn)0
                  int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

                  // 線程空閑多少時(shí)間被回收默認(rèn)1分鐘
                  int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

                  // 隊(duì)列容量等于0使用阻塞隊(duì)列SynchronousQueue
                  // 隊(duì)列容量小于0使用無界阻塞隊(duì)列LinkedBlockingQueue
                  // 隊(duì)列容量大于0使用有界阻塞隊(duì)列LinkedBlockingQueue
                  return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                                                queues == 0 ? new SynchronousQueue<Runnable>()
                                                : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                                   : new LinkedBlockingQueue<Runnable>(queues)),
                                                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
              }
          }

          (3) LimitedThreadPool

          public class LimitedThreadPool implements ThreadPool {

              @Override
              public Executor getExecutor(URL url) {

                  // 獲取線程名稱
                  String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

                  // 核心線程數(shù)默認(rèn)0
                  int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

                  // 最大線程數(shù)默認(rèn)200
                  int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

                  // 隊(duì)列容量默認(rèn)0
                  int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

                  // 隊(duì)列容量等于0使用阻塞隊(duì)列SynchronousQueue
                  // 隊(duì)列容量小于0使用無界阻塞隊(duì)列LinkedBlockingQueue
                  // 隊(duì)列容量大于0使用有界阻塞隊(duì)列LinkedBlockingQueue
                  // keepalive時(shí)間設(shè)置Long.MAX_VALUE表示不回收空閑線程
                  return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                                                queues == 0 ? new SynchronousQueue<Runnable>()
                                                : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                                   : new LinkedBlockingQueue<Runnable>(queues)),
                                                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
              }
          }

          (4) EagerThreadPool

          我們知道ThreadPoolExecutor是普通線程執(zhí)行器。當(dāng)線程池核心線程達(dá)到閾值時(shí)新任務(wù)放入隊(duì)列,當(dāng)隊(duì)列已滿開啟新線程處理,當(dāng)前線程數(shù)達(dá)到最大線程數(shù)時(shí)執(zhí)行拒絕策略。

          但是EagerThreadPool自定義線程執(zhí)行策略,當(dāng)線程池核心線程達(dá)到閾值時(shí),新任務(wù)不會(huì)放入隊(duì)列而是開啟新線程進(jìn)行處理(要求當(dāng)前線程數(shù)沒有超過最大線程數(shù))。當(dāng)前線程數(shù)達(dá)到最大線程數(shù)時(shí)任務(wù)放入隊(duì)列。

          public class EagerThreadPool implements ThreadPool {

              @Override
              public Executor getExecutor(URL url) {

                  // 線程名
                  String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

                  // 核心線程數(shù)默認(rèn)0
                  int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

                  // 最大線程數(shù)默認(rèn)Int最大值
                  int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

                  // 隊(duì)列容量默認(rèn)0
                  int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

                  // 線程空閑多少時(shí)間被回收默認(rèn)1分鐘
                  int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

                  // 初始化自定義線程池和隊(duì)列重寫相關(guān)方法
                  TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
                  EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                          threads,
                          alive,
                          TimeUnit.MILLISECONDS,
                          taskQueue,
                          new NamedInternalThreadFactory(name, true),
                          new AbortPolicyWithReport(name, url));
                  taskQueue.setExecutor(executor);
                  return executor;
              }
          }

          1.3 一個(gè)公式

          現(xiàn)在我們知道DUBBO會(huì)選擇線程池策略進(jìn)行業(yè)務(wù)處理,那么應(yīng)該如何估算可能產(chǎn)生的線程數(shù)呢?我們首先分析一個(gè)問題:一個(gè)公司有7200名員工,每天上班打卡時(shí)間是早上8點(diǎn)到8點(diǎn)30分,每次打卡時(shí)間系統(tǒng)執(zhí)行時(shí)長為5秒。請(qǐng)問RT、QPS、并發(fā)量分別是多少?

          RT表示響應(yīng)時(shí)間,問題已經(jīng)告訴了我們答案:

          RT = 5

          QPS表示每秒查詢量,假設(shè)簽到行為平均分布:

          QPS = 7200 / (30 * 60) = 4

          并發(fā)量表示系統(tǒng)同時(shí)處理的請(qǐng)求數(shù)量:

          并發(fā)量 = QPS x RT = 4 x 5 = 20

          根據(jù)上述實(shí)例引出如下公式:

          并發(fā)量 = QPS x RT

          如果系統(tǒng)為每一個(gè)請(qǐng)求分配一個(gè)處理線程,那么并發(fā)量可以近似等于線程數(shù)?;谏鲜龉讲浑y看出并發(fā)量受QPS和RT影響,這兩個(gè)指標(biāo)任意一個(gè)上升就會(huì)導(dǎo)致并發(fā)量上升。

          但是這只是理想情況,因?yàn)椴l(fā)量受限于系統(tǒng)能力而不可能持續(xù)上升,例如DUBBO線程池就對(duì)線程數(shù)做了限制,超出最大線程數(shù)限制則會(huì)執(zhí)行拒絕策略,而拒絕策略會(huì)提示線程池已滿,這就是DUBBO線程池打滿問題的根源。下面我們分析RT上升和QPS上升這兩個(gè)原因。


          2 RT上升

          2.1 生產(chǎn)者發(fā)生慢服務(wù)

          2.1.1 原因分析

          (1) 生產(chǎn)者配置

          <beans>
              <dubbo:registry address="zookeeper://127.0.0.1:2181" />
              <dubbo:protocol name="dubbo" port="9999" />
              <dubbo:service interface="com.java.front.dubbo.demo.provider.HelloService" ref="helloService" />
          </beans>    

          (2) 生產(chǎn)者業(yè)務(wù)

          package com.java.front.dubbo.demo.provider;
          public interface HelloService {
              public String sayHello(String name) throws Exception;
          }

          public class HelloServiceImpl implements HelloService {
              public String sayHello(String name) throws Exception {
                  String result = "hello[" + name + "]";
                  // 模擬慢服務(wù)
                 Thread.sleep(10000L); 
                 System.out.println("生產(chǎn)者執(zhí)行結(jié)果" + result);
                 return result;
              }
          }

          (3) 消費(fèi)者配置

          <beans>
              <dubbo:registry address="zookeeper://127.0.0.1:2181" />
              <dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" />
          </beans>    

          (4) 消費(fèi)者業(yè)務(wù)

          public class Consumer {

              @Test
              public void testThread() {
                  ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:METAINF/spring/dubbo-consumer.xml" });
                  context.start();
                  for (int i = 0; i < 500; i++) {
                      new Thread(new Runnable() {
                          @Override
                          public void run() {
                              HelloService helloService = (HelloService) context.getBean("helloService");
                              String result;
                              try {
                                  result = helloService.sayHello("微信公眾號(hào)「JAVA前線」");
                                  System.out.println("客戶端收到結(jié)果" + result);
                              } catch (Exception e) {
                                  System.out.println(e.getMessage());
                              }
                          }
                      }).start();
                  }
              }
          }

          依次運(yùn)行生產(chǎn)者和消費(fèi)者代碼,會(huì)發(fā)現(xiàn)日志中出現(xiàn)報(bào)錯(cuò)信息。生產(chǎn)者日志會(huì)打印線程池已滿:

          Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 201 (completed: 1), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!
          at org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:67)
          at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
          at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
          at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:88)

          消費(fèi)者日志不僅會(huì)打印線程池已滿,還會(huì)打印服務(wù)提供者信息和調(diào)用方法,我們可以根據(jù)日志找到哪一個(gè)方法有問題:

          Failed to invoke the method sayHello in the service com.java.front.dubbo.demo.provider.HelloService. 
          Tried 3 times of the providers [x.x.x.x:9999] (1/1) from the registry 127.0.0.1:2181 on the consumer x.x.x.x 
          using the dubbo version 2.7.0-SNAPSHOT. Last error is: Failed to invoke remote method: sayHello, 
          provider: dubbo://x.x.x.x:9999/com.java.front.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer1&check=false&dubbo=2.0.2&generic=false&group=&interface=com.java.front.dubbo.demo.provider.HelloService&logger=log4j&methods=sayHello&pid=33432&register.ip=x.x.x.x&release=2.7.0-SNAPSHOT&remote.application=xpz-provider&remote.timestamp=1618632597509&side=consumer&timeout=100000000&timestamp=1618632617392, 
          cause: Server side(x.x.x.x,9999) threadpool is exhausted ,detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 401 (completed: 201), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!

          2.1.2 解決方案

          (1) 找出慢服務(wù)

          DUBBO線程池打滿時(shí)會(huì)執(zhí)行拒絕策略:

          public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
              protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
              private final String threadName;
              private final URL url;
              private static volatile long lastPrintTime = 0;
              private static Semaphore guard = new Semaphore(1);

              public AbortPolicyWithReport(String threadName, URL url) {
                  this.threadName = threadName;
                  this.url = url;
              }

              @Override
              public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                  String msg = String.format("Thread pool is EXHAUSTED!" +
                                             " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                                             " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                                             threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                                             e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                                             url.getProtocol(), url.getIp(), url.getPort());
                  logger.warn(msg);
                  // 打印線程快照
                  dumpJStack();
                  throw new RejectedExecutionException(msg);
              }

              private void dumpJStack() {
                  long now = System.currentTimeMillis();

                  // 每10分鐘輸出線程快照
                  if (now - lastPrintTime < 10 * 60 * 1000) {
                      return;
                  }
                  if (!guard.tryAcquire()) {
                      return;
                  }

                  ExecutorService pool = Executors.newSingleThreadExecutor();
                  pool.execute(() -> {
                      String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
                      System.out.println("AbortPolicyWithReport dumpJStack directory=" + dumpPath);
                      SimpleDateFormat sdf;
                      String os = System.getProperty("os.name").toLowerCase();

                      // linux文件位置/home/xxx/Dubbo_JStack.log.2021-01-01_20:50:15
                      // windows文件位置/user/xxx/Dubbo_JStack.log.2020-01-01_20-50-15
                      if (os.contains("win")) {
                          sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                      } else {
                          sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
                      }
                      String dateStr = sdf.format(new Date());
                      try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                          JVMUtil.jstack(jStackStream);
                      } catch (Throwable t) {
                          logger.error("dump jStack error", t);
                      } finally {
                          guard.release();
                      }
                      lastPrintTime = System.currentTimeMillis();
                  });
                  pool.shutdown();
              }
          }

          拒絕策略會(huì)輸出線程快照文件,在分析線程快照文件時(shí)BLOCKED和TIMED_WAITING線程狀態(tài)需要我們重點(diǎn)關(guān)注。如果發(fā)現(xiàn)大量線程阻塞或者等待狀態(tài)則可以定位到具體代碼行:

          DubboServerHandler-x.x.x.x:9999-thread-200 Id=230 TIMED_WAITING
          at java.lang.Thread.sleep(Native Method)
          at com.java.front.dubbo.demo.provider.HelloServiceImpl.sayHello(HelloServiceImpl.java:13)
          at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java)
          at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:56)
          at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:85)
          at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56)
          at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)

          (2) 優(yōu)化慢服務(wù)

          現(xiàn)在已經(jīng)找到了慢服務(wù),此時(shí)我們就可以優(yōu)化慢服務(wù)了。優(yōu)化慢服務(wù)就需要具體問題具體分析了,這不是本文的重點(diǎn)在此不進(jìn)行展開。


          2.2 生產(chǎn)者預(yù)熱不充分

          2.2.1 原因分析

          還有一種RT上升的情況是我們不能忽視的,這種情況就是提供者重啟后預(yù)熱不充分即被調(diào)用。因?yàn)楫?dāng)生產(chǎn)者剛啟動(dòng)時(shí)需要預(yù)熱,需要和其它資源例如數(shù)據(jù)庫、緩存等建立連接,建立連接是需要時(shí)間的。如果此時(shí)大量消費(fèi)者請(qǐng)求到未預(yù)熱的生產(chǎn)者,鏈路時(shí)間增加了連接時(shí)間,RT時(shí)間必然會(huì)增加,從而也會(huì)導(dǎo)致DUBBO線程池打滿問題。


          2.2.2 解決方案

          (1) 等待生產(chǎn)者充分預(yù)熱

          因?yàn)樯a(chǎn)者預(yù)熱不充分導(dǎo)致線程池打滿問題,最容易發(fā)生在系統(tǒng)發(fā)布時(shí)。例如發(fā)布了一臺(tái)機(jī)器后發(fā)現(xiàn)線上出現(xiàn)線程池打滿問題,千萬不要著急重啟機(jī)器,而是給機(jī)器一段時(shí)間預(yù)熱,等連接建立后問題大概率消失。同時(shí)我們?cè)诎l(fā)布時(shí)也要分多批次發(fā)布,不要一次發(fā)布太多機(jī)器導(dǎo)致服務(wù)因?yàn)轭A(yù)熱問題造成大面積影響。


          (2) DUBBO升級(jí)版本大于等于2.7.4

          DUBBO消費(fèi)者在調(diào)用選擇生產(chǎn)者時(shí)本身就會(huì)執(zhí)行預(yù)熱邏輯,為什么還會(huì)出現(xiàn)預(yù)熱不充分問題?這是因?yàn)?.5.5之前版本以及2.7.2版本預(yù)熱機(jī)制是有問題的,簡而言之就是獲取啟動(dòng)時(shí)間不正確,2.7.4版本徹底解決了這個(gè)問題,所以我們要避免使用問題版本。下面我們閱讀2.7.0版本預(yù)熱機(jī)制源碼,看看預(yù)熱機(jī)制如何生效:

          public class RandomLoadBalance extends AbstractLoadBalance {

              public static final String NAME = "random";

              @Override
              protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

                  // invokers數(shù)量
                  int length = invokers.size();

                  // 權(quán)重是否相同
                  boolean sameWeight = true;

                  // invokers權(quán)重?cái)?shù)組
                  int[] weights = new int[length];

                  // 第一個(gè)invoker權(quán)重
                  int firstWeight = getWeight(invokers.get(0), invocation);
                  weights[0] = firstWeight;

                  // 權(quán)重值之和
                  int totalWeight = firstWeight;
                  for (int i = 1; i < length; i++) {
                      // 計(jì)算權(quán)重值
                      int weight = getWeight(invokers.get(i), invocation);
                      weights[i] = weight;
                      totalWeight += weight;

                      // 任意一個(gè)invoker權(quán)重值不等于第一個(gè)invoker權(quán)重值則sameWeight設(shè)置為FALSE
                      if (sameWeight && weight != firstWeight) {
                          sameWeight = false;
                      }
                  }
                  // 權(quán)重值不等則根據(jù)總權(quán)重值計(jì)算
                  if (totalWeight > 0 && !sameWeight) {
                      int offset = ThreadLocalRandom.current().nextInt(totalWeight);
                      // 不斷減去權(quán)重值當(dāng)小于0時(shí)直接返回
                      for (int i = 0; i < length; i++) {
                          offset -= weights[i];
                          if (offset < 0) {
                              return invokers.get(i);
                          }
                      }
                  }
                  // 所有服務(wù)權(quán)重值一致則隨機(jī)返回
                  return invokers.get(ThreadLocalRandom.current().nextInt(length));
              }
          }

          public abstract class AbstractLoadBalance implements LoadBalance {

              static int calculateWarmupWeight(int uptime, int warmup, int weight) {
                  // uptime/(warmup*weight)
                  // 如果當(dāng)前服務(wù)提供者沒過預(yù)熱期,用戶設(shè)置的權(quán)重將通過uptime/warmup減小
                  // 如果服務(wù)提供者設(shè)置權(quán)重很大但是還沒過預(yù)熱時(shí)間,重新計(jì)算權(quán)重會(huì)很小
                  int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
                  return ww < 1 ? 1 : (ww > weight ? weight : ww);
              }

              protected int getWeight(Invoker<?> invoker, Invocation invocation) {

                  // 獲取invoker設(shè)置權(quán)重值默認(rèn)權(quán)重=100
                  int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);

                  // 如果權(quán)重大于0
                  if (weight > 0) {

                      // 服務(wù)提供者發(fā)布服務(wù)時(shí)間戳
                      long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
                      if (timestamp > 0L) {

                          // 服務(wù)已經(jīng)發(fā)布多少時(shí)間
                          int uptime = (int) (System.currentTimeMillis() - timestamp);

                          // 預(yù)熱時(shí)間默認(rèn)10分鐘
                          int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);

                          // 生產(chǎn)者發(fā)布時(shí)間大于0但是小于預(yù)熱時(shí)間
                          if (uptime > 0 && uptime < warmup) {

                              // 重新計(jì)算權(quán)重值
                              weight = calculateWarmupWeight(uptime, warmup, weight);
                          }
                      }
                  }
                  // 服務(wù)發(fā)布時(shí)間大于預(yù)熱時(shí)間直接返回設(shè)置權(quán)重值
                  return weight >= 0 ? weight : 0;
              }
          }

          3 QPS上升

          上面章節(jié)大篇幅討論了由于RT上升造成的線程池打滿問題,現(xiàn)在我們討論另一個(gè)參數(shù)QPS。當(dāng)上游流量激增會(huì)導(dǎo)致創(chuàng)建大量線程池,也會(huì)造成線程池打滿問題。這時(shí)如果發(fā)現(xiàn)QPS超出了系統(tǒng)承受能力,我們不得不采用降級(jí)方案保護(hù)系統(tǒng),請(qǐng)參看我之前文章《從反脆弱角度談技術(shù)系統(tǒng)的高可用性》


          4 文章總結(jié)

          本文首先介紹了DUBBO線程模型和線程池策略,然后我們引出了公式,發(fā)現(xiàn)并發(fā)量受RT和QPS兩個(gè)參數(shù)影響,這兩個(gè)參數(shù)任意一個(gè)上升都可以造成線程池打滿問題。生產(chǎn)者出現(xiàn)慢服務(wù)或者預(yù)熱不充分都有可能造成RT上升,而上游流量激增會(huì)造成QPS上升,同時(shí)本文也給出了解決方案。DUBBO線程池打滿是一個(gè)必須重視的問題,希望本文對(duì)大家有所幫助。




          JAVA前線 


          歡迎大家關(guān)注公眾號(hào)「JAVA前線」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)


          瀏覽 34
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲精品另类 | 亚洲无码高清视频在线 | 午夜水蜜桃 | 99黄色视频| 欧美精品成人网站在线 |