<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          @Async異步任務(wù)與線程池

          共 48000字,需瀏覽 96分鐘

           ·

          2021-06-28 02:39

          寫在前面:本篇文章是關(guān)于使用@Async進行異步任務(wù),并且關(guān)于線程池做了一個初步的梳理和總結(jié),包括遇到過的一些坑

          在工作中用到的一些線程池

          以下代碼已做脫敏處理

          1.newCachedThreadPool

              private void startTask(List<String> usersList){
          ExecutorService executor = Executors.newCachedThreadPool();
          executor.submit(()->{
          //do someting
          });
          }

          復制代碼

          2.newScheduledThreadPool


          @Configuration
          public class ScheduleConfig implements SchedulingConfigurer {

          @Override
          public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
          //當然了,這里設(shè)置的線程池是corePoolSize也是很關(guān)鍵了,自己根據(jù)業(yè)務(wù)需求設(shè)定
          taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
          }

          }

          復制代碼

          如果在idea中安裝了阿里規(guī)范插件,就會發(fā)現(xiàn)上面兩種創(chuàng)建線程池的方式都會報紅。原因是:

          線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。說明:Executors返回的線程池對象的弊端如下:

          1. FixedThreadPool和SingleThreadPool:

            允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM。

          2. CachedThreadPool:

            允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致OOM。

          其實這里CachedThreadPool和newScheduledThreadPool是一樣的,都是因為最大線程數(shù)被設(shè)置成了Integer.MAX_VALUE。


          public ScheduledThreadPoolExecutor(int corePoolSize) {
          super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
          }
          復制代碼
              public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
          60L, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>());
          }
          復制代碼

          在源碼中可以看的出newCachedThreadPool使用的是synchronousqueue隊列,也可以看作是一個長度為1的BlockingQueue所以,再加上最大允許線程數(shù)為Integer.MAX_VALUE,就導致可能會創(chuàng)建大量線程導致OOM。

          同理ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,初始化大小為16。當隊列滿后就會創(chuàng)建新線程,就導致可能會創(chuàng)建大量線程導致OOM。

          我們不妨實際來測試一下,以newCachedThreadPool為例,jvm參數(shù)-Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m。


          @PostMapping("/newCachedThreadPoolExample")
          @ResponseBody
          public void newCachedThreadPoolExample(){
          ExecutorService executorService = Executors.newCachedThreadPool();
          while (true){
          executorService.submit(()->{
          log.info("submit:"+LocalDateTime.now());
          try {
          Thread.sleep(1000);
          }catch (InterruptedException e){
          e.printStackTrace();
          }
          });
          }

          }

          復制代碼

          剛啟動時的情況:

          請求接口后就開始爆炸

          然后就開始卡著不動了

          比較尷尬的是一直沒有出現(xiàn)報錯OOM的情況,就直接卡死了。

          總結(jié)

          以上的線程池雖然可以在外部限制的情況下避免OOM等情況,但是還是建議盡量根據(jù)自己的業(yè)務(wù)情況自定義線程池。

          使用@Async快速創(chuàng)建一個異步任務(wù)

          1. application.yml

          這里是線程池相關(guān)配置,先不詳細說,同理可以在代碼里面配置config。

          線程池緩沖隊列的選擇

          以上發(fā)生的問題大多數(shù)都和線程池的緩沖隊列有關(guān),選擇一個符合自己業(yè)務(wù)特點的緩沖隊列也十分重要。

          spring:
          task:
          execution:
          pool:
          # 最大線程數(shù)
          max-size: 16
          # 核心線程數(shù)
          core-size: 16
          # 存活時間
          keep-alive: 10s
          # 隊列大小
          queue-capacity: 100
          # 是否允許核心線程超時
          allow-core-thread-timeout: true
          # 線程名稱前綴
          thread-name-prefix: async-task-

          復制代碼

          2.ThreadpoolApplication

          這里需要在 Application上添加 @EnableAsync注解,開啟異步任務(wù)。如果是選擇在代碼里面寫config,則需要在config文件上添加@EnableAsync注解。

          @EnableAsync
          @SpringBootApplication
          public class ThreadpoolApplication {

          public static void main(String[] args) {
          SpringApplication.run(ThreadpoolApplication.class, args);
          }

          }
          復制代碼

          3.AsyncTask

          編寫一個異步任務(wù)處理類,在需要開啟異步的方法上面添加@Async

          @Component
          @Slf4j
          public class AsyncTask {
          @Async
          public void asyncRun() throws InterruptedException {
          Thread.sleep(10);
          log.info(Thread.currentThread().getName()+":處理完成");
          }
          }

          復制代碼

          4.AsyncService

          編寫一個調(diào)用異步方法的service

          @Service
          @Slf4j
          public class AsyncService {
          @Autowired
          private AsyncTask asyncTask;

          public void asyncSimpleExample() {
          try {
          log.info("service start");
          asyncTask.asyncRun();
          log.info("service end");
          }catch (InterruptedException e){
          e.printStackTrace();
          }
          }


          }
          復制代碼

          5.AsyncController

          編寫一個Controller去調(diào)用AsyncService


          /**
          * @author kurtl
          */

          @Controller
          @RequestMapping("/")
          public class AsyncController {
          @Autowired
          private AsyncService asyncService;
          @PostMapping("/asyncSimpleExample")
          @ResponseBody
          public void asyncSimpleExample(){
          asyncService.asyncSimpleExample();
          }
          }

          復制代碼

          最后請求這個接口

          可以看到,先輸出了asyncSimpleExample里面打印的service start與service end,表示service方法先執(zhí)行完畢了,而異步方法則在調(diào)用后進行了一個sleep,service沒有同步等待sleep完成,而是直接返回,表示這個是異步任務(wù)。至此我們已經(jīng)通過@Async成功創(chuàng)建的異步任務(wù)。

          關(guān)于@Async和@EnableAsync的原理

          個人覺得源碼中很重要的一部分就是源碼中的注釋,閱讀注釋也可以幫你快速了解源碼的作用等,所有我會把重要的注釋稍微翻譯一下

          1.@Async源碼




          @Target({ElementType.TYPE, ElementType.METHOD})
          @Retention(RetentionPolicy.RUNTIME)
          @Documented
          public @interface Async {

          /**
          * A qualifier value for the specified asynchronous operation(s).
          * <p>May be used to determine the target executor to be used when executing
          * the asynchronous operation(s), matching the qualifier value (or the bean
          * name) of a specific {@link java.util.concurrent.Executor Executor} or
          * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
          * bean definition.
          * <p>When specified on a class-level {@code @Async} annotation, indicates that the
          * given executor should be used for all methods within the class. Method-level use
          * of {@code Async#value} always overrides any value set at the class level.
          * @since 3.1.2
          */


          /**
          * 在這些注釋中有三個非常重要的部分
          * 1.使用@Async的方法只能返回Void 或者 Future類型
          * 2.表明了@Async是通過org.springframework.core.task.TaskExecutor
          * 或者java.util.concurrent.Executor來創(chuàng)建線程池
          * 3.寫了@Async的作用范圍 在類上使用@Async會覆蓋方法上的@Async
          */


          String value() default "";

          }

          復制代碼

          2.@EnableAsync源碼




          /**
          * Enables Spring's asynchronous method execution capability, similar to functionality
          * found in Spring's {@code <task:*>} XML namespace.
          *
          * <p>To be used together with @{@link Configuration Configuration} classes as follows,
          * enabling annotation-driven async processing for an entire Spring application context:
          *
          * <pre class="code">
          * &#064;Configuration
          * &#064;EnableAsync
          * public class AppConfig {
          *
          * }</pre>
          * 這里表示需要聯(lián)合@Configuration注解一起使用,所以@EnableAsync應(yīng)該
          * 添加在線程池Config或者SpringBootApplication 上
          * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
          * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
          * annotation, or any custom annotation specified via the {@link #annotation} attribute.
          * The aspect is added transparently for any registered bean, for instance via this
          * configuration:
          *
          * <pre class="code">
          * &#064;Configuration
          * public class AnotherAppConfig {
          *
          * &#064;Bean
          * public MyAsyncBean asyncBean() {
          * return new MyAsyncBean();
          * }
          * }</pre>
          *
          * <p>By default, Spring will be searching for an associated thread pool definition:
          * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
          * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
          * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
          * 默認情況下spring會先搜索TaskExecutor類型的bean或者名字為
          * taskExecutor的Executor類型的bean,都不存在使用
          * SimpleAsyncTaskExecutor執(zhí)行器但是這個SimpleAsyncTaskExecutor實際
          * 上是有很大的坑的,建議是自定義一個線程池,這個后面會說
          * will be used to process async method invocations. Besides, annotated methods having
          *
          * @author Chris Beams
          * @author Juergen Hoeller
          * @author Stephane Nicoll
          * @author Sam Brannen
          * @since 3.1
          * @see Async
          * @see AsyncConfigurer
          * @see AsyncConfigurationSelector
          */

          @Target(ElementType.TYPE)
          @Retention(RetentionPolicy.RUNTIME)
          @Documented
          @Import(AsyncConfigurationSelector.class)
          public @interface EnableAsync {

          /**
          * Indicate the 'async' annotation type to be detected at either class
          * or method level.
          * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
          * {@code @javax.ejb.Asynchronous} annotation will be detected.
          * <p>This attribute exists so that developers can provide their own
          * custom annotation type to indicate that a method (or all methods of
          * a given class) should be invoked asynchronously.
          */

          Class<? extends Annotation> annotation() default Annotation.class;

          /**
          * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
          * to standard Java interface-based proxies.
          * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
          * <p>The default is {@code false}.
          * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
          * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
          * For example, other beans marked with Spring's {@code @Transactional} annotation
          * will be upgraded to subclass proxying at the same time. This approach has no
          * negative impact in practice unless one is explicitly expecting one type of proxy
          * vs. another &mdash; for example, in tests.
          *
          * 這個字段用來表示,是否要創(chuàng)建基于CGLIB的代理,實際上在高版本
          * 的spring 上(大概3.x)是自動選擇使用jdk動態(tài)代理還是CGLIB.
          * 設(shè)置為true時,其它spring管理的bean也會升級到CGLIB代理
          */

          boolean proxyTargetClass() default false;

          /**
          * Indicate how async advice should be applied.
          * <p><b>The default is {@link AdviceMode#PROXY}.</b>
          * Please note that proxy mode allows for interception of calls through the proxy
          * only. Local calls within the same class cannot get intercepted that way; an
          * {@link Async} annotation on such a method within a local call will be ignored
          * since Spring's interceptor does not even kick in for such a runtime scenario.
          * For a more advanced mode of interception, consider switching this to
          * {@link AdviceMode#ASPECTJ}.
          * 這個字段用來標識異步通知的模式,默認PROXY,當這個字段為
          * PROXY的時候,在同一個類中,非異步方法調(diào)用異步方法,會導致異
          * 步不生效,相反如果,想實現(xiàn)同一個類非異步方法調(diào)用異步方法就應(yīng)
          * 該設(shè)置為ASPECTJ
          */

          AdviceMode mode() default AdviceMode.PROXY;

          /**
          * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
          * should be applied.
          * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
          * after all other post-processors, so that it can add an advisor to
          * existing proxies rather than double-proxy.
          * 標明異步注解bean處理器應(yīng)該遵循的執(zhí)行順序,默認最低的優(yōu)先級
          *(Integer.MAX_VALUE,值越小優(yōu)先級越高)
          */

          int order() default Ordered.LOWEST_PRECEDENCE;

          }


          復制代碼

          在上面的源碼中,其實最核心的代碼只有一句,@Import(AsyncConfigurationSelector.class),引入了相關(guān)的配置。




          /**
          * Selects which implementation of {@link AbstractAsyncConfiguration} should
          * be used based on the value of {@link EnableAsync#mode} on the importing
          * {@code @Configuration} class.
          *
          * @author Chris Beams
          * @author Juergen Hoeller
          * @since 3.1
          * @see EnableAsync
          * @see ProxyAsyncConfiguration
          */

          public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

          private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
          "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


          /**
          * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
          * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
          * respectively.
          */

          /**
          * 這整個方法其實就是一個選擇器和ImportSelector接口的selectImports()方法很像,基于不同的代理模式,加載不同的配置類
          */

          @Override
          @Nullable
          public String[] selectImports(AdviceMode adviceMode) {

          switch (adviceMode) {
          case PROXY:
          return new String[] {ProxyAsyncConfiguration.class.getName()};
          case ASPECTJ:
          return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
          default:
          return null;
          }
          }

          }

          復制代碼

          接下來我們看看默認的ProxyAsyncConfiguration.class




          /**
          * {@code @Configuration} class that registers the Spring infrastructure beans necessary
          * to enable proxy-based asynchronous method execution.
          *
          * @author Chris Beams
          * @author Stephane Nicoll
          * @author Juergen Hoeller
          * @since 3.1
          * @see EnableAsync
          * @see AsyncConfigurationSelector
          */

          @Configuration
          @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
          //繼承了AbstractAsyncConfiguration類
          public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

          @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
          @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
          public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
          Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
          //初始化AsyncAnnotationBeanPostProcessor類型的bean
          AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
          //設(shè)置執(zhí)行器和異常處理器
          bpp.configure(this.executor, this.exceptionHandler);
          Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
          //設(shè)置annotation
          if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
          bpp.setAsyncAnnotationType(customAsyncAnnotation);
          }
          //設(shè)置注解屬性
          bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
          bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
          return bpp;
          }

          }



          復制代碼

          這一個類繼承了AbstractAsyncConfiguration類,其實也就做了一件事初始化AsyncAnnotationBeanPostProcessor,@Async注解的就是通過AsyncAnnotationBeanPostProcessor這個后置處理器生成一個代理對象來實現(xiàn)異步的,我們先看繼承的config。




          /**
          * Abstract base {@code Configuration} class providing common structure for enabling
          * Spring's asynchronous method execution capability.
          *
          * @author Chris Beams
          * @author Juergen Hoeller
          * @author Stephane Nicoll
          * @since 3.1
          * @see EnableAsync
          */

          @Configuration
          public abstract class AbstractAsyncConfiguration implements ImportAware {

          @Nullable
          protected AnnotationAttributes enableAsync; //;//enableAsync的注解屬性

          @Nullable
          protected Supplier<Executor> executor; //線程執(zhí)行器

          @Nullable
          protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; //異常處理器 和上面的代碼對應(yīng)


          @Override
          //設(shè)置注解屬性
          public void setImportMetadata(AnnotationMetadata importMetadata) {
          this.enableAsync = AnnotationAttributes.fromMap(
          importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
          if (this.enableAsync == null) {
          throw new IllegalArgumentException(
          "@EnableAsync is not present on importing class " + importMetadata.getClassName());
          }
          }

          /**
          * Collect any {@link AsyncConfigurer} beans through autowiring.
          */

          @Autowired(required = false)
          //設(shè)置執(zhí)行器和異常處理器
          void setConfigurers(Collection<AsyncConfigurer> configurers) {
          if (CollectionUtils.isEmpty(configurers)) {
          return;
          }
          if (configurers.size() > 1) {
          throw new IllegalStateException("Only one AsyncConfigurer may exist");
          }
          AsyncConfigurer configurer = configurers.iterator().next();
          this.executor = configurer::getAsyncExecutor;
          this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
          }

          }


          復制代碼

          整個代碼的結(jié)構(gòu)其實非常明確,我們回到上一個類,看他設(shè)置的bean AsyncAnnotationBeanPostProcessor。這個bean很復雜,所以干脆先生成類圖。弄清楚baen的生命周期。AsyncAnnotationBeanPostProcessor是一個后置處理器,所以我們先找父類AbstractAdvisingBeanPostProcessor中。




          /**
          * Base class for {@link BeanPostProcessor} implementations that apply a
          * Spring AOP {@link Advisor} to specific beans.
          *
          * @author Juergen Hoeller
          * @since 3.2
          */

          @SuppressWarnings("serial")
          public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

          @Nullable
          protected Advisor advisor;

          protected boolean beforeExistingAdvisors = false;

          private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);



          public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
          this.beforeExistingAdvisors = beforeExistingAdvisors;
          }


          @Override
          public Object postProcessBeforeInitialization(Object bean, String beanName) {
          return bean;
          }

          @Override
          public Object postProcessAfterInitialization(Object bean, String beanName) {
          // 沒有通知,或者是AopInfrastructureBean,那么不進行代理
          if (this.advisor == null || bean instanceof AopInfrastructureBean) {
          // Ignore AOP infrastructure such as scoped proxies.
          return bean;
          }
          // 添加advisor
          if (bean instanceof Advised) {
          Advised advised = (Advised) bean;
          if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
          // Add our local Advisor to the existing proxy's Advisor chain...
          // 這里通過beforeExistingAdvisors決定是將通知添加到所有通知之前還是添加到所有通知之后
          // 默認false 在@Async中被設(shè)置為true
          if (this.beforeExistingAdvisors) {
          advised.addAdvisor(0, this.advisor);
          }
          else {
          advised.addAdvisor(this.advisor);
          }
          return bean;
          }
          }
          //構(gòu)造ProxyFactory代理工廠
          if (isEligible(bean, beanName)) {
          ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
          //添加代理的接口
          if (!proxyFactory.isProxyTargetClass()) {
          evaluateProxyInterfaces(bean.getClass(), proxyFactory);
          }
          //設(shè)置切面
          proxyFactory.addAdvisor(this.advisor);
          customizeProxyFactory(proxyFactory);
          //返回代理類
          return proxyFactory.getProxy(getProxyClassLoader());
          }

          // No proxy needed.
          return bean;
          }

          //isEligible用于判斷這個類或者這個類中的某個方法是否含有注解
          protected boolean isEligible(Object bean, String beanName) {
          return isEligible(bean.getClass());
          }


          }


          復制代碼

          在上面代碼中可以看出來,proxyFactory.addAdvisor(this.advisor);這里持有一個AsyncAnnotationAdvisor類的對象advisor:buildAdvice()方法生成通知,buildPointcut生成切點。定位到這個類的buildPointcut方法中,看看他的切點匹配規(guī)則。




          @SuppressWarnings("serial")
          public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

          private Advice advice;

          private Pointcut pointcut;


          /**
          * Create a new {@code AsyncAnnotationAdvisor} for bean-style configuration.
          */

          public AsyncAnnotationAdvisor() {
          this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
          }


          @SuppressWarnings("unchecked")
          public AsyncAnnotationAdvisor(
          @Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler)
          {

          this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
          }


          @SuppressWarnings("unchecked")
          public AsyncAnnotationAdvisor(
          @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler)
          {

          Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
          asyncAnnotationTypes.add(Async.class);
          try {
          asyncAnnotationTypes.add((Class<? extends Annotation>)
          ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
          }
          catch (ClassNotFoundException ex) {
          // If EJB 3.1 API not present, simply ignore.
          }
          this.advice = buildAdvice(executor, exceptionHandler);
          this.pointcut = buildPointcut(asyncAnnotationTypes);
          }



          public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
          Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
          Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
          asyncAnnotationTypes.add(asyncAnnotationType);
          this.pointcut = buildPointcut(asyncAnnotationTypes);
          }

          /**
          * Set the {@code BeanFactory} to be used when looking up executors by qualifier.
          */

          @Override
          public void setBeanFactory(BeanFactory beanFactory) {
          if (this.advice instanceof BeanFactoryAware) {
          ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
          }
          }


          @Override
          public Advice getAdvice() {
          return this.advice;
          }

          @Override
          public Pointcut getPointcut() {
          return this.pointcut;
          }

          //構(gòu)建通知,一個簡單的攔截器
          protected Advice buildAdvice(
          @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler)
          {

          AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
          interceptor.configure(executor, exceptionHandler);
          return interceptor;
          }


          protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
          ComposablePointcut result = null;
          for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
          // 根據(jù)cpc和mpc 匹配器進行匹配
          //檢查類上是否有@Async注解
          Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
          //檢查方法是是否有@Async注解。
          Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
          if (result == null) {
          result = new ComposablePointcut(cpc);
          }
          else {
          result.union(cpc);
          }
          result = result.union(mpc);
          }
          return (result != null ? result : Pointcut.TRUE);
          }

          }


          復制代碼

          再找到它的通知邏輯buildAdvice,就是一個攔截器,生成AnnotationAsyncExecutionInterceptor對象,對于Interceptor,關(guān)注它的核心方法invoke就行了。它的父類AsyncExecutionInterceptor重寫了AsyncExecutionInterceptor接口的invoke方法。代碼如下





          public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {


          public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
          super(defaultExecutor);
          }

          public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
          super(defaultExecutor, exceptionHandler);
          }



          @Override
          @Nullable
          //
          public Object invoke(final MethodInvocation invocation) throws Throwable {
          Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
          Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
          final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
          // 獲取到一個線程池
          AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
          if (executor == null) {
          throw new IllegalStateException(
          "No executor specified and no default executor set on AsyncExecutionInterceptor either");
          }
          // 然后將這個方法封裝成一個 Callable對象傳入到線程池中執(zhí)行
          Callable<Object> task = () -> {
          try {
          Object result = invocation.proceed();
          if (result instanceof Future) {
          //阻塞等待執(zhí)行完畢得到結(jié)果
          return ((Future<?>) result).get();
          }
          }
          catch (ExecutionException ex) {
          handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
          }
          catch (Throwable ex) {
          handleError(ex, userDeclaredMethod, invocation.getArguments());
          }
          return null;
          };

          return doSubmit(task, executor, invocation.getMethod().getReturnType());
          }


          @Override
          @Nullable
          protected String getExecutorQualifier(Method method) {
          return null;
          }


          @Override
          @Nullable
          protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
          Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
          return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
          }

          @Override
          public int getOrder() {
          return Ordered.HIGHEST_PRECEDENCE;
          }

          }


          復制代碼

          可以看到,invoke首先是包裝了一個Callable的對象,然后傳入doSubmit,所以代碼的核心就在doSubmit這個方法中。


          @Nullable
          protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
          //先判斷是否存在CompletableFuture這個類,優(yōu)先使用CompletableFuture執(zhí)行任務(wù)
          if (CompletableFuture.class.isAssignableFrom(returnType)) {
          return CompletableFuture.supplyAsync(() -> {
          try {
          return task.call();
          }
          catch (Throwable ex) {
          throw new CompletionException(ex);
          }
          }, executor);
          }
          else if (ListenableFuture.class.isAssignableFrom(returnType)) {
          return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
          }
          else if (Future.class.isAssignableFrom(returnType)) {
          return executor.submit(task);
          }
          else {
          executor.submit(task);
          return null;
          }
          }

          復制代碼

          這里主要是判斷不同的返回值,最終都走進了submit方法,而submit根據(jù)線程池的不同,其實現(xiàn)也有區(qū)別,下面是SimpleAsyncTaskExecutor的實現(xiàn)方式。


          /**
          * Template method for the actual execution of a task.
          * <p>The default implementation creates a new Thread and starts it.
          * @param task the Runnable to execute
          * @see #setThreadFactory
          * @see #createThread
          * @see java.lang.Thread#start()
          */

          protected void doExecute(Runnable task) {
          Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
          thread.start();
          }


          復制代碼

          @Async的默認線程池

          1.使用@Async一定要定義線程池

          在上面的源碼中寫的很清楚,默認情況下spring會先搜索TaskExecutor類型的bean或者名字為taskExecutor的Executor類型的bean,都不存在使 SimpleAsyncTaskExecutor執(zhí)行器。但是這個SimpleAsyncTaskExecutor不是真的線程池,這個類不重用線程,每次調(diào)用都會創(chuàng)建一個新的線程。很有可能導致OOM。





          @SuppressWarnings("serial")
          public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
          implements AsyncListenableTaskExecutor, Serializable
          {

          /**
          * Permit any number of concurrent invocations: that is, don't throttle concurrency.
          * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
          */

          public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;

          /**
          * Switch concurrency 'off': that is, don't allow any concurrent invocations.
          * @see ConcurrencyThrottleSupport#NO_CONCURRENCY
          */

          public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;


          /** Internal concurrency throttle used by this executor. */
          private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

          @Nullable
          private ThreadFactory threadFactory;

          @Nullable
          private TaskDecorator taskDecorator;


          /**
          * Create a new SimpleAsyncTaskExecutor with default thread name prefix.
          */

          public SimpleAsyncTaskExecutor() {
          super();
          }

          /**
          * Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
          * @param threadNamePrefix the prefix to use for the names of newly created threads
          */

          public SimpleAsyncTaskExecutor(String threadNamePrefix) {
          super(threadNamePrefix);
          }

          /**
          * Create a new SimpleAsyncTaskExecutor with the given external thread factory.
          * @param threadFactory the factory to use for creating new Threads
          */

          public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          }


          /**
          * Specify an external factory to use for creating new Threads,
          * instead of relying on the local properties of this executor.
          * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
          * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
          * @see #setThreadNamePrefix
          * @see #setThreadPriority
          */

          public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          }

          /**
          * Return the external factory to use for creating new Threads, if any.
          */

          @Nullable
          public final ThreadFactory getThreadFactory() {
          return this.threadFactory;
          }


          public final void setTaskDecorator(TaskDecorator taskDecorator) {
          this.taskDecorator = taskDecorator;
          }


          //這里可以設(shè)置最大線程數(shù),通過限流去限制線程數(shù)
          public void setConcurrencyLimit(int concurrencyLimit) {
          this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
          }

          /**
          * Return the maximum number of parallel accesses allowed.
          */

          public final int getConcurrencyLimit() {
          return this.concurrencyThrottle.getConcurrencyLimit();
          }

          /**
          * Return whether this throttle is currently active.
          * @return {@code true} if the concurrency limit for this instance is active
          * @see #getConcurrencyLimit()
          * @see #setConcurrencyLimit
          */

          public final boolean isThrottleActive() {
          return this.concurrencyThrottle.isThrottleActive();
          }


          /**
          * Executes the given task, within a concurrency throttle
          * if configured (through the superclass's settings).
          * @see #doExecute(Runnable)
          */

          @Override
          public void execute(Runnable task) {
          execute(task, TIMEOUT_INDEFINITE);
          }

          /**
          * Executes the given task, within a concurrency throttle
          * if configured (through the superclass's settings).
          * <p>Executes urgent tasks (with 'immediate' timeout) directly,
          * bypassing the concurrency throttle (if active). All other
          * tasks are subject to throttling.
          * @see #TIMEOUT_IMMEDIATE
          * @see #doExecute(Runnable)
          */

          //
          @Override
          public void execute(Runnable task, long startTimeout) {
          Assert.notNull(task, "Runnable must not be null");
          Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
          if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
          this.concurrencyThrottle.beforeAccess();
          doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
          }
          else {
          doExecute(taskToUse);
          }
          }

          @Override
          public Future<?> submit(Runnable task) {
          FutureTask<Object> future = new FutureTask<>(task, null);
          execute(future, TIMEOUT_INDEFINITE);
          return future;
          }

          @Override
          public <T> Future<T> submit(Callable<T> task) {
          FutureTask<T> future = new FutureTask<>(task);
          execute(future, TIMEOUT_INDEFINITE);
          return future;
          }

          @Override
          public ListenableFuture<?> submitListenable(Runnable task) {
          ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
          execute(future, TIMEOUT_INDEFINITE);
          return future;
          }

          @Override
          public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
          ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
          execute(future, TIMEOUT_INDEFINITE);
          return future;
          }

          /**
          * Template method for the actual execution of a task.
          * <p>The default implementation creates a new Thread and starts it.
          * @param task the Runnable to execute
          * @see #setThreadFactory
          * @see #createThread
          * @see java.lang.Thread#start()
          */

          //判斷是否有工廠,沒有的話調(diào)用父類創(chuàng)建線程
          protected void doExecute(Runnable task) {
          Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
          thread.start();
          }


          /**
          * Subclass of the general ConcurrencyThrottleSupport class,
          * making {@code beforeAccess()} and {@code afterAccess()}
          * visible to the surrounding class.
          */

          private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

          @Override
          protected void beforeAccess() {
          super.beforeAccess();
          }

          @Override
          protected void afterAccess() {
          super.afterAccess();
          }
          }


          /**
          * This Runnable calls {@code afterAccess()} after the
          * target Runnable has finished its execution.
          */

          private class ConcurrencyThrottlingRunnable implements Runnable {

          private final Runnable target;

          public ConcurrencyThrottlingRunnable(Runnable target) {
          this.target = target;
          }

          @Override
          public void run() {
          try {
          this.target.run();
          }
          finally {
          concurrencyThrottle.afterAccess();
          }
          }
          }

          }

          復制代碼

          最主要的就是這段代碼


          /**
          * Template method for the actual execution of a task.
          * <p>The default implementation creates a new Thread and starts it.
          * @param task the Runnable to execute
          * @see #setThreadFactory
          * @see #createThread
          * @see java.lang.Thread#start()
          */

          //判斷是否有工廠,沒有的話調(diào)用父類創(chuàng)建線程
          protected void doExecute(Runnable task) {
          Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
          thread.start();
          }

          復制代碼

          這里并不是用線程池,而是直接創(chuàng)建新的線程,所以會大量創(chuàng)建線程導致OOM。其實這個類是可以通過setConcurrencyLimit設(shè)置最大線程數(shù),通過synchronized和wati and notify去進行限流,這里不展開講。所以結(jié)論是在使用@Async一定要設(shè)置線程池。

          @Async異步失效

          以下代碼已做脫敏處理

          在看公司代碼的時候,發(fā)現(xiàn)這樣一段代碼

              public UserVO saveUser(HttpServletRequest request,
          String source)
          {
          String token = RequestUtils.getToken(request);
          String uid = checkUserLoginReturnUid(token);
          log.info("注冊登錄, token : {}, uid : {}", token, uid);
          //獲取用戶信息
          User User = getLoginUser(uid);
          if(User == null){
          User = new User();
          //獲取用戶信息
          Map<String,String> userMap = redisTemplateMain.getUserMapByToken(token);
          //保存用戶
          saveUser(User, userMap, source);
          sendUserSystem(Integer.valueOf(userMap.get("id")));
          }
          //用戶信息放進緩存
          setAuth2Redis(User);
          return setUser2Redis(User);
          }


          //通知用戶系統(tǒng),我們這邊成功注冊了一個用戶
          @Async
          public void sendUserSystem(Integer userId){
          Map<String,Object> map = new HashMap<>();
          map.put("mainUid", userId);
          map.put("source", "");
          String json = HttpUtil.post(property.userRegisterSendSystem, map);
          log.info("sendUserSystem userId : {}, json : {}", userId, json);
          }

          復制代碼

          在之前我們看源碼的時候已經(jīng)知道了,由于@Async的AdviceMode默認為PROXY,所以當調(diào)用方和被調(diào)用方是在同一個類中,無法產(chǎn)生切面,@Async沒有被Spring容器管理。所以這個方法跑了這么久一直是同步。

          我們可以寫一個方法去測試一下。


          public void asyncInvalid() {
          try {
          log.info("service start");
          asyncInvalidExample();
          log.info("service end");
          }catch (InterruptedException e){
          e.printStackTrace();
          }
          }


          @Async
          public void asyncInvalidExample() throws InterruptedException{
          Thread.sleep(10);
          log.info(Thread.currentThread().getName()+":處理完成");
          }


          復制代碼

          調(diào)用結(jié)果很明顯,沒有進行異步操作,而是同步。

          線程池拒絕導致線程丟失

          既然線程池都已一個緩沖隊列來保存未被消費的任務(wù),那么就一定存在隊列被塞滿,導致線程丟失的情況。我們寫一段代碼模擬一下。

          配置文件

          spring:
          task:
          execution:
          pool:
          # 最大線程數(shù)
          max-size: 16
          # 核心線程數(shù)
          core-size: 16
          # 存活時間
          keep-alive: 10s
          # 隊列大小
          queue-capacity: 100
          # 是否允許核心線程超時
          allow-core-thread-timeout: true
          # 線程名稱前綴
          thread-name-prefix: async-task-

          復制代碼

          異步方法


          @Async
          public void asyncRefuseRun() throws InterruptedException {
          Thread.sleep(5000000);
          }
          復制代碼

          調(diào)用方法



          public void asyncRefuseRun() {
          for (int t = 0;t<2000;t++){
          log.info(""+t);
          try {
          asyncTask.asyncRefuseRun();
          }catch (InterruptedException e){
          e.printStackTrace();
          }
          }
          }

          復制代碼

          這里我循環(huán)了2000個線程,理論上來說當線程到達maxPoolSize + queueCapacity時會進行拒絕,也就是16+100。

          到了116的時候果然拋出了異常java.util.concurrent.RejectedExecutionException。證明線程執(zhí)行了它的拒絕策略。

          要理解線程池的拒絕策略,先來看看它的接口。


          public interface RejectedExecutionHandler {
          void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
          }

          復制代碼

          當線程池出現(xiàn)拒絕的情況,就會來調(diào)用你設(shè)置的拒絕策略,將當前提交的任務(wù)以及線程池實例本身傳遞給你處理。這里建議根據(jù)自己的業(yè)務(wù)場景,去實現(xiàn)拒絕策略。

          當然如果JDK內(nèi)置的實現(xiàn)可以滿足當前業(yè)務(wù),可以直接用jdk實現(xiàn)的。

          AbortPolicy(中止策略)

          這個中止策略就是我們剛剛演示的,觸發(fā)拒絕策略后,直接中止任務(wù),拋出異常,這個也是ThreadPoolExecutor默認實現(xiàn)。

             /**
          * A handler for rejected tasks that throws a
          * {@code RejectedExecutionException}.
          */

          public static class AbortPolicy implements RejectedExecutionHandler {
          /**
          * Creates an {@code AbortPolicy}.
          */

          public AbortPolicy() { }

          /**
          * Always throws RejectedExecutionException.
          *
          * @param r the runnable task requested to be executed
          * @param e the executor attempting to execute this task
          * @throws RejectedExecutionException always
          */

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          throw new RejectedExecutionException("Task " + r.toString() +
          " rejected from " +
          e.toString());
          }
          }


          復制代碼

          DiscardPolicy(丟棄策略)

              /**
          * A handler for rejected tasks that silently discards the
          * rejected task.
          */

          public static class DiscardPolicy implements RejectedExecutionHandler {
          /**
          * Creates a {@code DiscardPolicy}.
          */

          public DiscardPolicy() { }

          /**
          * Does nothing, which has the effect of discarding task r.
          *
          * @param r the runnable task requested to be executed
          * @param e the executor attempting to execute this task
          */

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          }
          }

          復制代碼

          很明顯,啥也不干,就是一個空實現(xiàn)。

          DiscardOldestPolicy(棄老策略)

              /**
          * A handler for rejected tasks that discards the oldest unhandled
          * request and then retries {@code execute}, unless the executor
          * is shut down, in which case the task is discarded.
          */

          public static class DiscardOldestPolicy implements RejectedExecutionHandler {
          /**
          * Creates a {@code DiscardOldestPolicy} for the given executor.
          */

          public DiscardOldestPolicy() { }

          /**
          * Obtains and ignores the next task that the executor
          * would otherwise execute, if one is immediately available,
          * and then retries execution of task r, unless the executor
          * is shut down, in which case task r is instead discarded.
          *
          * @param r the runnable task requested to be executed
          * @param e the executor attempting to execute this task
          */

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          if (!e.isShutdown()) {
          e.getQueue().poll();
          e.execute(r);
          }
          }
          }

          復制代碼

          如果線程池未關(guān)閉,就彈出隊列頭部的元素,然后嘗試執(zhí)行。實際上還是會丟棄任務(wù),如果頭部元素執(zhí)行失敗,就丟棄了。區(qū)別是優(yōu)先丟棄的是老的元素。

          CallerRunsPolicy(調(diào)用者運行策略)

              /**
          * A handler for rejected tasks that runs the rejected task
          * directly in the calling thread of the {@code execute} method,
          * unless the executor has been shut down, in which case the task
          * is discarded.
          */

          public static class CallerRunsPolicy implements RejectedExecutionHandler {
          /**
          * Creates a {@code CallerRunsPolicy}.
          */

          public CallerRunsPolicy() { }

          /**
          * Executes task r in the caller's thread, unless the executor
          * has been shut down, in which case the task is discarded.
          *
          * @param r the runnable task requested to be executed
          * @param e the executor attempting to execute this task
          */

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          if (!e.isShutdown()) {
          r.run();
          }
          }
          }

          復制代碼

          當觸發(fā)拒絕策略時,判斷線程池有沒有關(guān)閉,沒有關(guān)閉就由提交任務(wù)的當前線程處理。但是當大量提交后就會阻塞線程,導致性能降低。

          hutool中的線程池拒絕策略實現(xiàn)

          hutool作為我們經(jīng)常使用的一個工具類,也有線程池工具,我們不如來看看它是如何實現(xiàn)的。


          /**
          * 構(gòu)建ThreadPoolExecutor
          *
          * @param builder {@link ExecutorBuilder}
          * @return {@link ThreadPoolExecutor}
          */

          private static ThreadPoolExecutor build(ExecutorBuilder builder) {
          final int corePoolSize = builder.corePoolSize;
          final int maxPoolSize = builder.maxPoolSize;
          final long keepAliveTime = builder.keepAliveTime;
          final BlockingQueue<Runnable> workQueue;
          if (null != builder.workQueue) {
          workQueue = builder.workQueue;
          } else {
          // corePoolSize為0則要使用SynchronousQueue避免無限阻塞
          workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
          }
          final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
          RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, new ThreadPoolExecutor.AbortPolicy());

          final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(//
          corePoolSize, //
          maxPoolSize, //
          keepAliveTime, TimeUnit.NANOSECONDS, //
          workQueue, //
          threadFactory, //
          handler//
          );
          if (null != builder.allowCoreThreadTimeOut) {
          threadPoolExecutor.allowCoreThreadTimeOut(builder.allowCoreThreadTimeOut);
          }
          return threadPoolExecutor;
          }

          復制代碼

          可以很清晰的看到,會判斷是否傳入線程池拒絕策略,如果沒有就用默認的AbortPolicy。

          dubbo中的拒絕策略

          public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

          protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

          private final String threadName;

          private final URL url;

          private static volatile long lastPrintTime = 0;

          private static Semaphore guard = new Semaphore(1);

          public AbortPolicyWithReport(String threadName, URL url) {
          this.threadName = threadName;
          this.url = url;
          }

          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          String msg = String.format("Thread pool is EXHAUSTED!" +
          " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
          " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
          threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
          e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
          url.getProtocol(), url.getIp(), url.getPort());
          logger.warn(msg);
          dumpJStack();
          throw new RejectedExecutionException(msg);
          }

          private void dumpJStack() {
          //省略實現(xiàn)
          }
          }

          復制代碼

          dubbo的策略實現(xiàn)主要就是想讓開發(fā)人員知道拒絕任務(wù)的情況以及原因。它先輸出了線程池的詳細設(shè)置參數(shù),以及線程池當前的狀態(tài),還有當前拒絕任務(wù)的信息。然后又輸出了當前線程堆棧詳情在dumpJStack中實現(xiàn),最后拋出RejectedExecutionException。

          Netty中的線程池拒絕策略

              private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
          NewThreadRunsPolicy() {
          super();
          }

          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          try {
          final Thread t = new Thread(r, "Temporary task executor");
          t.start();
          } catch (Throwable e) {
          throw new RejectedExecutionException(
          "Failed to start a new thread", e);
          }
          }
          }

          復制代碼

          Netty的線程池拒絕策略很像CallerRunsPolicy(調(diào)用者運行策略),都是不會直接丟棄任務(wù)而是繼續(xù)處理任務(wù),不同的地方是CallerRunsPolicy(調(diào)用者運行策略)是在調(diào)用線程繼續(xù)處理,而Netty是new了一個新線程去處理。

          activeMq中的線程池拒絕策略


          new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
          try {
          executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
          throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
          }

          throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
          }
          });

          復制代碼

          activeMq中的策略屬于最大努力執(zhí)行任務(wù)型,當觸發(fā)拒絕策略時,在嘗試一分鐘的時間重新將任務(wù)塞進任務(wù)隊列,當一分鐘超時還沒成功時,就拋出異常。

          監(jiān)控線程池

          在開發(fā)中,我們線程池的運行狀態(tài),線程狀態(tài),對我們來說都非常重要。所以我們應(yīng)該把線程池監(jiān)控起來。我們可以通過擴展beforeExecute、afterExecute和terminated這三個方法去在執(zhí)行前或執(zhí)行后增加一些新的操作。用來記錄線程池的情況。

          方法含義
          shutdown()線程池延遲關(guān)閉時(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
          shutdownNow()任務(wù)執(zhí)行之前,記錄任務(wù)開始時間,startTimes這個HashMap以任務(wù)的hashCode為key,開始時間為值
          beforeExecute(Thread t, Runnable r)線程池延遲關(guān)閉時(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
          afterExecute(Runnable r, Throwable t)任務(wù)執(zhí)行之后,計算任務(wù)結(jié)束時間。統(tǒng)計任務(wù)耗時、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊列里緩存的任務(wù)數(shù)量、池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時間、線程池是否關(guān)閉、線程池是否終止信息

          package com.example.threadpool;

          import lombok.extern.slf4j.Slf4j;

          import java.util.Date;
          import java.util.List;
          import java.util.Objects;
          import java.util.concurrent.*;
          import java.util.concurrent.atomic.AtomicInteger;

          /**
          * @author kurtl
          */

          @Slf4j
          public class ThreadPoolMonitor extends ThreadPoolExecutor {


          /**
          * 保存任務(wù)開始執(zhí)行的時間,當任務(wù)結(jié)束時,用任務(wù)結(jié)束時間減去開始時間計算任務(wù)執(zhí)行時間
          */

          private final ConcurrentHashMap<String, Date> startTimes;

          /**
          * 線程池名稱,一般以業(yè)務(wù)名稱命名,方便區(qū)分
          */

          private final String poolName;

          /**
          * 調(diào)用父類的構(gòu)造方法,并初始化HashMap和線程池名稱
          *
          * @param corePoolSize 線程池核心線程數(shù)
          * @param maximumPoolSize 線程池最大線程數(shù)
          * @param keepAliveTime 線程的最大空閑時間
          * @param unit 空閑時間的單位
          * @param workQueue 保存被提交任務(wù)的隊列
          * @param poolName 線程池名稱
          */

          public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
          TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName)
          {
          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
          Executors.defaultThreadFactory(), poolName);
          }


          /**
          * 調(diào)用父類的構(gòu)造方法,并初始化HashMap和線程池名稱
          *
          * @param corePoolSize 線程池核心線程數(shù)
          * @param maximumPoolSize 線程池最大線程數(shù)
          * @param keepAliveTime 線程的最大空閑時間
          * @param unit 空閑時間的單位
          * @param workQueue 保存被提交任務(wù)的隊列
          * @param threadFactory 線程工廠
          * @param poolName 線程池名稱
          */

          public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
          TimeUnit unit, BlockingQueue<Runnable> workQueue,
          ThreadFactory threadFactory, String poolName)
          {
          super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
          this.startTimes = new ConcurrentHashMap<>();
          this.poolName = poolName;
          }

          /**
          * 線程池延遲關(guān)閉時(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計線程池情況
          */

          @Override
          public void shutdown() {
          // 統(tǒng)計已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
          log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
          this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
          super.shutdown();
          }

          /**
          * 線程池立即關(guān)閉時,統(tǒng)計線程池情況
          */

          @Override
          public List<Runnable> shutdownNow() {
          // 統(tǒng)計已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
          log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
          this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
          return super.shutdownNow();
          }

          /**
          * 任務(wù)執(zhí)行之前,記錄任務(wù)開始時間
          */

          @Override
          protected void beforeExecute(Thread t, Runnable r) {
          startTimes.put(String.valueOf(r.hashCode()), new Date());
          }

          /**
          * 任務(wù)執(zhí)行之后,計算任務(wù)結(jié)束時間
          */

          @Override
          protected void afterExecute(Runnable r, Throwable t) {
          Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
          Date finishDate = new Date();
          long diff = finishDate.getTime() - startDate.getTime();
          // 統(tǒng)計任務(wù)耗時、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、
          // 已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊列里緩存的任務(wù)數(shù)量、池中存在的最大線程數(shù)、
          // 最大允許的線程數(shù)、線程空閑時間、線程池是否關(guān)閉、線程池是否終止
          log.info("{}-pool-monitor: " +
          "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
          "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
          "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
          this.poolName,
          diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
          this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
          this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
          }

          /**
          * 創(chuàng)建固定線程池,代碼源于Executors.newFixedThreadPool方法,這里增加了poolName
          *
          * @param nThreads 線程數(shù)量
          * @param poolName 線程池名稱
          * @return ExecutorService對象
          */

          public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
          return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
          }

          /**
          * 創(chuàng)建緩存型線程池,代碼源于Executors.newCachedThreadPool方法,這里增加了poolName
          *
          * @param poolName 線程池名稱
          * @return ExecutorService對象
          */

          public static ExecutorService newCachedThreadPool(String poolName) {
          return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
          }

          /**
          * 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便于問題追蹤
          */

          static class EventThreadFactory implements ThreadFactory {
          private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
          private final ThreadGroup group;
          private final AtomicInteger threadNumber = new AtomicInteger(1);
          private final String namePrefix;

          /**
          * 初始化線程工廠
          *
          * @param poolName 線程池名稱
          */

          EventThreadFactory(String poolName) {
          SecurityManager s = System.getSecurityManager();
          group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
          namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
          }

          @Override
          public Thread newThread(Runnable r) {
          Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
          if (t.isDaemon())
          t.setDaemon(false);
          if (t.getPriority() != Thread.NORM_PRIORITY)
          t.setPriority(Thread.NORM_PRIORITY);
          return t;
          }
          }
          }


          復制代碼

          線程池負載關(guān)注的核心問題是:基于當前線程池參數(shù)分配的資源夠不夠。對于這個問題,我們可以從事前和事中兩個角度來看。事前,線程池定義了“活躍度”這個概念,來讓用戶在發(fā)生Reject異常之前能夠感知線程池負載問題,線程池活躍度計算公式為:線程池活躍度 = activeCount/maximumPoolSize。這個公式代表當活躍線程數(shù)趨向于maximumPoolSize的時候,代表線程負載趨高。事中,也可以從兩方面來看線程池的過載判定條件,一個是發(fā)生了Reject異常,一個是隊列中有等待任務(wù)(支持定制閾值)。以上兩種情況發(fā)生了都會觸發(fā)告警,告警信息會通過大象推送給服務(wù)所關(guān)聯(lián)的負責人?!缊F技術(shù)文檔

          核心線程數(shù) 最大線程數(shù) 如何配置

          如何合理的配置線程池參數(shù),比較普遍的說法是。

          IO密集型 = 2Ncpu(可以測試后自己控制大小,2Ncpu一般沒問題)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互、文件上傳下載、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)鹊龋?/p>

          計算密集型 = Ncpu(常出現(xiàn)于線程中:復雜算法)

          而這種方案沒有考慮多線程池的情況,實際使用上也有偏離。

          圖來自美團技術(shù)博客

          所以參數(shù)的設(shè)置應(yīng)該根據(jù)自己實際的應(yīng)用場景定制。

          多線程池的使用

          一般在實際業(yè)務(wù)中,我們會定義不同的線程池來處理不同的業(yè)務(wù)。利用我們之前完成的ThreadPoolMonitor可以很快的定義不同的線程。

          ThreadPoolConfig


          @EnableAsync
          @Configuration
          public class ThreadPoolConfig {

          @Bean
          public ThreadPoolExecutor test01(){
          return new ThreadPoolMonitor(16,32,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test01");
          }

          @Bean
          public ThreadPoolExecutor test02(){
          return new ThreadPoolMonitor(8,16,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test02");
          }
          }


          作者:西西弗的石頭
          鏈接:https://juejin.cn/post/6976893903223914527
          來源:掘金
          著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。



          瀏覽 103
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产18第一无限资源网站 | 热久久最新视频 | 豆花视频网页 | 伊人婷婷色五月色婷婷区 | 在线免费黄色视频 |