1. Introduction
Nacos, Alibaba's open-source platform for distributed service discovery and configuration management, made three major architectural moves in version 2.X: a protocol upgrade (HTTP → gRPC), an optimized registry structure (nested Map → flat, layered storage), and stronger consistency (a hybrid of Distro and Raft).
The core optimizations in Nacos 2.X:
- Protocol upgrade: gRPC long-lived connections replace HTTP, cutting communication overhead and improving real-time behavior;
- Storage optimization: a flat, layered registry replaces the nested Map, speeding up queries and synchronization;
- Event-driven design: decouples modules from one another and improves extensibility;
- Health checks: server-initiated probing replaces client heartbeats, reducing client burden;
- Stronger consistency: a hybrid Raft + Distro scheme balances consistency and performance.
2. Service Registration: Core Source Code Analysis
Service registration is Nacos's foundational capability. 2.X replaces 1.X's short-lived HTTP connections with asynchronous communication over a gRPC long-lived connection, and also optimizes instance storage and the consistency logic.
(1) Client: Issuing the Registration Request
The client's job is to package the instance information, establish a gRPC connection, and send the registration request; the key classes live in the nacos-naming-client module.
- Entry class: `NacosNamingService` — the registration entry point exposed to callers. It preprocesses parameters and dispatches to the proxy:
```java
@Override
public void registerInstance(String serviceName, String groupName, Instance instance)
        throws NacosException {
    // Combine group and service into the grouped name, e.g. "DEFAULT_GROUP@@user-service"
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // Fill in defaults when the caller omits cluster or namespace
    if (instance.getClusterName() == null) {
        instance.setClusterName(ClusterUtils.getDefaultClusterName());
    }
    if (instance.getNamespaceId() == null) {
        instance.setNamespaceId(this.namespace);
    }
    // Hand off to the naming client proxy (gRPC by default in 2.X)
    namingClientProxy.registerInstance(groupedServiceName, instance);
}
```
- gRPC proxy: `NamingGrpcClientProxy` — wraps the registration request in the gRPC protocol format and sends it to the server over the long-lived connection:
```java
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    RegisterInstanceRequest request = RegisterInstanceRequest.newBuilder()
            .setServiceName(serviceName)
            .setInstance(GrpcUtils.convertInstanceToProto(instance))
            .setNamespace(instance.getNamespaceId())
            .build();
    GrpcUtils.invokeGrpcSync(
            () -> namingStub.registerInstance(request),
            "registerInstance",
            serviceName
    );
}
```
- Key detail: at initialization the client establishes a gRPC long-lived connection to the server (default port 9848); subsequent registration, heartbeat, and subscription traffic all reuse that connection, avoiding repeated TCP handshakes.
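For orientation, a minimal usage sketch against the public client API (the server address and instance values are illustrative; it assumes the standard nacos-client dependency is on the classpath):

```java
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;

public class RegisterDemo {
    public static void main(String[] args) throws Exception {
        // 8848 is the HTTP port; the 2.X client derives its gRPC port
        // internally by a fixed offset (8848 + 1000 = 9848).
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        // Registers an ephemeral instance in the default group and cluster.
        naming.registerInstance("user-service", "192.168.1.10", 8080);
    }
}
```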
(2) Server: Handling the Registration Request
The server's job is to receive the gRPC request, validate parameters, store the instance in the layered registry, and trigger event notifications plus cluster synchronization; the key classes live in the nacos-naming-core module.
- gRPC request entry: `NamingGrpcService` — the server-side gRPC service implementation; it receives client requests and forwards them to the business layer:
```java
@Override
public void registerInstance(RegisterInstanceRequest request,
                             StreamObserver<CommonResponse> responseObserver) {
    // Keep the builder until the end; protobuf messages are immutable once built
    CommonResponse.Builder response = CommonResponse.newBuilder();
    try {
        String serviceName = request.getServiceName();
        String namespaceId = request.getNamespace();
        Instance instance = GrpcUtils.convertProtoToInstance(request.getInstance());
        // Hand off to the business layer
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        response.setSuccess(true);
    } catch (Exception e) {
        response.setSuccess(false);
        response.setMessage("registration failed: " + e.getMessage());
        log.error("Instance register failed", e);
    } finally {
        // Always complete the call so the client is never left hanging
        responseObserver.onNext(response.build());
        responseObserver.onCompleted();
    }
}
```
- Service management core: `ServiceManager` — creates/fetches services and orchestrates instance registration; it is the server's "service hub":
```java
public void registerInstance(String namespaceId, String serviceName, Instance instance)
        throws NacosException {
    // Get or lazily create the Service, then register the instance under it
    Service service = getOrCreateService(namespaceId, serviceName);
    service.registerInstance(instance);
    // Follow-up work (health checks, cluster sync) is decoupled via events
    eventPublisher.publishEvent(new ServiceInstanceRegisteredEvent(service, instance));
}

private Service getOrCreateService(String namespaceId, String serviceName) {
    String serviceKey = ServiceKeyGenerator.generateServiceKey(namespaceId, serviceName);
    // computeIfAbsent guarantees a single Service per key under concurrency
    return serviceMap.computeIfAbsent(serviceKey, key -> {
        Service newService = new Service();
        newService.setNamespaceId(namespaceId);
        newService.setServiceName(serviceName);
        newService.setId(IdGenerator.generateServiceId(namespaceId, serviceName));
        return newService;
    });
}
```
- Layered storage: `Service` → `Cluster` → `Instance` — 2.X uses a three-level "service-cluster-instance" structure in place of 1.X's nested Map, with each level managed independently:
```java
// In Service: route the instance to its cluster
public void registerInstance(Instance instance) {
    String clusterName = instance.getClusterName();
    Cluster cluster = clusters.computeIfAbsent(clusterName,
            key -> new Cluster(clusterName, this));
    cluster.addInstance(instance);
}

// In Cluster: store the instance, then start health checking or persistence
public void addInstance(Instance instance) {
    ConcurrentHashMap<String, Instance> instanceMap = getInstanceMap();
    instanceMap.put(instance.getInstanceId(), instance);
    if (instance.isEphemeral()) {
        // Ephemeral instances live in memory and get a gRPC probe task
        healthCheckReactor.addHealthCheckTask(new GrpcHealthCheckTask(instance, this));
    } else {
        // Persistent instances are also written to durable storage
        persistInstance(instance);
    }
}
```
- Request validation: implicit in the business logic. 2.X has no dedicated `InstanceValidator`; instead, non-null and format checks run before the instance is registered:
```java
if (StringUtils.isEmpty(instance.getIp()) || instance.getPort() <= 0) {
    throw new NacosException(NacosException.INVALID_PARAM, "IP/port must not be empty");
}
if (instance.getWeight() < 0) {
    throw new NacosException(NacosException.INVALID_PARAM, "weight must not be negative");
}
```
3. Event-Driven Architecture
Nacos 2.X decouples its modules with an event publish-subscribe pattern built from four parts: events, the event bus, publishers, and subscribers. Every core flow (registration, health checking, synchronization) is driven by events.
(1) Events: Carriers of State Changes
An event encapsulates a state change in the system. All events derive from the top-level `Event` interface; the core event classes are:
| Event class | Trigger scenario | Key fields |
| --- | --- | --- |
| `ServiceInstanceEvent` | Service instance registered/updated/removed | `service`, `instance` |
| `HealthStateChangeEvent` | Instance health state change (healthy → unhealthy) | `instanceId`, `newHealthyState` |
| `ConfigDataChangeEvent` | Config-center data change | `dataId`, `group`, `content` |
Example: the service instance registration event definition
```java
public class ServiceInstanceRegisteredEvent extends AbstractEvent {
    private final Service service;
    private final Instance instance;

    public ServiceInstanceRegisteredEvent(Service service, Instance instance) {
        this.service = service;
        this.instance = instance;
    }

    // Getters used by subscribers handling this event
    public Service getService() { return service; }
    public Instance getInstance() { return instance; }
}
```
(2) Event Bus: The Routing Core
The event bus connects publishers to subscribers, providing event registration, publishing, and routing. The core implementation is `NacosEventBus`:
```java
public class NacosEventBus extends AbstractEventBus {

    // Event type -> subscribers; CopyOnWriteArrayList favors read-heavy dispatch
    private final ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArrayList<Subscriber>> subscriberMap =
            new ConcurrentHashMap<>();

    @Override
    public void register(Subscriber subscriber) {
        Class<? extends Event> eventType = subscriber.getSubscribeType();
        subscriberMap.computeIfAbsent(eventType, key -> new CopyOnWriteArrayList<>())
                .add(subscriber);
    }

    @Override
    public void publish(Event event) {
        CopyOnWriteArrayList<Subscriber> subscribers = subscriberMap.get(event.getClass());
        if (subscribers == null) {
            return;
        }
        // Dispatch asynchronously so a slow subscriber cannot block the publisher
        for (Subscriber subscriber : subscribers) {
            EventExecutor.execute(() -> subscriber.onEvent(event));
        }
    }
}
```
(3) Publishers: Event Producers
A publisher emits an event once a particular piece of business logic completes. The core abstraction is the `EventPublisher` interface, whose default implementation is `DefaultEventPublisher`:
```java
eventPublisher.publishEvent(new ServiceInstanceRegisteredEvent(service, instance));
```
Common publish points:
- Service registration completes → publish `ServiceInstanceRegisteredEvent`
- Instance health state changes → publish `HealthStateChangeEvent`
- Config data updates → publish `ConfigDataChangeEvent`
(4) Subscribers: Event Consumers
A subscriber consumes events either by implementing the `Subscriber` interface or by annotating a handler with `@EventListener`; the real work is the event-handling logic.
Example: the health-check subscriber (listens for registration events and initializes probe tasks)
```java
@Component
public class InstanceRegisteredSubscriber implements Subscriber<ServiceInstanceRegisteredEvent> {

    @Autowired
    private HealthCheckReactor healthCheckReactor;

    @Override
    public Class<? extends Event> getSubscribeType() {
        return ServiceInstanceRegisteredEvent.class;
    }

    @Override
    public void onEvent(ServiceInstanceRegisteredEvent event) {
        Instance instance = event.getInstance();
        Cluster cluster = event.getService().getCluster(instance.getClusterName());
        // Only ephemeral instances get a gRPC probe task
        if (instance.isEphemeral()) {
            healthCheckReactor.addHealthCheckTask(new GrpcHealthCheckTask(instance, cluster));
        }
    }
}
```
- Key advantage: modules need no hard-coded dependencies on one another (the registration module never calls the health-check module directly); they interact purely through events, which makes the system highly extensible. The sketch below makes the wiring concrete.
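A minimal wiring sketch using the class shapes from this section (these are the article's sketched types, not necessarily the exact Nacos internals):

```java
// The registration module never references the health-check module directly.
NacosEventBus eventBus = new NacosEventBus();

// Health checking opts in by subscribing...
eventBus.register(new InstanceRegisteredSubscriber());

// ...and registration only announces what happened; the bus routes the rest.
eventBus.publish(new ServiceInstanceRegisteredEvent(service, instance));
```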
4. Registry Structure Changes
The registry redesign is the heart of 2.X's performance gains; it targets the heavy deep-copy cost and slow lookups of the 1.X nested Map.
(1) Old Structure (1.X): Nested-Map Pain Points
The 1.X registry was a four-level nested Map:
```java
// namespace -> group -> service -> cluster -> instance list
Map<String, Map<String, Map<String, Map<String, List<Instance>>>>> registry;
```
- Looking up an instance meant walking every level (e.g. `registry.get(namespace).get(group).get(service).get(cluster)`), which is slow and null-prone;
- Synchronizing or backing up data required deep-copying the entire nested structure, at heavy memory and time cost;
- Adding a dimension (such as metadata) meant modifying multiple levels, so extensibility was poor. The sketch after this list shows the lookup cost concretely.
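A null-safe read against the nested structure needs a guard at every level; a sketch of the pain point (not the actual 1.X code):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Every level may be absent, so every step needs its own null check.
static List<Instance> lookup(
        Map<String, Map<String, Map<String, Map<String, List<Instance>>>>> registry,
        String namespace, String group, String service, String cluster) {
    Map<String, Map<String, Map<String, List<Instance>>>> groups = registry.get(namespace);
    if (groups == null) return Collections.emptyList();
    Map<String, Map<String, List<Instance>>> services = groups.get(group);
    if (services == null) return Collections.emptyList();
    Map<String, List<Instance>> clusters = services.get(service);
    if (clusters == null) return Collections.emptyList();
    List<Instance> instances = clusters.get(cluster);
    return instances != null ? instances : Collections.emptyList();
}
```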
(2) New Structure (2.X): Flat, Layered Storage
2.X splits the registry into independent, layered storage units tied together by a unique key; the core implementation class is `ServiceStorage`:
- Core storage structure
```java
public class ServiceStorage {

    private static final ServiceStorage INSTANCE = new ServiceStorage();

    // namespace -> (service -> aggregated ServiceInfo): two flat hash lookups
    private final ConcurrentHashMap<Namespace, ConcurrentHashMap<Service, ServiceInfo>> namespaceServiceInfoMap =
            new ConcurrentHashMap<>();

    public ServiceInfo getServiceInfo(Namespace namespace, Service service) {
        ConcurrentHashMap<Service, ServiceInfo> serviceInfoMap = namespaceServiceInfoMap.get(namespace);
        if (serviceInfoMap == null) {
            return null;
        }
        return serviceInfoMap.get(service);
    }

    public void putServiceInfo(Namespace namespace, Service service, ServiceInfo serviceInfo) {
        namespaceServiceInfoMap.computeIfAbsent(namespace, key -> new ConcurrentHashMap<>())
                .put(service, serviceInfo);
    }
}
```
- Composite key design: `ServiceKeyGenerator` produces a globally unique key that replaces the level-by-level lookup logic:
```java
public class ServiceKeyGenerator {

    public static String generateServiceKey(String namespaceId, String serviceName) {
        // serviceName may arrive pre-grouped as "group@@service"
        String[] parts = serviceName.split("@@");
        String groupName = parts.length > 1 ? parts[0] : Constants.DEFAULT_GROUP;
        String pureServiceName = parts.length > 1 ? parts[1] : parts[0];
        // Flatten namespace/group/service into one key: "ns@@group@@service"
        return String.format("%s@@%s@@%s", namespaceId, groupName, pureServiceName);
    }
}
```
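For example, given the code above, `generateServiceKey("public", "DEFAULT_GROUP@@user-service")` yields `public@@DEFAULT_GROUP@@user-service`: one string key, resolved by a single hash lookup, no matter how many dimensions it encodes.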
- Optimization effects
  - Query efficiency: four chained lookups, each needing a null check, become a single O(1) hash lookup;
  - Sync overhead: only the affected `ServiceInfo` object is transferred during synchronization, with no deep copy;
  - Extensibility: a new dimension (e.g. instance tags) only requires a field on `Instance`, leaving the storage structure untouched.
5. Service Discovery and Subscription: Core Source Code Analysis
Service discovery is how a client obtains healthy instances. 2.X replaces 1.X's HTTP polling with gRPC long-lived connections plus active push, greatly improving both latency and efficiency.
(1) Communication Shift: HTTP → gRPC
2.X clients default to `RpcNamingClientProxy` (the gRPC proxy) and fall back to `HttpNamingClientProxy` only in compatibility mode. The core advantages:
- Connection reuse: a long-lived connection avoids repeated TCP setup and teardown;
- Binary transport: Protobuf payloads are 30%+ smaller than their JSON equivalents;
- Active push: the server pushes when a service changes, so clients never poll.
(2) Service Discovery Flow
The client queries the server for instances over gRPC; the server filters healthy instances out of the registry and returns them.
- Client query entry: `NacosNamingService`
```java
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy)
        throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    return namingClientProxy.selectInstances(groupedServiceName, healthy);
}
```
- gRPC proxy query implementation: `RpcNamingClientProxy`
```java
@Override
public List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException {
    QueryInstancesRequest request = QueryInstancesRequest.newBuilder()
            .setServiceName(serviceName)
            .setNamespace(namespace)
            .setHealthyOnly(healthy)
            .build();
    QueryInstancesResponse response = GrpcUtils.invokeGrpcSync(
            () -> namingStub.queryInstances(request),
            "queryInstances",
            serviceName
    );
    // Convert protobuf instances back to the client-side model
    return response.getInstancesList().stream()
            .map(GrpcUtils::convertProtoToInstance)
            .collect(Collectors.toList());
}
```
- Server-side query handling: `InstanceQueryHandler` — on receiving a query, the server filters instances out of the registry:
```java
public QueryInstancesResponse handle(QueryInstancesRequest request) {
    String namespaceId = request.getNamespace();
    String serviceName = request.getServiceName();
    boolean healthyOnly = request.isHealthyOnly();
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        // Unknown service: return an empty result rather than an error
        return QueryInstancesResponse.newBuilder().build();
    }
    // Filter by health (when requested) and drop zero-weight instances
    List<Instance> filteredInstances = service.getAllInstances().stream()
            .filter(instance -> !healthyOnly || instance.isHealthy())
            .filter(instance -> instance.getWeight() > 0)
            .collect(Collectors.toList());
    return QueryInstancesResponse.newBuilder()
            .addAllInstances(GrpcUtils.convertInstancesToProtos(filteredInstances))
            .build();
}
```
(3) Service Subscription Flow
Subscription is a "subscribe once, keep receiving updates" mechanism: the client registers a listener, and the server pushes proactively whenever the service changes.
- Client subscription entry: `NacosNamingService`
```java
@Override
public void subscribe(String serviceName, String groupName, EventListener listener)
        throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // Record the listener locally, then tell the server we want pushes
    subscriptionManager.subscribe(groupedServiceName, listener);
    namingClientProxy.subscribe(groupedServiceName);
}
```
- Server-side subscription management: `SubscriptionManager` — the server stores the service-to-client subscription relation, essentially a `ConcurrentHashMap`:
```java
public class SubscriptionManager {

    // Service key -> the set of connected subscribers interested in it
    private final ConcurrentHashMap<String, Set<Subscriber>> serviceSubscribers =
            new ConcurrentHashMap<>();

    public void subscribe(String serviceKey, Subscriber subscriber) {
        serviceSubscribers.computeIfAbsent(serviceKey, key -> ConcurrentHashMap.newKeySet())
                .add(subscriber);
    }

    public Set<Subscriber> getSubscribers(String serviceKey) {
        return serviceSubscribers.getOrDefault(serviceKey, Collections.emptySet());
    }
}
```
- Server push trigger: event-driven. When a service's instances change, an event fires the push logic:
```java
@Component
public class PushService {

    @Autowired
    private SubscriptionManager subscriptionManager;
    @Autowired
    private ServiceStorage serviceStorage;

    @EventListener
    public void onInstanceChange(ServiceInstanceEvent event) {
        Service service = event.getService();
        String serviceKey = ServiceKeyGenerator.generateServiceKey(
                service.getNamespaceId(), service.getServiceName());
        Set<Subscriber> subscribers = subscriptionManager.getSubscribers(serviceKey);
        if (subscribers.isEmpty()) {
            return;
        }
        // Read the aggregated view once, then push it to every subscriber
        ServiceInfo serviceInfo = serviceStorage.getServiceInfo(service.getNamespaceId(), service);
        for (Subscriber subscriber : subscribers) {
            pushToClient(subscriber, serviceInfo);
        }
    }

    private void pushToClient(Subscriber subscriber, ServiceInfo serviceInfo) {
        ServiceChangeResponse response = ServiceChangeResponse.newBuilder()
                .setServiceInfo(GrpcUtils.convertServiceInfoToProto(serviceInfo))
                .build();
        // Write over the subscriber's existing gRPC channel (server push)
        subscriber.getGrpcChannel().writeAndFlush(response);
    }
}
```
6. Pushing Service Changes to Subscribed Clients: Source Code Analysis
On the client, receiving server pushes boils down to gRPC stream listening + local cache update + listener callbacks; the key classes live in the nacos-naming-client module.
(1) Receiving Pushes: NamingPushResponseHandler
The client listens for server pushes through a gRPC stream observer; the `onNext` method is the entry point for pushed messages:
```java
public class NamingPushResponseHandler implements StreamObserver<ServiceChangeResponse> {

    @Autowired
    private ServiceInfoHolder serviceInfoHolder;
    @Autowired
    private SubscriptionManager subscriptionManager;

    @Override
    public void onNext(ServiceChangeResponse response) {
        try {
            ServiceInfoProto serviceInfoProto = response.getServiceInfo();
            ServiceInfo newServiceInfo = GrpcUtils.convertProtoToServiceInfo(serviceInfoProto);
            // 1) Refresh the local cache, then 2) notify business listeners
            serviceInfoHolder.processServiceInfo(newServiceInfo);
            subscriptionManager.notifyListeners(newServiceInfo);
        } catch (Exception e) {
            log.error("Failed to process service push", e);
        }
    }

    @Override
    public void onError(Throwable t) {
        log.error("gRPC push stream error", t);
    }

    @Override
    public void onCompleted() {
        log.info("gRPC push stream closed, attempting to reconnect");
        reconnect();
    }
}
```
(2) Local Cache Update: ServiceInfoHolder
The client keeps a local instance cache so it does not have to re-query the server; the core is `serviceInfoMap`:
```java
public class ServiceInfoHolder {

    // Service key -> latest known ServiceInfo (local cache)
    private final ConcurrentHashMap<String, ServiceInfo> serviceInfoMap = new ConcurrentHashMap<>();

    public void processServiceInfo(ServiceInfo newServiceInfo) {
        String serviceKey = newServiceInfo.getKey();
        ServiceInfo oldServiceInfo = serviceInfoMap.get(serviceKey);
        // Drop stale pushes: only accept data newer than what we already hold
        if (oldServiceInfo != null
                && newServiceInfo.getLastRefTime() <= oldServiceInfo.getLastRefTime()) {
            return;
        }
        serviceInfoMap.put(serviceKey, newServiceInfo);
        // Record when this entry was last refreshed locally
        newServiceInfo.setLastRefTime(System.currentTimeMillis());
    }

    public ServiceInfo getServiceInfo(String serviceKey) {
        return serviceInfoMap.get(serviceKey);
    }
}
```
(3) Listener Callbacks: SubscriptionManager
The client notifies the business layer of service changes by iterating over its listeners and invoking their `onEvent` methods:
```java
public class SubscriptionManager {

    // Service key -> business-level listeners registered via subscribe()
    private final ConcurrentHashMap<String, Set<EventListener>> listenerMap =
            new ConcurrentHashMap<>();

    public void notifyListeners(ServiceInfo serviceInfo) {
        String serviceKey = serviceInfo.getKey();
        Set<EventListener> listeners = listenerMap.get(serviceKey);
        if (listeners == null) {
            return;
        }
        for (EventListener listener : listeners) {
            try {
                listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getHosts()));
            } catch (Exception e) {
                // One failing listener must not break the others
                log.error("Listener callback failed", e);
            }
        }
    }
}
```
- Business-layer example: a client implements `EventListener` to observe changes
```java
namingService.subscribe("user-service", "DEFAULT_GROUP", new EventListener() {
    @Override
    public void onEvent(Event event) {
        NamingEvent namingEvent = (NamingEvent) event;
        List<Instance> instances = namingEvent.getInstances();
        System.out.println("Service changed, new instance list: " + instances);
    }
});
```
7. Server-Side Health Checks: Source Code Analysis
Nacos 2.X moves health checking from client-initiated heartbeats to server-initiated probing, reducing client overhead while improving probe accuracy.
(1) Core Mechanism: Server-Initiated gRPC Probes
2.X probes ephemeral instances (the default) over gRPC and persistent instances over HTTP (configurable); probe tasks are scheduled by `HealthCheckReactor`.
(2) Probe Task Management: HealthCheckReactor
It creates and schedules probe tasks, executing them asynchronously on a thread pool so the main threads are never blocked:
```java
public class HealthCheckReactor {

    // Dedicated pool so probing never blocks request-handling threads
    private final ScheduledExecutorService healthCheckExecutor = Executors.newScheduledThreadPool(
            Runtime.getRuntime().availableProcessors() * 2,
            new ThreadFactoryBuilder().setNameFormat("nacos-health-check-%d").build()
    );

    public void addHealthCheckTask(HealthCheckTask task) {
        // Probe immediately, then every 20 seconds
        healthCheckExecutor.scheduleAtFixedRate(task, 0, 20000, TimeUnit.MILLISECONDS);
    }
}
```
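A note on the scheduling choice: `ScheduledExecutorService.scheduleAtFixedRate` never runs the same task concurrently with itself, so if a probe overruns the 20-second period the next execution simply starts late; slow or unreachable targets therefore cannot pile up probe threads.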
(3) gRPC Probe Implementation: GrpcHealthCheckTask
The probe task for ephemeral instances sends a gRPC health-check request to the client to decide whether the instance is alive:
```java
public class GrpcHealthCheckTask implements Runnable {

    private final Instance instance;
    private final Cluster cluster;
    private static final int TIMEOUT = 5000;

    @Override
    public void run() {
        try {
            HealthCheckRequest request = HealthCheckRequest.newBuilder()
                    .setInstanceId(instance.getInstanceId())
                    .build();
            // Reuse (or create) a channel to the instance being probed
            ManagedChannel channel = GrpcChannelFactory.getChannel(instance.getIp(), instance.getPort());
            HealthCheckGrpc.HealthCheckBlockingStub stub = HealthCheckGrpc.newBlockingStub(channel)
                    .withDeadlineAfter(TIMEOUT, TimeUnit.MILLISECONDS);
            HealthCheckResponse response = stub.check(request);
            boolean newHealthy = response.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
            updateInstanceHealthStatus(newHealthy);
        } catch (Exception e) {
            // A timeout or connection failure counts as an unhealthy probe
            updateInstanceHealthStatus(false);
        }
    }

    private void updateInstanceHealthStatus(boolean healthy) {
        boolean oldHealthy = instance.isHealthy();
        if (oldHealthy == healthy) {
            return;
        }
        instance.setHealthy(healthy);
        // Downstream cleanup/push logic reacts to this event
        eventPublisher.publishEvent(new HealthStateChangeEvent(instance.getInstanceId(), healthy));
    }
}
```
(4) Health-State Judgment and Instance Eviction
The server judges instance state by consecutive probe failures rather than a fixed window. The default rules:
- 1 failed probe → the instance is marked `healthy=false`;
- 3 consecutive failed probes (roughly 60 seconds in total) → the instance is removed from the registry (ephemeral instances only);
- Persistent instances are never evicted on probe failure; they are only marked unhealthy.
The eviction logic lives in `InstanceCleaner`:
```java
@Component
public class InstanceCleaner {

    @Autowired
    private ServiceManager serviceManager;
    @Autowired
    private InstanceManager instanceManager;

    @EventListener
    public void onHealthChange(HealthStateChangeEvent event) {
        String instanceId = event.getInstanceId();
        boolean healthy = event.isHealthy();
        if (healthy) {
            return;
        }
        Instance instance = instanceManager.getInstanceById(instanceId);
        // Only ephemeral instances are candidates for eviction
        if (instance == null || !instance.isEphemeral()) {
            return;
        }
        instance.incrementHealthFailureCount();
        // Three consecutive failures -> drop the instance from the registry
        if (instance.getHealthFailureCount() >= 3) {
            serviceManager.removeInstance(instance.getNamespaceId(),
                    instance.getServiceName(), instance);
        }
    }
}
```
8. Cluster Synchronization of Service Changes: Source Code Analysis
Nacos 2.X cluster synchronization uses a hybrid of Raft (persistent instances) and Distro (ephemeral instances), balancing data consistency against performance.
(1) Ephemeral Instance Sync: the Distro Protocol
Ephemeral instances live in memory and are synchronized for eventual consistency via the Distro protocol; the core ideas are sharding plus active push.
- Distro sharding strategy: each Nacos node is responsible for synchronizing a subset of instances (sharded by hashing the `instanceId`), avoiding full-cluster synchronization:
```java
public class DistroHashMapper {

    public List<String> mapInstanceToServers(String instanceId, List<String> allServers) {
        // floorMod avoids the negative-index edge case of Math.abs(Integer.MIN_VALUE)
        int index = Math.floorMod(instanceId.hashCode(), allServers.size());
        return Collections.singletonList(allServers.get(index));
    }
}
```
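A consequence of modulo sharding worth noting: whenever the server list grows or shrinks, instance-to-node responsibility reshuffles. Distro tolerates this because it only promises eventual consistency and peers periodically verify and re-pull each other's data, so a brief inconsistency window heals on its own.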
- Sync implementation: `DistroDataSyncService` — when an ephemeral instance changes, the node pushes sync data to the peers responsible for that instance:
```java
@Component
public class DistroDataSyncService {

    @Autowired
    private DistroHashMapper hashMapper;
    @Autowired
    private GrpcClient grpcClient;
    @Autowired
    private ServerListManager serverListManager;

    public void syncEphemeralInstance(Instance instance) {
        // Determine which peers own this instance's shard
        List<String> targetServers = hashMapper.mapInstanceToServers(
                instance.getInstanceId(), serverListManager.getServerList());
        DistroData data = DistroDataBuilder.buildInstanceData(instance);
        for (String server : targetServers) {
            syncToServer(server, data);
        }
    }

    private void syncToServer(String server, DistroData data) {
        DistroSyncRequest request = DistroSyncRequest.newBuilder()
                .setData(ByteString.copyFrom(data.getContent()))
                .build();
        grpcClient.sendRequest(server, request, DistroSyncResponse.class);
    }
}
```
(2) Persistent Instance Sync: the Raft Protocol
Persistent instances are stored in MySQL and synchronized with strong consistency via the Raft protocol, which keeps every node's data identical.
- Raft core implementation: `RaftCore` — handles leader election, log replication, and commit:
```java
public class RaftCore {

    private volatile RaftPeer.State state = RaftPeer.State.FOLLOWER;
    private final List<LogEntry> logEntries = new CopyOnWriteArrayList<>();
    private final List<String> followerList = new CopyOnWriteArrayList<>();

    public void startElection() {
        // Leader election details are omitted in the original excerpt
    }

    public void replicateLog(LogEntry entry) {
        // Only the leader may append and replicate new entries
        if (state != RaftPeer.State.LEADER) {
            throw new IllegalStateException("not the leader; cannot replicate log entries");
        }
        logEntries.add(entry);
        // Fan the entry out to every follower
        for (String follower : followerList) {
            replicateToFollower(follower, entry);
        }
    }

    public void commitLog(LogEntry entry) {
        // Commit only once a majority of nodes have replicated the entry
        if (isLogReplicatedToMajority(entry.getIndex())) {
            commitEntry(entry);
            notifyFollowersCommit(entry.getIndex());
        }
    }
}
```
- Persistent instance sync trigger: when a persistent instance changes, the leader replicates a Raft log entry to every follower:
```java
@Override
public void put(String key, Record value) throws NacosException {
    LogEntry entry = LogEntry.newBuilder()
            .setKey(key)
            .setValue(ByteString.copyFrom(SerializeUtils.serialize(value)))
            .setType(LogEntryType.WRITE)
            .build();
    // Replicate, then block (up to 5s) until a majority has committed
    raftCore.replicateLog(entry);
    raftCore.waitForLogCommit(entry.getIndex(), 5000);
}
```
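For intuition, a minimal sketch of the majority test that `isLogReplicatedToMajority` implies (its body is not shown above, so this is an assumption about the logic rather than the actual Nacos implementation):

```java
// Hypothetical quorum check: an entry commits once more than half of the
// cluster (leader included) has replicated it.
static boolean isReplicatedToMajority(int replicatedCount, int clusterSize) {
    int quorum = clusterSize / 2 + 1; // 3 nodes -> 2, 5 nodes -> 3
    return replicatedCount >= quorum;
}
```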