Nacos6# Distro Protocol Full Sync and Verification
Introduction
This article continues the dive into the Distro protocol. The previous article analyzed the addressing modes. Once a node has the addresses it can establish connections, and once it has connections it can communicate. So what data do cluster nodes actually exchange? This article digs into full data sync and the data verification between nodes.
Establishing RPC connections between nodes
The proxy subscribes to MembersChangeEvent, so it gets a callback whenever the cluster membership changes; it establishes gRPC connections to the other cluster nodes and caches them in a Map whose keys have the format 「Cluster-IP:Port」.
Verify-data communication between nodes
Verify data is only sent after full sync has completed; by default it is sent every 5 seconds; the verify data contains the clientId and a version, where version is a reserved field that is currently 0; when a node receives verify data, the check succeeds if the client exists in its cache, in which case the client's renew (keep-alive) time is refreshed, otherwise the check fails.
Full data sync
When a node starts up, it syncs snapshot data from one of the other cluster nodes and caches it in a Map; the cached data comes in two flavors, HTTP and gRPC; the data itself is the clients' registration information, including namespace, group name, service name and the Instance details; every node in the cluster ends up holding the complete snapshot data.
Nodes need a connection before they can talk to each other, and Nacos cluster nodes are no exception. Let's see how Nacos establishes connections inside the cluster, taking gRPC as the example.
In Nacos, ClusterRpcClientProxy encapsulates the channels between cluster nodes.
@PostConstruct
public void init() {
try {
// Note @1
NotifyCenter.registerSubscriber(this);
// Note @2
List<Member> members = serverMemberManager.allMembersWithoutSelf();
// Note @3
refresh(members);
Loggers.CLUSTER
.warn("[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ",
members);
} catch (NacosException e) {
Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());
}
}
Note @1: registers this object as a subscriber to the MembersChangeEvent event.
Note @2: gets the list of cluster members, excluding the current node.
Note @3: establishes an RPC channel to each of those nodes; refresh(members) below does the actual work.
private void refresh(List<Member> members) throws NacosException {
for (Member member : members) {
if (MemberUtil.isSupportedLongCon(member)) {
// Note @3.1
createRpcClientAndStart(member, ConnectionType.GRPC);
}
}
Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
.map(a -> memberClientKey(a)).collect(Collectors.toList());
// Note @3.2
while (iterator.hasNext()) {
Map.Entry<String, RpcClient> next1 = iterator.next();
if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
RpcClientFactory.getClient(next1.getKey()).shutdown();
iterator.remove();
}
}
}
Note @3.1: creates an RPC client for each member of the cluster.
Note @3.2: shuts down stale gRPC connections, i.e. cached clients whose member has left the cluster.
private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
Map<String, String> labels = new HashMap<String, String>(2);
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
// Note @3.1.1
String memberClientKey = memberClientKey(member);
RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
if (!client.getConnectionType().equals(type)) {
Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);
RpcClientFactory.destroyClient(memberClientKey);
// Note @3.1.2
client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
}
if (client.isWaitInitiated()) {
Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);
// Note @3.1.3
client.serverListFactory(new ServerListFactory() {
@Override
public String genNextServer() {
return member.getAddress(); // address of the peer cluster node to connect to
}
@Override
public String getCurrentServer() {
return member.getAddress();
}
@Override
public List<String> getServerList() {
return Lists.newArrayList(member.getAddress());
}
});
// Note @3.1.4
client.start();
}
}
Note @3.1.1: memberClientKey has the form 「Cluster-IP:Port」, e.g. Cluster-1.2.3.4:2008.
Note @3.1.2: creates the gRPC client and caches it in clientMap with memberClientKey as the key; at this point the client's status is WAIT_INIT.
Note @3.1.3: the ServerListFactory always returns this one fixed cluster node, so the client is pinned to that member.
Note @3.1.4: establishes the gRPC connection to the member node and moves the client's status to RUNNING.
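As a small reference, the key from Note @3.1.1 is nothing more than a fixed prefix plus the member's address. A minimal sketch of such a helper (assuming Member#getAddress() returns "IP:Port", as the ServerListFactory above already relies on):
private String memberClientKey(Member member) {
    // e.g. member.getAddress() == "1.2.3.4:2008"  ->  "Cluster-1.2.3.4:2008"
    return "Cluster-" + member.getAddress();
}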
Recap: establishing connections to the other Nacos cluster nodes boils down to two things: @1 subscribing to the MembersChangeEvent event, and @2 establishing gRPC connections to the other cluster nodes and caching them in a Map whose keys have the format 「Cluster-IP:Port」.
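The subscription from @1 is what keeps that Map up to date afterwards: whenever the membership changes, the subscriber callback simply runs refresh() again with the new member list. A simplified sketch of that callback (not the verbatim Nacos source; it just reuses the methods shown above):
@Override
public void onEvent(MembersChangeEvent event) {
    try {
        // re-resolve the members (excluding self) and reconcile the cached RPC clients
        refresh(serverMemberManager.allMembersWithoutSelf());
    } catch (NacosException e) {
        Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client, {}", e.getMessage());
    }
}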
The RPC channels between nodes exist, of course, so that the nodes can talk to each other, and one of those conversations is sending verify data. Why send verify data at all, and what does it contain? Let's dig in.
The last line of DistroProtocol's constructor calls startDistroTask(); here we mainly follow the startVerifyTask() logic.
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startDistroTask();
}
private void startDistroTask() {
// standalone mode: nothing to do, return directly
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask();
startLoadTask();
}
private void startVerifyTask() {
// Note @4
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
distroTaskEngineHolder.getExecuteWorkersManager()), distroConfig.getVerifyIntervalMillis());
}
Note @4: the task runs every 5 seconds, i.e. the default interval at which nodes send verify data to each other is 5 seconds.
It can be customized via the configuration property 「nacos.core.protocol.distro.data.verify_interval_ms」.
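For example, to change the interval you can set this property in the server's conf/application.properties (value in milliseconds; 5000 matches the 5-second default above):
nacos.core.protocol.distro.data.verify_interval_ms=5000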
Next, look at DistroVerifyTimedTask's run method.
@Override
public void run() {
try {
// Note @5
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
// Note @6
for (String each : distroComponentHolder.getDataStorageTypes()) {
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
Note @5: gets the other nodes in the cluster.
Note @6: two data storage types, HTTP and gRPC, are registered when the Nacos server starts; this article follows the gRPC one.
private void verifyForDataStorage(String type, List<Member> targetServer) {
// Note @7
DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
// Note @8
if (!dataStorage.isFinishInitial()) { // full sync has not finished yet, bail out
Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
dataStorage.getClass().getSimpleName());
return;
}
// Note @9
List<DistroData> verifyData = dataStorage.getVerifyData();
if (null == verifyData || verifyData.isEmpty()) {
return;
}
for (Member member : targetServer) {
DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
if (null == agent) {
continue;
}
// Note @10
executeTaskExecuteEngine.addTask(member.getAddress() + type,
new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
}
}
Note @7: when Nacos starts, two handlers are cached in dataStorageMap, handling the gRPC and the HTTP communication styles respectively:
「Nacos:Naming:v2:ClientData -> DistroClientDataProcessor」 and 「com.alibaba.nacos.naming.iplist. -> DistroDataStorageImpl」
Note @8: once all data has been synced from the other nodes the storage counts as finished initial; full data sync is analyzed in the next section.
Note @9: gets the data to verify, which is the list of clientIds this node is responsible for.
@Override
public List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>(); // a batch of DistroData
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) { // invalid client, or not an ephemeral client
continue;
}
// Note @9.1
if (clientManager.isResponsibleClient(client)) {
// Note @9.2
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey,
ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); // serialize the verify data
data.setType(DataOperation.VERIFY);
result.add(data);
}
}
return result;
}
Note @9.1: whether a client is handled by the current node is decided by ClientManagerDelegate#isResponsibleClient, i.e. the client is a ConnectionBasedClient and
its isNative flag is true, which means the client is connected directly to this node.
@Override
public boolean isResponsibleClient(Client client) {
return (client instanceof ConnectionBasedClient) && ((ConnectionBasedClient) client).isNative();
}
Note @9.2: builds the verify data; the main information is the clientId, plus a version kept as a reserved field, currently always 0.
Note @10: sends the verify data to the other cluster nodes; see DistroVerifyExecuteTask#run.
@Override
public void run() {
for (DistroData each : verifyData) {
try {
if (transportAgent.supportCallbackTransport()) { // gRPC supports callbacks
doSyncVerifyDataWithCallback(each);
} else { // HTTP does not support callbacks, fall back to the synchronous path
doSyncVerifyData(each);
}
} catch (Exception e) {
//...
}
}
}
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
try {
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
verifyData.getDistroKey().getResourceKey(), callback, member);
// Note @11
clusterRpcClientProxy.asyncRequest(member, request, wrapper);
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
Note @11: sends the clientId information this node is responsible for to the other nodes.
So what does a cluster node do when it receives verify data?
Turn to DistroDataRequestHandler#handle, which contains the logic for handling verify data.
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
switch (request.getDataOperation()) {
case VERIFY:
return handleVerify(request.getDistroData(), meta);
case SNAPSHOT:
return handleSnapshot();
case ADD:
case CHANGE:
case DELETE:
return handleSyncData(request.getDistroData());
case QUERY:
return handleQueryData(request.getDistroData());
default:
return new DistroDataResponse();
}
} catch (Exception e) {
// ...
}
}
private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
DistroDataResponse result = new DistroDataResponse();
// Note @12
if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
}
return result;
}
Note @12: the actual data check. As shown below, if the client exists in the cache the check succeeds and the client's renew (keep-alive) time is refreshed; otherwise the check fails.
@Override
public boolean verifyClient(String clientId) {
ConnectionBasedClient client = clients.get(clientId);
if (null != client) {
client.setLastRenewTime();
return true;
}
return false;
}
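Why does refreshing the renew time matter? The replicas a node holds for clients that are not directly connected to it (isNative() == false) have no connection of their own to keep them alive; it is the periodic verify traffic that keeps them fresh. Conceptually, the expiry check for such synced clients looks roughly like this (a sketch only; the threshold name is illustrative):
public boolean isExpire(long currentTime) {
    // native clients are kept alive by their own long connection;
    // synced replicas rely on VERIFY requests refreshing lastRenewTime
    return !isNative() && currentTime - getLastRenewTime() > clientExpiredTimeMillis;
}
If verify data for a clientId stops arriving, its replica eventually expires and gets cleaned up, which is exactly why each node keeps pushing the clientIds it is responsible for.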
Recap: verify data is sent between nodes only after full sync has completed; by default it is sent every 5 seconds; the verify data contains the clientId and a version, where version is a reserved field that is currently 0; when a node receives verify data, the check succeeds if the client exists in its cache, in which case the renew time is refreshed, otherwise the check fails.
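For reference, the verify payload built at Note @9.2 really is that small. Its rough shape, going by the fields discussed above (a sketch; the actual class and field names in the Nacos source may differ slightly):
public class DistroClientVerifyInfo {
    private String clientId;  // the client this node is responsible for
    private long version;     // reserved field, currently always 0
    // constructor and getters/setters omitted
}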
As mentioned above, full data sync has to finish before verify data can be sent, so let's go back to the startLoadTask() part of DistroProtocol#startDistroTask().
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
DistroLoadDataTask#run
@Override
public void run() {
try {
load(); // Note @13
if (!checkCompleted()) { // Note @14
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
Note @13: loads the full data from the other nodes in the cluster.
Note @14: if loading did not succeed, the task is rescheduled to run again after a 30-second delay; the delay can be set via the property 「nacos.core.protocol.distro.data.load_retry_delay_ms」.
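This is again a plain server property, for example in conf/application.properties (milliseconds; 30000 matches the 30-second default mentioned above):
nacos.core.protocol.distro.data.load_retry_delay_ms=30000
The load() method called at Note @13 looks like this: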
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) { // Note @15
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); // load the snapshot
}
}
}
Note @15: caches a snapshot for each data type; here there are two, for gRPC and HTTP, namely 「Nacos:Naming:v2:ClientData」 and 「com.alibaba.nacos.naming.iplist.」.
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) { // Note @16
try {
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
boolean result = dataProcessor.processSnapshot(distroData);
if (result) {
distroComponentHolder.findDataStorage(resourceType).finishInitial(); // mark initialization as finished
return true;
}
} catch (Exception e) {
}
}
return false;
}
Note @16: iterates over the other nodes in the cluster and keeps retrying until one of them successfully returns a snapshot; on success the storage is marked as initialized via finishInitial(). The transportAgent.getDatumSnapshot(...) call sends a SNAPSHOT request to the target node:
@Override
public DistroData getDatumSnapshot(String targetServer) {
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
DistroDataRequest request = new DistroDataRequest();
// set the request operation to SNAPSHOT
request.setDataOperation(DataOperation.SNAPSHOT);
try {
// request the snapshot data from the peer
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(
String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
}
}
Next, let's see how the other node responds to the snapshot request.
Back in DistroDataRequestHandler#handle, the handleSnapshot() method takes care of it.
private DistroDataResponse handleSnapshot() {
DistroDataResponse result = new DistroDataResponse();
DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
result.setDistroData(distroData);
return result;
}
@Override
public DistroData getDatumSnapshot() {
List<ClientSyncData> datum = new LinkedList<>();
// package up all the client data held by this node
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
datum.add(client.generateSyncData());
}
ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
snapshot.setClientSyncDataList(datum);
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot); // serialize the snapshot
return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}
Finally, here is the client data itself: namespace, group name, service name, and the node Instance information (IP, port, and so on).
public ClientSyncData generateSyncData() {
List<String> namespaces = new LinkedList<>();
List<String> groupNames = new LinkedList<>();
List<String> serviceNames = new LinkedList<>();
List<InstancePublishInfo> instances = new LinkedList<>();
for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
namespaces.add(entry.getKey().getNamespace());
groupNames.add(entry.getKey().getGroup());
serviceNames.add(entry.getKey().getName());
instances.add(entry.getValue());
}
return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}
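Note that the four lists are parallel: position i of namespaces, groupNames, serviceNames and instances together describes one registration of this client. The receiving side (the dataProcessor.processSnapshot(distroData) call back at Note @16) walks them back in exactly that way. A simplified sketch of that consumption (illustrative only, the helper name is made up, not the verbatim Nacos source):
private void applyClientSyncData(Client client, ClientSyncData syncData) {
    List<String> namespaces = syncData.getNamespaces();
    List<String> groupNames = syncData.getGroupNames();
    List<String> serviceNames = syncData.getServiceNames();
    List<InstancePublishInfo> instances = syncData.getInstancePublishInfos();
    for (int i = 0; i < namespaces.size(); i++) {
        // rebuild the Service coordinate and re-register the instance for the synced client
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        client.addServiceInstance(service, instances.get(i));
    }
}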
Recap: every node in the cluster holds the complete snapshot data; when a node starts up it syncs the snapshot from one of the other cluster nodes and caches it in a Map; the cached data comes in two flavors, HTTP and gRPC; the data itself is the clients' registration information, including namespace, group name, service name and the node Instance details.
