Spring-Retry重試實現原理
閱讀本文大概需要 10.5 分鐘。
來自:http://r6d.cn/LJJN
概要
背景
使用介紹
基本使用
@Configuration??
@EnableRetry??
public?class?Application?{??
??
????@Bean??
????public?RetryService?retryService(){??
????????return?new?RetryService();??
????}??
??
????public?static?void?main(String[]?args)?throws?Exception{??
????????ApplicationContext?applicationContext?=?new?AnnotationConfigApplicationContext("springretry");??
????????RetryService?service1?=?applicationContext.getBean("service",?RetryService.class);??
????????service1.service();??
????}??
}??
??
@Service("service")??
public?class?RetryService?{??
??
????@Retryable(value?=?IllegalAccessException.class,?maxAttempts?=?5,??
????????????backoff=?@Backoff(value?=?1500,?maxDelay?=?100000,?multiplier?=?1.2))??
????public?void?service()?throws?IllegalAccessException?{??
????????System.out.println("service?method...");??
????????throw?new?IllegalAccessException("manual?exception");??
????}??
??
????@Recover??
????public?void?recover(IllegalAccessException?e){??
????????System.out.println("service?retry?after?Recover?=>?"?+?e.getMessage());??
????}??
??
}??
重試策略

SimpleRetryPolicy 默認最多嘗試3次(包含第一次調用)
TimeoutRetryPolicy 默認在1秒內失敗都會重試
ExpressionRetryPolicy 符合表達式就會重試
CircuitBreakerRetryPolicy 增加了熔斷的機制,如果不在熔斷狀態,則允許重試
CompositeRetryPolicy 可以組合多個重試策略
NeverRetryPolicy 從不重試(也是一種重試策略哈)
AlwaysRetryPolicy 總是重試
退避策略

FixedBackOffPolicy 默認固定延遲1秒后執行下一次重試
ExponentialBackOffPolicy 指數遞增延遲執行重試,默認初始0.1秒,系數是2,那么下次延遲0.2秒,再下次就是延遲0.4秒,如此類推,最大30秒。
ExponentialRandomBackOffPolicy 在上面那個策略上增加隨機性
UniformRandomBackOffPolicy 這個跟上面的區別就是,上面的延遲會不停遞增,這個只會在固定的區間隨機
StatelessBackOffPolicy 這個說明是無狀態的,所謂無狀態就是對上次的退避無感知,從它下面的子類也能看出來
原理
切入點
@EnableRetry
@Target(ElementType.TYPE)??
@Retention(RetentionPolicy.RUNTIME)??
@EnableAspectJAutoProxy(proxyTargetClass?=?false)??
@Import(RetryConfiguration.class)??
@Documented??
public?@interface?EnableRetry?{??
??
?/**??
??*?Indicate?whether?subclass-based?(CGLIB)?proxies?are?to?be?created?as?opposed??
??*?to?standard?Java?interface-based?proxies.?The?default?is?{@code?false}.??
??*??
??*?@return?whether?to?proxy?or?not?to?proxy?the?class??
??*/??
?boolean?proxyTargetClass()?default?false;??
??
}??
@EnableAspectJAutoProxy(proxyTargetClass = false)這個并不陌生,就是打開Spring AOP功能。重點看看@Import(RetryConfiguration.class),@Import相當于注冊這個Bean。RetryConfiguration是個什么東西?
@PostConstruct??
?public?void?init()?{??
??Set>?retryableAnnotationTypes?=?new?LinkedHashSet >(1);??
??retryableAnnotationTypes.add(Retryable.class);??
????????//創(chuàng)建pointcut??
??this.pointcut?=?buildPointcut(retryableAnnotationTypes);??
????????//創(chuàng)建advice??
??this.advice?=?buildAdvice();??
??if?(this.advice?instanceof?BeanFactoryAware)?{??
???((BeanFactoryAware)?this.advice).setBeanFactory(beanFactory);??
??}??
?}??
protected?Pointcut?buildPointcut(Set
>?retryAnnotationTypes) ?{??
??ComposablePointcut?result?=?null;??
??for?(Class?extends?Annotation>?retryAnnotationType?:?retryAnnotationTypes)?{??
???Pointcut?filter?=?new?AnnotationClassOrMethodPointcut(retryAnnotationType);??
???if?(result?==?null)?{??
????result?=?new?ComposablePointcut(filter);??
???}??
???else?{??
????result.union(filter);??
???}??
??}??
??return?result;??
?}??
//創(chuàng)建advice對象,即攔截器??
???protected?Advice?buildAdvice()?{??
????//下面關注這個對象??
?AnnotationAwareRetryOperationsInterceptor?interceptor?=?new?AnnotationAwareRetryOperationsInterceptor();??
?if?(retryContextCache?!=?null)?{??
??interceptor.setRetryContextCache(retryContextCache);??
?}??
?if?(retryListeners?!=?null)?{??
??interceptor.setListeners(retryListeners);??
?}??
?if?(methodArgumentsKeyGenerator?!=?null)?{??
??interceptor.setKeyGenerator(methodArgumentsKeyGenerator);??
?}??
?if?(newMethodArgumentsIdentifier?!=?null)?{??
??interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);??
?}??
?if?(sleeper?!=?null)?{??
??interceptor.setSleeper(sleeper);??
?}??
?return?interceptor;??
}??
AnnotationAwareRetryOperationsInterceptor

