<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          DUBBO消費(fèi)異步化實(shí)例與原理

          共 57556字,需瀏覽 116分鐘

           ·

          2021-05-11 17:59


          JAVA前線(xiàn) 


          歡迎大家關(guān)注公眾號(hào)「JAVA前線(xiàn)」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場(chǎng)分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)



          1 文章概述

          我們?cè)诜?wù)端開(kāi)發(fā)時(shí)如果需要實(shí)現(xiàn)異步調(diào)用,首先聲明一個(gè)線(xiàn)程池,并將調(diào)用業(yè)務(wù)方法封裝成一個(gè)任務(wù)提交至線(xiàn)程池,如果不需要獲取返回值則封裝為Runnable,需要獲取返回值則封裝為Callable并通過(guò)Future對(duì)象接受結(jié)果。

          class CalcTask1 implements Callable<Integer{

              @Override
              public Integer call() throws Exception {
                  System.out.println("task1耗時(shí)計(jì)算");
                  Thread.sleep(1000L);
                  return 100;
              }
          }

          class CalcTask2 implements Callable<Integer{

              @Override
              public Integer call() throws Exception {
                  System.out.println("task2耗時(shí)計(jì)算");
                  Thread.sleep(3000L);
                  return 200;
              }
          }

          public class CallableTest {

              public static void test1() throws Exception {
                  ExecutorService executor = Executors.newCachedThreadPool();
                  CalcTask1 task1 = new CalcTask1();
                  Future<Integer> f1 = executor.submit(task1);
                  CalcTask2 task2 = new CalcTask2();
                  Future<Integer> f2 = executor.submit(task2);
                  Integer result1 = f1.get();
                  Integer result2 = f2.get();
                  System.out.println("final result=" + (result1 + result2));
                  executor.shutdown();
              }

              public static void test2() throws Exception {
                  ExecutorService executor = Executors.newCachedThreadPool();
                  List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
                  CalcTask1 task1 = new CalcTask1();
                  CalcTask2 task2 = new CalcTask2();
                  tasks.add(task1);
                  tasks.add(task2);
                  for (int i = 0; i < tasks.size(); i++) {
                      Future<Integer> future = executor.submit(tasks.get(i));
                      System.out.println("result=" + future.get());
                  }
                  executor.shutdown();
              }
          }

          1.1 什么是消費(fèi)異步化

          在使用DUBBO進(jìn)行異步化調(diào)用時(shí)不需要這么麻煩,DUBBO基于NIO非阻塞能力使得服務(wù)消費(fèi)者無(wú)需啟用多線(xiàn)程就可以實(shí)現(xiàn)并行調(diào)用多個(gè)服務(wù),在此我們給出基于2.7.0版本調(diào)用實(shí)例。

          1.1.1 生產(chǎn)者

          (1) 服務(wù)聲明

          public interface CalcSumService {
              public Integer sum(int a, int b);
          }

          public class CalcSumServiceImpl implements CalcSumService {

              @Override
              public Integer sum(int a, int b) {
                  return a + b;
              }
          }

          public interface CalcSubtractionService {
              public Integer subtraction(int a, int b);
          }

          public class CalcSubtractionServiceImpl implements CalcSubtractionService {

              @Override
              public Integer subtraction(int a, int b) {
                  return a - b;
              }
          }

          (2) 配置文件

          <beans>
            <dubbo:application name="java-front-provider" />
            <dubbo:registry address="zookeeper://127.0.0.1:2181" />
            <dubbo:protocol name="dubbo" port="9999" />
            <bean id="calcSumService" class="com.java.front.dubbo.demo.provider.service.CalcSumServiceImpl" />
            <bean id="calcSubtractionService" class="com.java.front.dubbo.demo.provider.service.CalcSubtractionServiceImpl" />
            <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" />
            <dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" />
          </beans>

          (3) 服務(wù)發(fā)布

          public class Provider {
              public static void main(String[] args) throws Exception {
                  ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:META-INF/spring/dubbo-provider.xml");
                  context.start();
                  System.out.println(context);
                  System.in.read();
              }
          }

          1.1.2 消費(fèi)者

          (1) 配置文件

          <beans>
            <dubbo:application name="java-front-consumer" />
            <dubbo:registry address="zookeeper://127.0.0.1:2181" />
            <dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000">
              <dubbo:method name="sum" async="true" />
            </dubbo:reference>
            <dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000">
              <dubbo:method name="subtraction" async="true" />
            </dubbo:reference>
          </beans>

          (2) 服務(wù)消費(fèi)

          public class Consumer {

              public static void main(String[] args) throws Exception {
                  testAsync();
                  System.in.read();
              }

              public static void testAsync() {
                  try {
                      ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                      System.out.println(context);
                      context.start();

                      /** 加法運(yùn)算 **/
                      CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                      calcSumService.sum(32);
                      CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();

                      /** 減法運(yùn)算 **/
                      CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                      calcSubtractionService.subtraction(32);
                      CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();

                      /** 輸出結(jié)果 **/
                      int sumResult = futureSum.get();
                      int subtractionResult = futureSubtraction.get();
                      System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          1.2 為什么消費(fèi)異步化

          異步化可以將原本串行的調(diào)用并行化,減少執(zhí)行時(shí)間從而提升性能。假設(shè)上述實(shí)例加法服務(wù)需要100ms,減法服務(wù)需要200ms,那么串行化執(zhí)行時(shí)間為二者之和300ms:



          如果消費(fèi)異步化那么執(zhí)行時(shí)間減少為二者最大值200ms,異步化所帶來(lái)的性能提升不言而喻:



          2 保護(hù)性暫停模式

          分析DUBBO源碼之前我們首先介紹一種多線(xiàn)程設(shè)計(jì)模式:保護(hù)性暫停模式。我們?cè)O(shè)想這樣一種場(chǎng)景:線(xiàn)程A生產(chǎn)數(shù)據(jù),線(xiàn)程B讀取這個(gè)數(shù)據(jù)。我們必須面對(duì)一種情況:線(xiàn)程B準(zhǔn)備讀取數(shù)據(jù)時(shí),此時(shí)線(xiàn)程A還沒(méi)有生產(chǎn)出數(shù)據(jù)。在這種情況下線(xiàn)程B不能一直空轉(zhuǎn),也不能立即退出,線(xiàn)程B要等到生產(chǎn)數(shù)據(jù)完成并拿到數(shù)據(jù)之后才退出。

          那么在數(shù)據(jù)沒(méi)有生產(chǎn)出這段時(shí)間,線(xiàn)程B需要執(zhí)行一種等待機(jī)制,這樣可以達(dá)到對(duì)系統(tǒng)保護(hù)目的,這就是保護(hù)性暫停。

          public class MyData implements Serializable {
              private static final long serialVersionUID = 1L;
              private String message;

              public MyData(String message) {
                  this.message = message;
              }
          }

          class Resource {
              private MyData data;
              private Object lock = new Object();

              public MyData getData() {
                  synchronized (lock) {
                      while (data == null) {
                          try {
                              // 沒(méi)有數(shù)據(jù)則釋放鎖并暫停等待被喚醒
                              lock.wait();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      return data;
                  }
              }

              public void sendData(MyData data) {
                  synchronized (lock) {
                      // 生產(chǎn)數(shù)據(jù)后喚醒消費(fèi)線(xiàn)程
                      this.data = data;
                      lock.notifyAll();
                  }
              }
          }

          public class ProtectDesignTest {
              public static void main(String[] args) {
                  Resource resource = new Resource();
                  new Thread(() -> {
                      try {
                          MyData data = new MyData("hello");
                          System.out.println(Thread.currentThread().getName() + "生產(chǎn)數(shù)據(jù)=" + data);
                          // 模擬發(fā)送耗時(shí)
                          TimeUnit.SECONDS.sleep(3);
                          resource.sendData(data);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }, "t1").start();

                  new Thread(() -> {
                      MyData data = resource.getData();
                      System.out.println(Thread.currentThread().getName() + "接收到數(shù)據(jù)=" + data);
                  }, "t2").start();
              }
          }

          在上述代碼實(shí)例中線(xiàn)程1生產(chǎn)數(shù)據(jù),線(xiàn)程2消費(fèi)數(shù)據(jù),Resource類(lèi)通過(guò)wait/notify實(shí)現(xiàn)了保護(hù)性暫停模式,關(guān)于保護(hù)性暫停模式請(qǐng)參看我之前《保護(hù)性暫停模式詳解以及其在DUBBO應(yīng)用源碼分析》這篇文章。


          3 源碼分析

          本章節(jié)我們分析對(duì)比2.6.9和2.7.0兩個(gè)版本源碼,之所以選取這兩個(gè)版本是因?yàn)?.7.0是一個(gè)里程碑版本,異步化能力得到了明顯增強(qiáng)。


          3.1 version_2.6.9

          3.1.1 異步調(diào)用

          我們首先看看這個(gè)版本異步調(diào)用使用方式,生產(chǎn)者內(nèi)容和消費(fèi)者配置文件同第一章節(jié)不再贅述,我們重點(diǎn)分析服務(wù)消費(fèi)代碼。

          public class AsyncConsumer {

              public static void main(String[] args) throws Exception {
                  test1();
                  System.in.read();
              }

              public static void test1() throws Exception {
                  ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                  System.out.println(context);
                  context.start();

                  /** 加法運(yùn)算 **/
                  CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                  calcSumService.sum(32);
                  Future<Integer> futureSum = RpcContext.getContext().getFuture();

                  /** 減法運(yùn)算 **/
                  CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                  calcSubtractionService.subtraction(32);
                  Future<Integer> futureSubtraction = RpcContext.getContext().getFuture();

                  /** 輸出結(jié)果 **/
                  int sumResult = futureSum.get();
                  int subtractionResult = futureSubtraction.get();
                  System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
              }
          }

          消費(fèi)者最終執(zhí)行DubboInvoker.doInvoke,這個(gè)方法包含異步調(diào)用核心:

          public class DubboInvoker<Textends AbstractInvoker<T{

              @Override
              protected Result doInvoke(final Invocation invocation) throws Throwable {
                  RpcInvocation inv = (RpcInvocation) invocation;
                  final String methodName = RpcUtils.getMethodName(invocation);
                  inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
                  inv.setAttachment(Constants.VERSION_KEY, version);

                  ExchangeClient currentClient;
                  if (clients.length == 1) {
                      currentClient = clients[0];
                  } else {
                      currentClient = clients[index.getAndIncrement() % clients.length];
                  }
                  try {
                      boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                      boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                      int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                      // 單向調(diào)用
                      if (isOneway) {
                          boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                          currentClient.send(inv, isSent);
                          RpcContext.getContext().setFuture(null);
                          return new RpcResult();
                      }
                      // 異步調(diào)用
                      else if (isAsync) {
                          // 發(fā)起請(qǐng)求給生產(chǎn)者
                          ResponseFuture future = currentClient.request(inv, timeout);
                          // 設(shè)置future對(duì)象至上下文
                          RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                          // 返回空結(jié)果
                          return new RpcResult();
                      }
                      // 同步調(diào)用
                      else {
                          RpcContext.getContext().setFuture(null);
                          return (Result) currentClient.request(inv, timeout).get();
                      }
                  } catch (TimeoutException e) {
                      throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                  } catch (RemotingException e) {
                      throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                  }
              }
          }

          如果包含async屬性則表示異步調(diào)用,第一步發(fā)送調(diào)用請(qǐng)求給生產(chǎn)者,第二步設(shè)置Future對(duì)象至上下文,第三步立即返回空結(jié)果。那么在服務(wù)消費(fèi)時(shí)關(guān)鍵一步就是獲取Future對(duì)象,所以我們?cè)谡{(diào)用時(shí)要從上下文獲取Future對(duì)象:

          CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
          calcSumService.sum(32);
          Future<Integer> futureSum = RpcContext.getContext().getFuture();

          使用Future對(duì)象獲取結(jié)果:

          int sumResult = futureSum.get();

          進(jìn)入FutureAdapter.get()方法:

          public class FutureAdapter<Vimplements Future<V{
              private final ResponseFuture future;

              public V get() throws InterruptedException, ExecutionException {
                  try {
                      return (V) (((Result) future.get()).recreate());
                  } catch (RemotingException e) {
                      throw new ExecutionException(e.getMessage(), e);
                  } catch (Throwable e) {
                      throw new RpcException(e);
                  }
              }
          }

          進(jìn)入ResponseFuture.get()方法,我們可以看到保護(hù)性暫停模式應(yīng)用,當(dāng)生產(chǎn)者線(xiàn)程沒(méi)有返回?cái)?shù)據(jù)則阻塞并等待被喚醒:

          public class DefaultFuture implements ResponseFuture {
              private final Lock lock = new ReentrantLock();
              private final Condition done = lock.newCondition();

              @Override
              public Object get() throws RemotingException {
                  return get(timeout);
              }

              @Override
              public Object get(int timeout) throws RemotingException {
                  if (timeout <= 0) {
                      timeout = Constants.DEFAULT_TIMEOUT;
                  }
                  if (!isDone()) {
                      long start = System.currentTimeMillis();
                      lock.lock();
                      try {
                          while (!isDone()) {
                              // 遠(yuǎn)程調(diào)用未完成則等待被喚醒
                              done.await(timeout, TimeUnit.MILLISECONDS);
                              // 超時(shí)時(shí)間未完成則退出
                              if (isDone() || System.currentTimeMillis() - start > timeout) {
                                  break;
                              }
                          }
                      } catch (InterruptedException e) {
                          throw new RuntimeException(e);
                      } finally {
                          lock.unlock();
                      }
                      // 拋出超時(shí)異常
                      if (!isDone()) {
                          throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                      }
                  }
                  return returnFromResponse();
              }
          }

          當(dāng)消費(fèi)者接收到生產(chǎn)者響應(yīng)時(shí)會(huì)調(diào)用received方法喚醒相關(guān)阻塞線(xiàn)程,這時(shí)阻塞在get方法中的線(xiàn)程即可獲取到數(shù)據(jù):

          public class DefaultFuture implements ResponseFuture {
              private final Lock lock = new ReentrantLock();
              private final Condition done = lock.newCondition();

              public static void received(Channel channel, Response response) {
                  try {
                      // 根據(jù)唯一請(qǐng)求號(hào)獲取Future
                      DefaultFuture future = FUTURES.remove(response.getId());
                      if (future != null) {
                          future.doReceived(response);
                      } else {
                          logger.warn("The timeout response finally returned at "
                                      + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                                      + ", response " + response
                                      + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                                         + " -> " + channel.getRemoteAddress()));
                      }
                  } finally {
                      CHANNELS.remove(response.getId());
                  }
              }

              private void doReceived(Response res) {
                  lock.lock();
                  try {
                      response = res;
                      if (done != null) {
                          // 喚醒相關(guān)阻塞線(xiàn)程
                          done.signal();
                      }
                  } finally {
                      lock.unlock();
                  }
                  if (callback != null) {
                      invokeCallback(callback);
                  }
              }
          }

          3.1.2 設(shè)置回調(diào)函數(shù)

          我們現(xiàn)在調(diào)用get方法會(huì)阻塞在那里等到結(jié)果,那么有沒(méi)有一種方式當(dāng)結(jié)果返回時(shí)就立即調(diào)用我們?cè)O(shè)置的回調(diào)函數(shù)?答案是有。

          public class AsyncConsumer {

              public static void main(String[] args) throws Exception {
                  test2();
                  System.in.read();
              }

              public static void test2() throws Exception {
                  ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                  System.out.println(context);
                  context.start();

                  /** 加法運(yùn)算 **/
                  CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                  calcSumService.sum(32);

                  /** 執(zhí)行回調(diào)函數(shù) **/
                  ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
                      @Override
                      public void done(Object response) {
                          System.out.println("sumResult=" + response);
                      }

                      @Override
                      public void caught(Throwable exception) {
                          exception.printStackTrace();
                      }
                  });

                  /** 減法運(yùn)算 **/
                  CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                  calcSubtractionService.subtraction(32);

                  /** 執(zhí)行回調(diào)函數(shù) **/
                  ((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
                      @Override
                      public void done(Object response) {
                          System.out.println("subtractionResult=" + response);
                      }

                      @Override
                      public void caught(Throwable exception) {
                          exception.printStackTrace();
                      }
                  });
              }
          }

          DefaultFuture可以設(shè)置callback回調(diào)函數(shù),當(dāng)結(jié)果返回時(shí)如果回調(diào)函數(shù)不為空則執(zhí)行:

          public class DefaultFuture implements ResponseFuture {
              private volatile ResponseCallback callback;

              private void doReceived(Response res) {
                  lock.lock();
                  try {
                      response = res;
                      if (done != null) {
                          done.signal();
                      }
                  } finally {
                      lock.unlock();
                  }
                  if (callback != null) {
                      // 執(zhí)行回調(diào)函數(shù)
                      invokeCallback(callback);
                  }
              }

              private void invokeCallback(ResponseCallback c) {
                  ResponseCallback callbackCopy = c;
                  if (callbackCopy == null) {
                      throw new NullPointerException("callback cannot be null.");
                  }
                  c = null;
                  Response res = response;
                  if (res == null) {
                      throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
                  }
                  if (res.getStatus() == Response.OK) {
                      try {
                          // 執(zhí)行成功回調(diào)
                          callbackCopy.done(res.getResult());
                      } catch (Exception e) {
                          logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
                      }
                  } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                      try {
                          TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
                          // 發(fā)生超時(shí)回調(diào)
                          callbackCopy.caught(te);
                      } catch (Exception e) {
                          logger.error("callback invoke error ,url:" + channel.getUrl(), e);
                      }
                  } else {
                      try {
                          RuntimeException re = new RuntimeException(res.getErrorMessage());
                          callbackCopy.caught(re);
                      } catch (Exception e) {
                          logger.error("callback invoke error ,url:" + channel.getUrl(), e);
                      }
                  }
              }
          }

          3.2 version_2.7.0

          CompletableFuture在這個(gè)版本中被引入實(shí)現(xiàn)異步調(diào)用,可以使用此類(lèi)強(qiáng)大的異步編程API增強(qiáng)異步能力,我們首先回顧1.1.2章節(jié)實(shí)例:

          public class Consumer {

              public static void testAsync() {
                  try {
                      ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
                      System.out.println(context);
                      context.start();

                      /** 加法運(yùn)算 **/
                      CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                      calcSumService.sum(32);
                      CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();

                      /** 減法運(yùn)算 **/
                      CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                      calcSubtractionService.subtraction(32);
                      CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();

                      /** 輸出結(jié)果 **/
                      int sumResult = futureSum.get();
                      int subtractionResult = futureSubtraction.get();
                      System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          在上述消費(fèi)者代碼的實(shí)例中我們只是應(yīng)用了CompletableFuture.get()方法,并沒(méi)有發(fā)揮其強(qiáng)大功能。我們對(duì)上述實(shí)例稍加改造,兩個(gè)CompletionStage任務(wù)都執(zhí)行完成后,兩個(gè)任務(wù)結(jié)果會(huì)一起交給thenCombine進(jìn)行處理:

          public class Consumer {

              public static void testAsync() {
                  try {
                      ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer2.xml" });
                      System.out.println(context);
                      context.start();

                      /** 加法運(yùn)算 **/
                      CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
                      calcSumService.sum(32);
                      CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();

                      /** 減法運(yùn)算 **/
                      CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
                      calcSubtractionService.subtraction(32);
                      CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();

                      /** 乘法運(yùn)算 **/
                      CompletableFuture<Integer> multiplyResult = futureSum.thenCombine(futureSubtraction, new BiFunction<Integer, Integer, Integer>() {
                          @Override
                          public Integer apply(Integer t, Integer u) {
                              return (t * u);
                          }
                      });
                      System.out.println("multiplyResult=" + multiplyResult.get());
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          DubboInvoker代碼有所變化:

          public class DubboInvoker<Textends AbstractInvoker<T{

              @Override
              protected Result doInvoke(final Invocation invocation) throws Throwable {
                  RpcInvocation inv = (RpcInvocation) invocation;
                  final String methodName = RpcUtils.getMethodName(invocation);
                  inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
                  inv.setAttachment(Constants.VERSION_KEY, version);
                  ExchangeClient currentClient;
                  if (clients.length == 1) {
                      currentClient = clients[0];
                  } else {
                      currentClient = clients[index.getAndIncrement() % clients.length];
                  }
                  try {
                      // 是否為異步調(diào)用
                      boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);

                      // 是否為future異步方式
                      boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);

                      // 是否需要響應(yīng)結(jié)果
                      boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

                      // 超時(shí)時(shí)間
                      int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

                      // 單向調(diào)用
                      if (isOneway) {
                          boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                          currentClient.send(inv, isSent);
                          RpcContext.getContext().setFuture(null);
                          return new RpcResult();
                      }
                      // 異步請(qǐng)求
                      else if (isAsync) {
                          ResponseFuture future = currentClient.request(inv, timeout);
                          FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                          RpcContext.getContext().setFuture(futureAdapter);
                          Result result;
                          if (isAsyncFuture) {
                              result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                          } else {
                              result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                          }
                          return result;
                      }
                      // 同步請(qǐng)求
                      else {
                          RpcContext.getContext().setFuture(null);
                          Result result = (Result) currentClient.request(inv, timeout).get();
                          return result;
                      }
                  } catch (TimeoutException e) {
                      throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                  } catch (RemotingException e) {
                      throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                  }
              }
          }

          我們看到與2.6.9版本相同的是FutureAdapter同樣會(huì)被設(shè)置到上下文,但是FutureAdapter本身已經(jīng)發(fā)生了變化:

          public class FutureAdapter<Vextends CompletableFuture<V{
              private final ResponseFuture future;
              private CompletableFuture<Result> resultFuture;

              public FutureAdapter(ResponseFuture future) {
                  this.future = future;
                  this.resultFuture = new CompletableFuture<>();

                  // 設(shè)置回調(diào)函數(shù)至DefaultFuture
                  future.setCallback(new ResponseCallback() {

                      // 設(shè)置響應(yīng)結(jié)果至CompletableFuture
                      @Override
                      public void done(Object response) {
                          Result result = (Result) response;
                          FutureAdapter.this.resultFuture.complete(result);
                          V value = null;
                          try {
                              value = (V) result.recreate();
                          } catch (Throwable t) {
                              FutureAdapter.this.completeExceptionally(t);
                          }
                          FutureAdapter.this.complete(value);
                      }

                      // 設(shè)置異常結(jié)果至FutureAdapter
                      @Override
                      public void caught(Throwable exception) {
                          FutureAdapter.this.completeExceptionally(exception);
                      }
                  });
              }

              public ResponseFuture getFuture() {
                  return future;
              }

              public CompletableFuture<Result> getResultFuture() {
                  return resultFuture;
              }
          }

          我們?cè)诜?wù)消費(fèi)時(shí)通過(guò)getResultFuture方法獲取CompletableFuture,這個(gè)對(duì)象值在回調(diào)時(shí)被設(shè)置,回調(diào)時(shí)機(jī)同樣在DefaultFuture.doReceived方法里面:

          public class DefaultFuture implements ResponseFuture {
              private volatile ResponseCallback callback;

              private void doReceived(Response res) {
                  lock.lock();
                  try {
                      response = res;
                      if (done != null) {
                          done.signal();
                      }
                  } finally {
                      lock.unlock();
                  }
                  if (callback != null) {
                      // 執(zhí)行回調(diào)函數(shù)代碼同version_2.6.9
                      invokeCallback(callback);
                  }
              }
          }

          4 文章總結(jié)

          本文第一介紹了DUBBO消費(fèi)異步化是什么,以及異步化為什么會(huì)帶來(lái)性能提升。第二介紹了保護(hù)性暫停模式,這是實(shí)現(xiàn)異步化的基礎(chǔ)。最后我們閱讀了兩個(gè)不同版本異步化源碼,了解了DUBBO異步化演進(jìn)過(guò)程,希望本文對(duì)大家有所幫助。




          JAVA前線(xiàn) 


          歡迎大家關(guān)注公眾號(hào)「JAVA前線(xiàn)」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場(chǎng)分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)


          瀏覽 53
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲金品 | 一级a一级a爱片免费免免高潮按摩 | 北条麻妃无码中文 | 熟女日逼| 亚洲AV无码久久精品蜜桃小说 |