Nacos13# Configuration Management Client Flow
Introduction
The main flows of the Nacos registry have largely been covered, so let's move on to the configuration center. Starting from a small example, this article walks through the client initialization flow, the Listener registration logic, and the change execution logic.
Example
We build a ConfigService and register a Listener in a sample test, then analyze the flow behind it.
Client Initialization Overview
- Supports several ways of obtaining the server address (local configuration or an endpoint/address server)
- Supports several ways of setting the namespace (local configuration or the Alibaba Cloud environment)
- Supports setting parameters such as the timeout and retry time
- Supports username/password authentication
- The long-polling loop takes elements from a BlockingQueue: if the queue has an element, executeConfigListen() runs immediately; if it is empty, the loop blocks for up to 5 seconds and then runs executeConfigListen()

A configuration sketch for the server-address, namespace and authentication options follows.
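The snippet below is a minimal sketch of how these options are passed in through Properties. The PropertyKeyConst keys shown here (SERVER_ADDR, ENDPOINT, NAMESPACE, USERNAME, PASSWORD) and all placeholder values are illustrative assumptions rather than something taken from this walkthrough; normally only one of the server-address options is set.

Properties properties = new Properties();
// Option 1: a fixed server list (the "isFixed" path in ServerListManager)
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "localhost:8848");
// Option 2: endpoint mode; the address server is polled periodically for the server list
// properties.setProperty(PropertyKeyConst.ENDPOINT, "address.example.com");
// Namespace (tenant); empty by default
properties.setProperty(PropertyKeyConst.NAMESPACE, "dev-namespace-id");
// Username/password authentication, handled by SecurityProxy
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
ConfigService configService = NacosFactory.createConfigService(properties);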
Listener Registration Logic
- After the client adds a Listener, a CacheData entry is cached in cacheMap
- The cacheMap key is built by joining "dataId+group+tenant"
- Each CacheData holds the list of registered Listeners
- Each CacheData is bound to a taskId; every 3000 distinct CacheData entries map to one taskId
- isSyncWithServer is set to false, meaning the cached md5 data has not been synced from the server
- A new Object() is added to the BlockingQueue so the long-polling loop can decide to execute immediately
Configuration Change Execution Logic
- The execution logic lives in the executeConfigListen method
- Once a CacheData has been synced from the server, its md5 is checked; if it changed, the registered Listeners are called back to deliver the notification
- After a Listener is registered, an RPC channel (rpcClient) to the server is built
- A change-listen request (configChangeListenRequest) is sent to the server
- The server compares the cached md5 values and returns the list of changed keys to the client
- Using that list, the client sends a ConfigQueryRequest to the server, fetches the changed content, and calls back the registered Listeners
- The Listener callbacks are executed asynchronously as Runnable jobs on a thread pool
@Test
public void test01() throws Exception {
String serverAddr = "localhost:8848";
String dataId = "test";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
// build the ConfigService
ConfigService configService = NacosFactory.createConfigService(properties);
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
System.in.read();
}
Note: the example builds a ConfigService and registers a Listener to receive configuration change notifications from the server.
NacosConfigService Constructor
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
// Annotation @1
initNamespace(properties);
// Annotation @2
ServerListManager serverListManager = new ServerListManager(properties);
// Annotation @3
serverListManager.start();
// Annotation @4
this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
// HttpAgent will be deprecated; ignore it for now
// will be deleted in 2.0 later versions
agent = new ServerHttpAgent(serverListManager);
}
Annotation @1: the namespace can be set via properties.setProperty(PropertyKeyConst.NAMESPACE, ...); the code also handles the Alibaba Cloud environment, which we skip here. It defaults to empty.
Annotation @2: initializes the namespace, server address list and related information.
Annotation @3: start() mainly serves the endpoint mode, where the server address list is fetched periodically from the address server; when the address list is supplied locally, isFixed is true and no periodic refresh is needed.
Annotation @4: ClientWorker initialization.
ClientWorker Initialization
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
final Properties properties) throws NacosException {
this.configFilterChainManager = configFilterChainManager;
// Annotation @5
init(properties);
// Annotation @6
agent = new ConfigRpcTransportClient(properties, serverListManager);
// scheduled thread pool, sized to the number of processor cores
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
return t;
}
});
agent.setExecutor(executorService);
// Annotation @7
agent.start();
}
Annotation @5: initializes the timeout, retry time and other settings (a sketch of overriding these via Properties follows the method).
private void init(Properties properties) {
// timeout, 30 seconds by default
timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
// retry (penalty) time, 2 seconds by default
taskPenaltyTime = ConvertUtils
.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);
// whether to pull the config from the server as soon as it is registered, false by default
this.enableRemoteSyncConfig = Boolean
.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
}
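For reference, these values can be overridden by the caller through the same PropertyKeyConst keys that init() reads. A hedged sketch with illustrative values (units are milliseconds):

Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "localhost:8848");
// long-poll timeout, default 30000 ms (values below the minimum are clamped up by the Math.max above)
properties.setProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT, "30000");
// penalty/retry time after a failed listen round, default 2000 ms
properties.setProperty(PropertyKeyConst.CONFIG_RETRY_TIME, "2000");
// pull content from the server as soon as a CacheData is created, default false
properties.setProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG, "true");
ConfigService configService = NacosFactory.createConfigService(properties);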
Annotation @6: gRPC config agent initialization.
public ConfigTransportClient(Properties properties, ServerListManager serverListManager) {
// default encoding UTF-8
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
// namespace (tenant), empty by default
this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE);
this.serverListManager = serverListManager;
// username/password authentication
this.securityProxy = new SecurityProxy(properties,
ConfigHttpClientManager.getInstance().getNacosRestTemplate());
}
Annotation @7: gRPC agent startup.
public void start() throws NacosException {
// simple username/password authentication
if (securityProxy.isEnabled()) {
securityProxy.login(serverListManager.getServerUrls());
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
securityProxy.login(serverListManager.getServerUrls());
}
}, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
}
startInternal();
}
@Override
public void startInternal() throws NacosException {
executor.schedule(new Runnable() {
@Override
public void run() {
while (true) { // run forever
try {
// wait at most 5 seconds
listenExecutebell.poll(5L, TimeUnit.SECONDS);
executeConfigListen();
} catch (Exception e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
Summary: this thread runs forever, taking elements from the BlockingQueue. If the queue is not empty, it executes executeConfigListen() as soon as it takes an element; if the queue is empty, it waits 5 seconds and then executes executeConfigListen(). A sketch of the producing side of this queue follows.
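For context, the producing side of that queue, notifyListenConfig(), simply drops a marker object into a bounded queue. The sketch below is a simplified assumption of what it looks like, consistent with the behavior described in this walkthrough rather than the verbatim source:

private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);

public void notifyListenConfig() {
    // non-blocking: if the queue is already full, a wake-up is already pending,
    // so the dropped offer is harmless; otherwise the 5-second poll above returns immediately
    listenExecutebell.offer(new Object());
}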
The logic of executeConfigListen is a bit involved, so let's first revisit the addListener call from the example.
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
// DEFAULT_GROUP by default
group = null2defaultGroup(group);
// tenant, empty by default
String tenant = agent.getTenant();
// Annotation @8
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
// whether the cached md5 data came from a server sync
cache.setSyncWithServer(false);
// add new Object() to the BlockingQueue
agent.notifyListenConfig();
}
}
Annotation @8: builds the CacheData entry and puts it into cacheMap. The cache key is "dataId+group+tenant", e.g. test+DEFAULT_GROUP (when the tenant is empty it is left out of the key).
Each CacheData is bound to a taskId, and every 3000 CacheData entries share one taskId: entries #0 to #2999 get taskId 0, entry #3000 starts taskId 1, and so on. As the later code shows, each taskId corresponds to one gRPC client.
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
// look it up in the cache first
CacheData cache = getCache(dataId, group, tenant);
if (null != cache) {
return cache;
}
// build the cache key joined with '+', e.g. test+DEFAULT_GROUP
String key = GroupKey.getKeyTenant(dataId, group, tenant);
synchronized (cacheMap) {
CacheData cacheFromMap = getCache(dataId, group, tenant);
// multiple listeners on the same dataid+group and race condition,so
// double check again
// other listener thread beat me to set to cacheMap
if (null != cacheFromMap) { // check again
cache = cacheFromMap;
// reset so that server not hang this check
cache.setInitializing(true); // the cache is initializing
} else {
// construct the CacheData object
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
// taskId starts at 0; note that every 3000 CacheData entries share one taskId
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
cache.setTaskId(taskId);
// fix issue # 1317
if (enableRemoteSyncConfig) { // false by default
String[] ct = getServerConfig(dataId, group, tenant, 3000L, false);
cache.setContent(ct[0]);
}
}
Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
// key = test+DEFAULT_GROUP
copy.put(key, cache);
// cacheMap = {test+DEFAULT_GROUP=CacheData [test, DEFAULT_GROUP]}
cacheMap.set(copy);
}
LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
return cache;
}
CacheData contents
| Field | Meaning |
|---|---|
| name | name of the ConfigTransportClient, config_rpc_client |
| configFilterChainManager | filter chain that can run a series of interceptors |
| dataId | dataId |
| group | group name, DEFAULT_GROUP by default |
| tenant | tenant name |
| listeners | list of added Listeners, a thread-safe CopyOnWriteArrayList |
| content | read in from the local file at startup, null by default |
| md5 | md5 string of the content |
Summary of the listener registration logic: a CacheData is built and cached in cacheMap, keyed by "dataId+group+tenant"; each CacheData holds its list of Listeners and is bound to a taskId, with 3000 distinct CacheData entries sharing one taskId and hence one gRPC channel instance; isSyncWithServer is set to false to indicate the cached md5 data has not been synced from the server; and a new Object() is put into the BlockingQueue for the long-polling check mentioned earlier. (For completeness, a sketch of the reverse path, removing a listener, follows.)
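Removing the last Listener of a dataId leaves its CacheData with an empty listener list; the executeConfigListen loop below then collects it into removeListenCachesMap, un-listens on the server and evicts the local cache. A minimal usage sketch, keeping a reference to the listener so it can be removed later (AbstractListener is the convenience base class from the Nacos client API):

Listener listener = new AbstractListener() {
    @Override
    public void receiveConfigInfo(String configInfo) {
        System.out.println("receive:" + configInfo);
    }
};
configService.addListener(dataId, group, listener);
// ... later: once no listeners remain for this dataId+group, the next
// executeConfigListen round sends the "unlisten" request and removes the CacheData
configService.removeListener(dataId, group, listener);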
As mentioned above, a thread keeps polling and each round executes the executeConfigListen method, which is the key piece.
public void executeConfigListen() {
Map<String/*taskId*/, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
long now = System.currentTimeMillis();
// more than 5 minutes have passed
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
// -------- Annotation @9 start --------
if (cache.isSyncWithServer()) {
cache.checkListenerMd5(); // notify the Listeners if the content changed
if (!needAllSync) { // skip the full check if 5 minutes have not yet elapsed
continue;
}
}
// -------- Annotation @9 end --------
if (!CollectionUtils.isEmpty(cache.getListeners())) { // Listeners have been added
// get listen config; isUseLocalConfigInfo is false by default
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<CacheData>();
listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
// CacheData [test, DEFAULT_GROUP]
cacheDatas.add(cache);
}
} else if (CollectionUtils.isEmpty(cache.getListeners())) { // no Listeners added
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<CacheData>();
removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
}
}
}
boolean hasChangedKeys = false;
//------------------- Annotation @10 start ---------------------------------
if (!listenCachesMap.isEmpty()) { // caches that have Listeners
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> listenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
// Annotation @10.1: build an rpcClient per taskId
RpcClient rpcClient = ensureRpcClient(taskId);
// Annotation @10.2
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
// handle changed keys,notify listener
// the changed configContexts
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
hasChangedKeys = true;
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {
String changeKey = GroupKey
.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
// Annotation @10.3: call back the Listeners
refreshContentAndCheck(changeKey, !isInitializing);
}
}
//handler content configs
for (CacheData cacheData : listenCaches) {
String groupKey = GroupKey
.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
if (!changeKeys.contains(groupKey)) { // Annotation @10.4
//sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
synchronized (cacheData) {
if (!cacheData.getListeners().isEmpty()) {
cacheData.setSyncWithServer(true);
continue;
}
}
}
cacheData.setInitializing(false);
}
}
} catch (Exception e) {
LOGGER.error("Async listen config change error ", e);
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
}
//------------------- Annotation @10 end ---------------------------------
if (!removeListenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> removeListenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
configChangeListenRequest.setListen(false);
try {
// send an unsubscribe request to the server: ConfigBatchListenRequest with listen=false
RpcClient rpcClient = ensureRpcClient(taskId);
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) {
// remove from the local cache
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
}
}
}
} catch (Exception e) {
LOGGER.error("async remove listen config change error ", e);
}
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
if (needAllSync) {
lastAllSyncTime = now;
}
//If has changed keys,notify re sync md5.
if (hasChangedKeys) { // changed keys trigger the next polling round
notifyListenConfig();
}
}
Annotation @9: isSyncWithServer starts out false and is set to true in the code below once the check completes, indicating that the cached md5 data came from a server sync. When it is true, the md5 values are checked.
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) { // Annotation @9.1
safeNotifyListener(dataId, group, content, type, md5, wrap);
}
}
}
Annotation @9.1: when the configuration content has changed, the Listener registered in our example is called back.
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
if (listenerWrap.inNotifying) {
// ...
return;
}
Runnable job = new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
// ...
}
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
// continue filtering through the filter chain
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listenerWrap.inNotifying = true;
// Annotation @9.2
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
// call back the change-event method
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
listenerWrap.lastCallMd5 = md5;
// ..
} catch (NacosException ex) {
// ...
} catch (Throwable t) {
// ...
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
};
final long startNotify = System.currentTimeMillis();
try {
// Annotation @9.3
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
try {
INTERNAL_NOTIFIER.submit(job); // run on the default thread pool of 5 threads
} catch (RejectedExecutionException rejectedExecutionException) {
// ...
job.run();
} catch (Throwable throwable) {
// ...
job.run();
}
}
} catch (Throwable t) {
// ...
}
final long finishNotify = System.currentTimeMillis();
// ...
}
Annotation @9.2: calls back the registered Listener's receiveConfigInfo method, or the receiveConfigChange logic for an AbstractConfigChangeListener.
Annotation @9.3: the Executor provided by the registered Listener is preferred for running the job; if none is set, the default INTERNAL_NOTIFIER pool with 5 threads is used. A hedged example of supplying a custom executor follows.
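If the callbacks should not share the 5-thread INTERNAL_NOTIFIER pool, the Listener can supply its own Executor via getExecutor(). A minimal illustrative variant of the earlier example (the single-thread executor is an arbitrary choice):

configService.addListener(dataId, group, new Listener() {
    // dedicated executor for this listener; shut it down when the listener is removed
    private final Executor executor = Executors.newSingleThreadExecutor();

    @Override
    public Executor getExecutor() {
        return executor; // safeNotifyListener submits the job here instead of INTERNAL_NOTIFIER
    }

    @Override
    public void receiveConfigInfo(String configInfo) {
        System.out.println("receive:" + configInfo);
    }
});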
Note: once a CacheData has been synced from the server, its md5 is checked for changes; when it has changed, the registered Listener is called back to deliver the notification. The notification is wrapped in a Runnable task, and the executing thread pool can be customized, defaulting to 5 threads.
Annotation @10.1: an rpcClient is built per taskId, e.g. config-0-c70e0314-4770-43f5-add4-f258a4083fd7 for taskId 0; combined with the context above, every 3000 CacheData entries share one rpcClient.
Annotation @10.2: the client sends a configChangeListenRequest to the server, which is handled on the server side by ConfigChangeBatchListenRequestHandler. The server again compares md5 values to detect changes and returns the list of changed keys.
Annotation @10.3: when the server returns a list of changed keys, refreshContentAndCheck is executed for each of them.
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
try {
// Annotation @10.3.1
String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
cacheData.setContent(ct[0]);
if (null != ct[1]) {
cacheData.setType(ct[1]);
}
if (notify) { // log it
// ...
}
// Annotation @10.3.2
cacheData.checkListenerMd5();
} catch (Exception e) {
//...
}
}
Annotation @10.3.1: sends a ConfigQueryRequest to the server to query the configuration content; a simplified sketch of this step follows.
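The query step boils down to roughly the following. This is a simplified sketch, not the verbatim source: dataId, group and tenant are the parameters passed in from refreshContentAndCheck, requestProxy is the helper already seen above, and the getOneRunningClient() helper plus the response accessors (getContent(), getContentType()) are assumptions about the 2.x API.

// Sketch of Annotation @10.3.1 inside getServerConfig (simplified, not verbatim)
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
RpcClient rpcClient = getOneRunningClient(); // assumption: reuse an established config RPC channel
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeout);
if (response != null && response.isSuccess()) {
    String content = response.getContent();  // becomes ct[0] in refreshContentAndCheck
    String type = response.getContentType(); // becomes ct[1]
}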
Annotation @10.3.2: calls back the registered Listeners; see Annotation @9 for that logic.
Annotation @10.4: for keys that did not change, the content is now in sync with the server, so syncWithServer is set to true and the next round is handled by the Annotation @9 branch.
Note: looking at Annotation @10 as a whole, after a Listener is registered an RPC channel (rpcClient) to the server is built; a change-listen request (configChangeListenRequest) is sent, and the server compares the cached md5 values and returns the list of changed keys to the client; the client then issues a ConfigQueryRequest based on that list, fetches the changed content, and calls back our registered Listeners.
