Troubleshooting: a database failure takes a service offline in Eureka
Source: SegmentFault 思否 community
Author: 無名
Based on
spring-cloud-Greenwich.RELEASE
spring-boot-2.1.3.RELEASE
spring-boot-starter-actuator-2.1.3.RELEASE
spring-cloud-netflix-eureka-client-2.1.0.RELEASE
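Background
In production, requests to the project's endpoints came back from spring-cloud-gateway as 404. Investigation showed that the gateway could not obtain valid registration information for the project from eureka-server. At the same time, a network problem was preventing the project from connecting to its database. That network problem affected only the connection between the project and the database, not the connection between the project and eureka-server.
The logs showed the project repeatedly running health checks against the database and logging exceptions because it could not connect. They also showed Eureka's offline notification, Saw local status change event DOWN. Both messages were printed by the same thread, named DiscoveryClient-InstanceInfoReplicator-0, which strongly suggests that the two are related.
So why did eureka-server have no usable registration information for the project? The answer starts with the Eureka client's health check.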
Health checks
As usual, the way to understand the mechanism is to read the source code.
Almost all Eureka client initialization happens in the DiscoveryClient class, including starting the scheduled health check task.
public class DiscoveryClient implements EurekaClient {

    private void initScheduledTasks() {
        ……
        if (clientConfig.shouldRegisterWithEureka()) {
            ……
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            ……
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
InstanceInfoReplicator runs as a scheduled task: on each run it checks the health of the service and refreshes the status of the current Eureka client node.
class InstanceInfoReplicator implements Runnable {

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();
            ……
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // The task reschedules itself after every run
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}

public class DiscoveryClient implements EurekaClient {

    void refreshInstanceInfo() {
        ……
        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }
        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
}
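The failure handling in refreshInstanceInfo can be modeled in plain Java. The sketch below is only an illustration of the three outcomes (a normal result, an exception, and a null result); the class RefreshSketch, its enum, and the refresh method are hypothetical names and are not part of Eureka.

```java
import java.util.function.Supplier;

/** Simplified, hypothetical model of DiscoveryClient.refreshInstanceInfo(). */
class RefreshSketch {

    enum InstanceStatus { UP, DOWN, OUT_OF_SERVICE, UNKNOWN }

    /** Returns the status the client would end up with after one refresh. */
    static InstanceStatus refresh(InstanceStatus current, Supplier<InstanceStatus> healthCheck) {
        InstanceStatus status;
        try {
            status = healthCheck.get();
        } catch (Exception e) {
            // Mirrors the catch block in the original: a failing check counts as DOWN
            status = InstanceStatus.DOWN;
        }
        // Mirrors the null guard: a null result leaves the current status untouched
        return status != null ? status : current;
    }

    public static void main(String[] args) {
        // Health check reports DOWN
        System.out.println(refresh(InstanceStatus.UP, () -> InstanceStatus.DOWN));
        // Health check throws, for example because the database is unreachable
        System.out.println(refresh(InstanceStatus.UP, () -> {
            throw new IllegalStateException("db unreachable");
        }));
        // Health check returns null
        System.out.println(refresh(InstanceStatus.UP, () -> null));
    }
}
```

Under these assumptions the first two calls yield DOWN and the third keeps UP, which is why either a DOWN result or an exception from a health indicator is enough to change what the client reports.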
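Here the client obtains the status from the HealthCheckHandler, updates the node status, and publishes a status change event. If the status is DOWN, the event listener prints the log message we saw at the beginning, and the instance status reported to Eureka server is also DOWN. That is what caused the problem: the gateway could not get any instance with status UP from Eureka server.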
public class ApplicationInfoManager {

    public synchronized void setInstanceStatus(InstanceStatus status) {
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }
        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }
}

public class DiscoveryClient implements EurekaClient {

    private void initScheduledTasks() {
        ……
        if (clientConfig.shouldRegisterWithEureka()) {
            ……
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    // Push the new status to Eureka server right away
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}
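The listener flow in setInstanceStatus can be sketched without Eureka on the classpath. This is a simplified model, assuming that instanceInfo.setStatus returns null when the status has not changed (which is how the prev != null guard reads); StatusChangeSketch and its Listener interface are hypothetical stand-ins, not Eureka types.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of ApplicationInfoManager.setInstanceStatus(). */
class StatusChangeSketch {

    enum InstanceStatus { STARTING, UP, DOWN }

    /** Stand-in for StatusChangeListener: receives the previous and the new status. */
    interface Listener {
        void onChange(InstanceStatus prev, InstanceStatus next);
    }

    private InstanceStatus status = InstanceStatus.STARTING;
    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    synchronized void setInstanceStatus(InstanceStatus next) {
        if (next == null || next == status) {
            return; // assumption: an unchanged status produces no event
        }
        InstanceStatus prev = status;
        status = next;
        for (Listener listener : listeners) {
            try {
                listener.onChange(prev, next);
            } catch (RuntimeException e) {
                // One failing listener must not block the others
                System.err.println("failed to notify listener: " + e);
            }
        }
    }

    public static void main(String[] args) {
        StatusChangeSketch manager = new StatusChangeSketch();
        manager.addListener((prev, next) ->
                System.out.println("Saw local status change event " + prev + " -> " + next));
        manager.setInstanceStatus(InstanceStatus.UP);
        manager.setInstanceStatus(InstanceStatus.DOWN);
        manager.setInstanceStatus(InstanceStatus.DOWN); // repeated DOWN: no new event
        manager.setInstanceStatus(InstanceStatus.UP);
    }
}
```

In this model the repeated DOWN produces no event, so only real transitions would trigger the on-demand update toward Eureka server.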
The key question is how DiscoveryClient's getHealthCheckHandler().getStatus(instanceInfo.getStatus()) obtains its value.
getHealthCheckHandler returns an EurekaHealthCheckHandler, so we follow the source code into the EurekaHealthCheckHandler class.
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    private final CompositeHealthIndicator healthIndicator;

    @Override
    public void afterPropertiesSet() throws Exception {
        final Map<String, HealthIndicator> healthIndicators = applicationContext.getBeansOfType(HealthIndicator.class);
        for (Map.Entry<String, HealthIndicator> entry : healthIndicators.entrySet()) {
            //ignore EurekaHealthIndicator and flatten the rest of the composite
            //otherwise there is a never ending cycle of down. See gh-643
            if (entry.getValue() instanceof DiscoveryCompositeHealthIndicator) {
                DiscoveryCompositeHealthIndicator indicator = (DiscoveryCompositeHealthIndicator) entry.getValue();
                for (DiscoveryCompositeHealthIndicator.Holder holder : indicator.getHealthIndicators()) {
                    if (!(holder.getDelegate() instanceof EurekaHealthIndicator)) {
                        healthIndicator.addHealthIndicator(holder.getDelegate().getName(), holder);
                    }
                }
            }
            else {
                healthIndicator.addHealthIndicator(entry.getKey(), entry.getValue());
            }
        }
    }
}
Inside afterPropertiesSet, applicationContext.getBeansOfType collects every HealthIndicator bean.
Note: applicationContext.getBeansOfType iterates over the BeanDefinitions to collect all bean names, then goes through those names and creates the bean instance for any name that has not been instantiated yet. In other words, getBeansOfType makes sure that every bean of the requested type has been created.
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    public InstanceStatus getStatus(InstanceStatus instanceStatus) {
        return getHealthStatus();
    }

    protected InstanceStatus getHealthStatus() {
        final Status status = getHealthIndicator().health().getStatus();
        return mapToInstanceStatus(status);
    }

    protected CompositeHealthIndicator getHealthIndicator() {
        return healthIndicator;
    }
}
getStatus calls the health method of CompositeHealthIndicator to get the status. As the afterPropertiesSet method above shows, CompositeHealthIndicator is a collection of HealthIndicators.
public class CompositeHealthIndicator implements HealthIndicator {

    public void addHealthIndicator(String name, HealthIndicator indicator) {
        this.registry.register(name, indicator);
    }

    @Override
    public Health health() {
        Map<String, Health> healths = new LinkedHashMap<>();
        for (Map.Entry<String, HealthIndicator> entry : this.registry.getAll().entrySet()) {
            healths.put(entry.getKey(), entry.getValue().health());
        }
        return this.aggregator.aggregate(healths);
    }
}

public class OrderedHealthAggregator extends AbstractHealthAggregator {

    public OrderedHealthAggregator() {
        setStatusOrder(Status.DOWN, Status.OUT_OF_SERVICE, Status.UP, Status.UNKNOWN);
    }

    public void setStatusOrder(Status... statusOrder) {
        String[] order = new String[statusOrder.length];
        for (int i = 0; i < statusOrder.length; i++) {
            order[i] = statusOrder[i].getCode();
        }
        setStatusOrder(Arrays.asList(order));
    }

    @Override
    public final Health aggregate(Map<String, Health> healths) {
        List<Status> statusCandidates = healths.values().stream()
                .map(Health::getStatus)
                .collect(Collectors.toList());
        Status status = aggregateStatus(statusCandidates);
        Map<String, Object> details = aggregateDetails(healths);
        return new Health.Builder(status, details).build();
    }

    protected Status aggregateStatus(List<Status> candidates) {
        // Only sort those status instances that we know about
        List<Status> filteredCandidates = new ArrayList<>();
        for (Status candidate : candidates) {
            if (this.statusOrder.contains(candidate.getCode())) {
                filteredCandidates.add(candidate);
            }
        }
        // If no status is given return UNKNOWN
        if (filteredCandidates.isEmpty()) {
            return Status.UNKNOWN;
        }
        // Sort given Status instances by configured order
        filteredCandidates.sort(new StatusComparator(this.statusOrder));
        return filteredCandidates.get(0);
    }

    private class StatusComparator implements Comparator<Status> {

        private final List<String> statusOrder;

        StatusComparator(List<String> statusOrder) {
            this.statusOrder = statusOrder;
        }

        @Override
        public int compare(Status s1, Status s2) {
            int i1 = this.statusOrder.indexOf(s1.getCode());
            int i2 = this.statusOrder.indexOf(s2.getCode());
            return (i1 < i2) ? -1 : (i1 != i2) ? 1 : s1.getCode().compareTo(s2.getCode());
        }
    }
}
CompositeHealthIndicator's health method iterates over all HealthIndicators and calls each one's health method to get its status. The statuses are then sorted in the order DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN and the first one is taken, so if any indicator reports DOWN, the result is DOWN.
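The "worst status wins" rule can be reproduced in a few lines of plain Java. The sketch below models statuses as their string codes and keeps only the ordering logic; AggregationSketch and the indicator names in main (diskSpace, db, ping) are illustrative assumptions, not Spring Boot classes.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of the status ordering in OrderedHealthAggregator. */
class AggregationSketch {

    // Same order as the aggregator's constructor: earlier entries take precedence
    static final List<String> ORDER = Arrays.asList("DOWN", "OUT_OF_SERVICE", "UP", "UNKNOWN");

    /** Picks the status code that comes first in ORDER; unknown codes are ignored. */
    static String aggregate(Map<String, String> statusByIndicator) {
        return statusByIndicator.values().stream()
                .filter(ORDER::contains)
                .min(Comparator.comparingInt(ORDER::indexOf))
                .orElse("UNKNOWN");
    }

    public static void main(String[] args) {
        Map<String, String> statuses = new LinkedHashMap<>();
        statuses.put("diskSpace", "UP");
        statuses.put("db", "DOWN"); // the database is unreachable
        statuses.put("ping", "UP");
        System.out.println(aggregate(statuses)); // prints DOWN
    }
}
```

A single DOWN entry is enough to make the aggregate DOWN, no matter how many other indicators report UP. That matches the incident described here, where only the database check was failing.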
public class EurekaHealthCheckHandler implements HealthCheckHandler, ApplicationContextAware, InitializingBean {

    private static final Map<Status, InstanceStatus> STATUS_MAPPING = new HashMap<Status, InstanceStatus>() {{
        put(Status.UNKNOWN, InstanceStatus.UNKNOWN);
        put(Status.OUT_OF_SERVICE, InstanceStatus.OUT_OF_SERVICE);
        put(Status.DOWN, InstanceStatus.DOWN);
        put(Status.UP, InstanceStatus.UP);
    }};

    protected InstanceStatus mapToInstanceStatus(Status status) {
        if (!STATUS_MAPPING.containsKey(status)) {
            return InstanceStatus.UNKNOWN;
        }
        return STATUS_MAPPING.get(status);
    }
}
Finally, the generic Status is mapped to Eureka's InstanceStatus, and the client updates its own status accordingly.
Summary
The Eureka client periodically calls the health method of every HealthIndicator to obtain the health status of the service. If any HealthIndicator reports DOWN, the Eureka client concludes that the service has a problem and is unavailable, sets its own status to DOWN, and reports that to Eureka server. Eureka server then marks the node as DOWN, so other services can no longer obtain this instance from Eureka server.
The cause of this incident was that DataSourceHealthIndicator reported DOWN, which changed the Eureka client's status to DOWN as well.
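Extension
If the project has a critical feature and you want the current instance to go offline as soon as that feature breaks, you can add a custom HealthIndicator class and check in its health method whether the feature is working. You can also combine an HTTP endpoint with a HealthIndicator to take the service online and offline manually: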
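import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/healthIndicator")
public class MyHealthIndicator implements HealthIndicator {

    // Written by HTTP request threads and read by the Eureka replicator thread, hence volatile.
    // Starts as true so the instance is not reported DOWN before the endpoint is ever called.
    private volatile boolean up = true;

    @GetMapping("setUpVal/{up}")
    public void setUpVal(@PathVariable("up") boolean up) {
        this.up = up;
    }

    @Override
    public Health health() {
        if (up) {
            return Health.up().build();
        }
        return Health.down().build();
    }

    public MyHealthIndicator setUp(boolean up) {
        this.up = up;
        return this;
    }
}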
With this in place, calling the endpoint /healthIndicator/setUpVal/false takes the current service instance offline manually.