@Override??
?public?Object?invoke(MethodInvocation?invocation)?throws?Throwable?{??
??MethodInterceptor?delegate?=?getDelegate(invocation.getThis(),?invocation.getMethod());??
??if?(delegate?!=?null)?{??
???return?delegate.invoke(invocation);??
??}??
??else?{??
???return?invocation.proceed();??
??}??
?}??
private?MethodInterceptor?getDelegate(Object?target,?Method?method)?{??
??if?(!this.delegates.containsKey(target)?||?!this.delegates.get(target).containsKey(method))?{??
???synchronized?(this.delegates)?{??
????if?(!this.delegates.containsKey(target))?{??
?????this.delegates.put(target,?new?HashMap());??
????}??
????Map?delegatesForTarget?=?this.delegates.get(target);??
????if?(!delegatesForTarget.containsKey(method))?{??
?????Retryable?retryable?=?AnnotationUtils.findAnnotation(method,?Retryable.class);??
?????if?(retryable?==?null)?{??
??????retryable?=?AnnotationUtils.findAnnotation(method.getDeclaringClass(),?Retryable.class);??
?????}??
?????if?(retryable?==?null)?{??
??????retryable?=?findAnnotationOnTarget(target,?method);??
?????}??
?????if?(retryable?==?null)?{??
??????return?delegatesForTarget.put(method,?null);??
?????}??
?????MethodInterceptor?delegate;??
?????//支持自定義MethodInterceptor,而且優(yōu)先級最高??
?????if?(StringUtils.hasText(retryable.interceptor()))?{??
??????delegate?=?this.beanFactory.getBean(retryable.interceptor(),?MethodInterceptor.class);??
?????}??
?????else?if?(retryable.stateful())?{??
?????????????????????//得到“有狀態(tài)”的interceptor??
??????delegate?=?getStatefulInterceptor(target,?method,?retryable);??
?????}??
?????else?{??
?????????????????????//得到“無狀態(tài)”的interceptor??
??????delegate?=?getStatelessInterceptor(target,?method,?retryable);??
?????}??
?????delegatesForTarget.put(method,?delegate);??
????}??
???}??
??}??
??return?this.delegates.get(target).get(method);??
?}??
private?MethodInterceptor?getStatelessInterceptor(Object?target,?Method?method,?Retryable?retryable)?{??
??//生成一個RetryTemplate??
??RetryTemplate?template?=?createTemplate(retryable.listeners());??
??//生成retryPolicy??
??template.setRetryPolicy(getRetryPolicy(retryable));??
??//生成backoffPolicy??
??template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));??
??return?RetryInterceptorBuilder.stateless()??
????.retryOperations(template)??
????.label(retryable.label())??
????.recoverer(getRecoverer(target,?method))??
????.build();??
?}??
RetryOperationsInterceptor。RetryOperationsInterceptor也是一個MethodInterceptor,我們來看看它的invoke方法。public?Object?invoke(final?MethodInvocation?invocation)?throws?Throwable?{??
??
??String?name;??
??if?(StringUtils.hasText(label))?{??
???name?=?label;??
??}?else?{??
???name?=?invocation.getMethod().toGenericString();??
??}??
??final?String?label?=?name;??
??
??//定義了一個RetryCallback,其實看它的doWithRetry方法,調用了invocation的proceed()方法,是不是有點眼熟,這就是AOP的攔截鏈調用,如果沒有攔截鏈,那就是對原來方法的調用。??
??RetryCallback
無論是RetryOperationsInterceptor還是StatefulRetryOperationsInterceptor,最終的攔截處理邏輯還是調用到RetryTemplate的execute方法。從名字也看得出來,RetryTemplate作為一個模板類,里面包含了重試的統一邏輯。不過,我看這個RetryTemplate并不是很“模板”,因為它沒有很多可以擴展的地方。
重試邏輯及策略實現
protected?
?T?doExecute(RetryCallback ?retryCallback,?? ??
???RecoveryCallback?recoveryCallback,?RetryState?state)
???throws?E,?ExhaustedRetryException?{??
??
??RetryPolicy?retryPolicy?=?this.retryPolicy;??
??BackOffPolicy?backOffPolicy?=?this.backOffPolicy;??
??
??//新建一個RetryContext來保存本輪重試的上下文??
??RetryContext?context?=?open(retryPolicy,?state);??
??if?(this.logger.isTraceEnabled())?{??
???this.logger.trace("RetryContext?retrieved:?"?+?context);??
??}??
??
??//?Make?sure?the?context?is?available?globally?for?clients?who?need??
??//?it...??
??RetrySynchronizationManager.register(context);??
??
??Throwable?lastException?=?null;??
??
??boolean?exhausted?=?false;??
??try?{??
??
???//如果有注冊RetryListener,則會調用它的open方法,給調用者一個通知。??
???boolean?running?=?doOpenInterceptors(retryCallback,?context);??
??
???if?(!running)?{??
????throw?new?TerminatedRetryException(??
??????"Retry?terminated?abnormally?by?interceptor?before?first?attempt");??
???}??
??
???//?Get?or?Start?the?backoff?context...??
???BackOffContext?backOffContext?=?null;??
???Object?resource?=?context.getAttribute("backOffContext");??
??
???if?(resource?instanceof?BackOffContext)?{??
????backOffContext?=?(BackOffContext)?resource;??
???}??
??
???if?(backOffContext?==?null)?{??
????backOffContext?=?backOffPolicy.start(context);??
????if?(backOffContext?!=?null)?{??
?????context.setAttribute("backOffContext",?backOffContext);??
????}??
???}??
??
???//判斷能否重試,就是調用RetryPolicy的canRetry方法來判斷。??
???//這個循環(huán)會直到原方法不拋出異常,或不需要再重試??
???while?(canRetry(retryPolicy,?context)?&&?!context.isExhaustedOnly())?{??
??
????try?{??
?????if?(this.logger.isDebugEnabled())?{??
??????this.logger.debug("Retry:?count="?+?context.getRetryCount());??
?????}??
?????//清除上次記錄的異常??
?????lastException?=?null;??
?????//doWithRetry方法,一般來說就是原方法??
?????return?retryCallback.doWithRetry(context);??
????}??
????catch?(Throwable?e)?{??
?????//原方法拋出了異常??
?????lastException?=?e;??
??
?????try?{??
??????//記錄異常信息??
??????registerThrowable(retryPolicy,?state,?context,?e);??
?????}??
?????catch?(Exception?ex)?{??
??????throw?new?TerminatedRetryException("Could?not?register?throwable",??
????????ex);??
?????}??
?????finally?{??
??????//調用RetryListener的onError方法??
??????doOnErrorInterceptors(retryCallback,?context,?e);??
?????}??
?????//再次判斷能否重試??
?????if?(canRetry(retryPolicy,?context)?&&?!context.isExhaustedOnly())?{??
??????try?{??
???????//如果可以重試則走退避策略??
???????backOffPolicy.backOff(backOffContext);??
??????}??
??????catch?(BackOffInterruptedException?ex)?{??
???????lastException?=?e;??
???????//?back?off?was?prevented?by?another?thread?-?fail?the?retry??
???????if?(this.logger.isDebugEnabled())?{??
????????this.logger??
??????????.debug("Abort?retry?because?interrupted:?count="??
????????????+?context.getRetryCount());??
???????}??
???????throw?ex;??
??????}??
?????}??
??
?????if?(this.logger.isDebugEnabled())?{??
??????this.logger.debug(??
????????"Checking?for?rethrow:?count="?+?context.getRetryCount());??
?????}??
??
?????if?(shouldRethrow(retryPolicy,?context,?state))?{??
??????if?(this.logger.isDebugEnabled())?{??
???????this.logger.debug("Rethrow?in?retry?for?policy:?count="??
?????????+?context.getRetryCount());??
??????}??
??????throw?RetryTemplate.wrapIfNecessary(e);??
?????}??
??
????}??
??
????/*??
?????*?A?stateful?attempt?that?can?retry?may?rethrow?the?exception?before?now,??
?????*?but?if?we?get?this?far?in?a?stateful?retry?there's?a?reason?for?it,??
?????*?like?a?circuit?breaker?or?a?rollback?classifier.??
?????*/??
????if?(state?!=?null?&&?context.hasAttribute(GLOBAL_STATE))?{??
?????break;??
????}??
???}??
??
???if?(state?==?null?&&?this.logger.isDebugEnabled())?{??
????this.logger.debug(??
??????"Retry?failed?last?attempt:?count="?+?context.getRetryCount());??
???}??
??
???exhausted?=?true;??
???//重試結束后如果有兜底Recovery方法則執(zhí)行,否則拋異常??
???return?handleRetryExhausted(recoveryCallback,?context,?state);??
??
??}??
??catch?(Throwable?e)?{??
???throw?RetryTemplate.wrapIfNecessary(e);??
??}??
??finally?{??
???//處理一些關閉邏輯??
???close(retryPolicy,?context,?state,?lastException?==?null?||?exhausted);??
???//調用RetryListener的close方法??
???doCloseInterceptors(retryCallback,?context,?lastException);??
???RetrySynchronizationManager.clear();??
??}??
??
?}??
getStatelessInterceptor方法中的getRetryPolicy和getRetryPolicy方法。private?RetryPolicy?getRetryPolicy(Annotation?retryable)?{??
??Map?attrs?=?AnnotationUtils.getAnnotationAttributes(retryable);??
??@SuppressWarnings("unchecked")??
??Class?extends?Throwable>[]?includes?=?(Class?extends?Throwable>[])?attrs.get("value");??
??String?exceptionExpression?=?(String)?attrs.get("exceptionExpression");??
??boolean?hasExpression?=?StringUtils.hasText(exceptionExpression);??
??if?(includes.length?==?0)?{??
???@SuppressWarnings("unchecked")??
???Class?extends?Throwable>[]?value?=?(Class?extends?Throwable>[])?attrs.get("include");??
???includes?=?value;??
??}??
??@SuppressWarnings("unchecked")??
??Class?extends?Throwable>[]?excludes?=?(Class?extends?Throwable>[])?attrs.get("exclude");??
??Integer?maxAttempts?=?(Integer)?attrs.get("maxAttempts");??
??String?maxAttemptsExpression?=?(String)?attrs.get("maxAttemptsExpression");??
??if?(StringUtils.hasText(maxAttemptsExpression))?{??
???maxAttempts?=?PARSER.parseExpression(resolve(maxAttemptsExpression),?PARSER_CONTEXT)??
?????.getValue(this.evaluationContext,?Integer.class);??
??}??
??if?(includes.length?==?0?&&?excludes.length?==?0)?{??
???SimpleRetryPolicy?simple?=?hasExpression???new?ExpressionRetryPolicy(resolve(exceptionExpression))??
???????????????.withBeanFactory(this.beanFactory)??
??????????????:?new?SimpleRetryPolicy();??
???simple.setMaxAttempts(maxAttempts);??
???return?simple;??
??}??
??Map,?Boolean>?policyMap?=?new?HashMap ,?Boolean>();??
??for?(Class?extends?Throwable>?type?:?includes)?{??
???policyMap.put(type,?true);??
??}??
??for?(Class?extends?Throwable>?type?:?excludes)?{??
???policyMap.put(type,?false);??
??}??
??boolean?retryNotExcluded?=?includes.length?==?0;??
??if?(hasExpression)?{??
???return?new?ExpressionRetryPolicy(maxAttempts,?policyMap,?true,?exceptionExpression,?retryNotExcluded)??
?????.withBeanFactory(this.beanFactory);??
??}??
??else?{??
???return?new?SimpleRetryPolicy(maxAttempts,?policyMap,?true,?retryNotExcluded);??
??}??
?}??
private?BackOffPolicy?getBackoffPolicy(Backoff?backoff)?{??
??long?min?=?backoff.delay()?==?0???backoff.value()?:?backoff.delay();??
??if?(StringUtils.hasText(backoff.delayExpression()))?{??
???min?=?PARSER.parseExpression(resolve(backoff.delayExpression()),?PARSER_CONTEXT)??
?????.getValue(this.evaluationContext,?Long.class);??
??}??
??long?max?=?backoff.maxDelay();??
??if?(StringUtils.hasText(backoff.maxDelayExpression()))?{??
???max?=?PARSER.parseExpression(resolve(backoff.maxDelayExpression()),?PARSER_CONTEXT)??
?????.getValue(this.evaluationContext,?Long.class);??
??}??
??double?multiplier?=?backoff.multiplier();??
??if?(StringUtils.hasText(backoff.multiplierExpression()))?{??
???multiplier?=?PARSER.parseExpression(resolve(backoff.multiplierExpression()),?PARSER_CONTEXT)??
?????.getValue(this.evaluationContext,?Double.class);??
??}??
??if?(multiplier?>?0)?{??
???ExponentialBackOffPolicy?policy?=?new?ExponentialBackOffPolicy();??
???if?(backoff.random())?{??
????policy?=?new?ExponentialRandomBackOffPolicy();??
???}??
???policy.setInitialInterval(min);??
???policy.setMultiplier(multiplier);??
???policy.setMaxInterval(max?>?min???max?:?ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);??
???if?(this.sleeper?!=?null)?{??
????policy.setSleeper(this.sleeper);??
???}??
???return?policy;??
??}??
??if?(max?>?min)?{??
???UniformRandomBackOffPolicy?policy?=?new?UniformRandomBackOffPolicy();??
???policy.setMinBackOffPeriod(min);??
???policy.setMaxBackOffPeriod(max);??
???if?(this.sleeper?!=?null)?{??
????policy.setSleeper(this.sleeper);??
???}??
???return?policy;??
??}??
??FixedBackOffPolicy?policy?=?new?FixedBackOffPolicy();??
??policy.setBackOffPeriod(min);??
??if?(this.sleeper?!=?null)?{??
???policy.setSleeper(this.sleeper);??
??}??
??return?policy;??
?}??
@Override??
?public?boolean?canRetry(RetryContext?context)?{??
??Throwable?t?=?context.getLastThrowable();??
??//判斷拋出的異常是否符合重試的異常??
??//還有,是否超過了重試的次數(shù)??
??return?(t?==?null?||?retryForException(t))?&&?context.getRetryCount()??}??
protected?void?doBackOff()?throws?BackOffInterruptedException?{??
??try?{??
???//就是sleep固定的時間??
???sleeper.sleep(backOffPeriod);??
??}??
??catch?(InterruptedException?e)?{??
???throw?new?BackOffInterruptedException("Thread?interrupted?while?sleeping",?e);??
??}??
?}??
RetryContext

總結
推薦閱讀:
Intellij IDEA 竟然把 Java8 的數據流問題這么完美的解決掉了!
微信掃描二維碼,關注我的公眾號
朕已閱?

