传统的服务发现机制通常采用客户端轮询的方式获取服务实例列表,这种方式不仅消耗大量网络资源,还存在实时性差的问题。
Nacos 2.x 版本通过引入基于 gRPC 长连接的发布订阅模式,实现了服务变更的实时推送。当服务实例发生变化时,服务端能够主动将变更信息推送给所有订阅者,大大提升了服务发现的响应速度和系统整体性能。
接下来我将深入理解 Nacos 发布订阅模式的源码实现,能更好的帮助我们在遇到复杂业务场景时进行精准的性能调优和问题排查。
服务发现是指服务消费者通过查询 Nacos 注册中心,动态获取服务提供者的地址信息(如IP和端口),从而实现服务间通信的机制。该机制解决了微服务架构中服务位置动态变化的问题,避免了硬编码服务地址的需求。
先来个总结图,从服务注册,健康检查,到服务发现时如何进行发布订阅处理的,一眼明了

在 2.0+ 版本 Nacos 进行了架构升级概述
这也就导致了我们服务发现模式转变,从传统的 轮询查询 方式,转变成 订阅推送模式
NamingService.subscribe() 建立长连接监听ServiceChangedEvent 事件通知客户端变化为什么从加载服务列表开始?
实际上,当我们服务启动时,此时是没有进行服务订阅绑定关系的,当我们通过 OpenFeign 中选择服务实例,调用 getInstances 方法时,才会执行订阅与发布。
现在我们来看下 Nacos 的客户端中服务消费者是如何来加载获取 Nacos 中的服务列表信息的。同样我们先进入到 org.springframework.boot.autoconfigure.AutoConfiguration.imports 中找到 NacosDiscoveryClientConfiguration 这个配置类。

创建服务发现客户端实例对象
java @Bean
public DiscoveryClient nacosDiscoveryClient(
NacosServiceDiscovery nacosServiceDiscovery) {
// Nacos 服务的发现客户端
return new NacosDiscoveryClient(nacosServiceDiscovery);
}
此示例对象类中,包含 getInstances 方法
java@Override
public List<ServiceInstance> getInstances(String serviceId) {
try {
// 缓存服务实例
return Optional.of(serviceDiscovery.getInstances(serviceId))
.map(instances -> {
ServiceCache.setInstances(serviceId, instances);
return instances;
}).get();
}
catch (Exception e) {
if (failureToleranceEnabled) {
return ServiceCache.getInstances(serviceId);
}
throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
}
}
继续执行就到了 getInstances 方法,可以看到通过 NamingService #selectInstances 查询实例
java/**
* Return all instances for the given service.
* @param serviceId id of service
* @return list of instances
* @throws NacosException nacosException
*/
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
// 客户端配置的分组名
String group = discoveryProperties.getGroup();
// 获取服务实例
List<Instance> instances = namingService().selectInstances(serviceId, group, true);
// 缓存到服务实例列表
return hostToServiceInstanceList(instances, serviceId);
}
selectInstances 选择实例的方法中,根据是否是订阅模式,进行不同的查询方式,默认 subscribe 是true,是订阅模式
java@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
// 集群名称,使用逗号分隔
String clusterString = StringUtils.join(clusters, ",");
// 是否订阅,默认是订阅的
if (subscribe) {
/**
* 1.从缓存中获取ServiceInfo
* ConcurrentMap<String, ServiceInfo> serviceInfoMap
* key: groupName@@serviceName 或者 groupName@@serviceName@@clusterString
* value: ServiceInfo
*/
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
// 2.缓存为空,执行订阅服务
if (null == serviceInfo) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
// 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
// 4.筛选满足条件的实例
return selectInstances(serviceInfo, healthy);
}
接下来从客户端、服务端进行源码分析,看服务订阅是如何实现的
如果是订阅模式,我们会从本地缓存对象 ServiceInfoHolder 中获取服务信息,如果获取不到,则调用 gRPC 客户端进行订阅

我们来看下 subscribe 订阅方法,首先构建订阅数据,进行缓存,默认的 registered 为 false
java@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
}
// 缓存订阅数据
redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
// 执行订阅服务
return doSubscribe(serviceName, groupName, clusters);
}
public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
// 订阅者数据
SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);
synchronized (subscribes) {
subscribes.put(key, redoData);
}
}
真正的去订阅的方法,gRPC 请求后的响应数据缓存到订阅数据中,同时修改 registered 为 true,最终返回服务信息
javapublic ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
// 订阅服务请求
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
true);
// 请求服务
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
// 缓存订阅数据中,修改注册标记
redoService.subscriberRegistered(serviceName, groupName, clusters);
// 返回服务信息
return response.getServiceInfo();
}
public void subscriberRegistered(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
synchronized (subscribes) {
SubscriberRedoData redoData = subscribes.get(key);
if (null != redoData) {
// 修改 registered 为 true
redoData.setRegistered(true);
}
}
}
总结
服务发现的方式从传统的轮询方法,切换成基于 gRPC 的订阅模式,从架构设计上更加有优势:
通过查找 gRPC 接口,定位到订阅接口的服务端处理类 SubscribeServiceRequestHandler

上述流程中,主要是根据请求信息,构建订阅者对象,同时根据请求的数据,来校验是订阅,还是取消订阅,发布对应的事件,最终返回成功响应
接下来我们看下 subscribeService, unsubscribeService 订阅和取消订阅服务的实现
java@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
// 获取注册表里的service
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
// 获取客户端对象
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
// 添加订阅信息
client.addServiceSubscriber(singleton, subscriber);
// 更新时间
client.setLastUpdatedTime();
// 发送事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
@Override
public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
// 获取注册表里的service
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
// 获取客户端对象
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
// 删除订阅信息
client.removeServiceSubscriber(singleton);
// 更新时间
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}
上述订阅的源码中,最核心的就是发送了事件 ClientOperationEvent.ClientSubscribeServiceEvent,我们全局查找下事件监听的点,通过断点找到了以下的源码,可以发现,其与客户端在注册时事件监听的逻辑在一起

继续深入源码,其内部主要是添加索引,且添加成功,则发布 ServiceEvent.ServiceSubscribedEvent 事件
javaprivate void addSubscriberIndexes(Service service, String clientId) {
// 添加订阅者索引
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// 只有第一次添加需要通知事件
if (subscriberIndexes.get(service).add(clientId)) {
// 添加订阅者索引事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
找到事件监听的地方,可以看到此处通过 PushDelayTaskExecuteEngine 推送延迟任务执行引擎,添加了一个延迟处理的任务
javapublic class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {
// ...
/**
* 推送延迟任务执行引擎
*/
private final PushDelayTaskExecuteEngine delayTaskEngine;
@Override
public void onEvent(Event event) {
// 服务变更事件
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// 服务发生变化,推送给所有订阅者
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
// 服务
Service service = serviceChangedEvent.getService();
// 添加延迟任务,500ms
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
// 增加服务变更次数
MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
}
// 服务订阅事件
else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// 如果服务由一个客户端订阅,则只推送此客户端
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
// 添加延迟任务,默认 500ms 执行一次
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
// ...
}
而 PushDelayTaskExecuteEngine 继承了 NacosDelayTaskExecuteEngine Nacos 延迟任务执行引擎,其在对象创建时,创了处理执行器,也就是一个定时线程池,执行固定任务,每间隔 100ms 执行一次 run 方法
javapublic class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
/**
* 处理执行器
*/
private final ScheduledExecutorService processingExecutor;
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
protected final ReentrantLock lock = new ReentrantLock();
public NacosDelayTaskExecuteEngine(String name) {
this(name, null);
}
public NacosDelayTaskExecuteEngine(String name, Logger logger) {
this(name, 32, logger, 100L);
}
public NacosDelayTaskExecuteEngine(String name, Logger logger, long processInterval) {
this(name, 32, logger, processInterval);
}
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger) {
this(name, initCapacity, logger, 100L);
}
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
// 自动开启任务执行
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
/**
* process tasks in execute engine.
*/
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// 执行延迟任务,如果失败则重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error ", e);
retryFailedTask(taskKey, task);
}
}
}
private void retryFailedTask(Object key, AbstractDelayTask task) {
task.setLastProcessTime(System.currentTimeMillis());
addTask(key, task);
}
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
// 处理任务
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
}
上述源码中执行延迟任务,根据引用会查找到 PushDelayTaskExecuteEngine 中的子类 PushDelayTaskProcessor#process 方法
java
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
private final PushDelayTaskExecuteEngine executeEngine;
public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
this.executeEngine = executeEngine;
}
@Override
public boolean process(NacosTask task) {
// 推送延迟任务
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
// 派发延迟任务
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}
派发任务进行了以下流转
java // 派发任务
public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {
executeEngine.addTask(dispatchTag, task);
}
@Override
public void addTask(Object tag, AbstractExecuteTask task) {
// nacos任务处理器
NacosTaskProcessor processor = getProcessor(tag);
if (null != processor) {
processor.process(task);
return;
}
// 任务执行工作器
TaskExecuteWorker worker = getWorker(tag);
// 执行
worker.process(task);
}
到这我们看到了核心流程,通过上述源码中开始调用 process 方法,添加到任务队列中。那任务队列中的任务是如何被执行的呢?
通过查看 TaskExecuteWorker 对象,其在初始化时,启动了一个自定义线程 InnerWorker,并调用 start 方法开启执行任务,同时观察到其运行的 run 方法中,通过 while 循环执行队列中被添加而来的任务。
javapublic final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
/**
* Max task queue size 32768.
*/
private static final int QUEUE_CAPACITY = 1 << 15;
private final Logger log;
private final String name;
private final BlockingQueue<Runnable> queue;
private final AtomicBoolean closed;
public TaskExecuteWorker(final String name, final int mod, final int total) {
this(name, mod, total, null);
}
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
this.name = name + "_" + mod + "%" + total;
this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
// 开启线程任务
new InnerWorker(name).start();
}
public String getName() {
return name;
}
@Override
public boolean process(NacosTask task) {
if (task instanceof AbstractExecuteTask) {
putTask((Runnable) task);
}
return true;
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
log.error(ire.toString(), ire);
}
}
public int pendingTaskCount() {
return queue.size();
}
/**
* Worker status.
*/
public String status() {
return name + ", pending tasks: " + pendingTaskCount();
}
@Override
public void shutdown() throws NacosException {
queue.clear();
closed.compareAndSet(false, true);
}
/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
// 循环执行任务
while (!closed.get()) {
try {
// 获取任务
Runnable task = queue.take();
long begin = System.currentTimeMillis();
// 执行任务
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}
}
那被循环执行的任务是啥呢?其实就是我们在派发任务时,构建的 PushExecuteTask 对象,其内部 run 方法中,真正执行了对客户端订阅者的推送逻辑。
java@Override
public void run() {
try {
// 包装数据
PushDataWrapper wrapper = generatePushData();
// 客户端管理器
ClientManager clientManager = delayTaskEngine.getClientManager();
for (String each : getTargetClientIds()) {
// 获取客户端
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
// 获取客户端的订阅者
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
// 执行推送
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
// 推送回调
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
在服务端我们推送有两种,一种是 服务变更推送,一种是 订阅推送
服务变更推送:对于一个服务来说,订阅者有好多,我们在订阅表中能看到 ConcurrentMap<Service, Set<String>> subscriberIndexes,能获所有订阅者,然后进行推送
订阅者推送:当服务实例在新注册或者注销时,服务端会自动根据订阅关系,进行推送给新增注册的实例
上述的服务订阅中,其服务端源码在最后的操作,则是对客户端的订阅推送流程,其实包含下列2种推送执行器

根据 clientId 中是否包含 # 字符,来判断使用udp执行器,还是gRPC执行器,默认是gRPC执行器。

根据 gRPC 中构建的 NotifySubscriberRequest 请求对象,查找到客户端的入口
javapublic class NamingPushRequestHandler implements ServerRequestHandler {
private final ServiceInfoHolder serviceInfoHolder;
public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {
this.serviceInfoHolder = serviceInfoHolder;
}
@Override
public Response requestReply(Request request) {
// 是订阅者通知的请求
if (request instanceof NotifySubscriberRequest) {
// 请求
NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
// 处理服务信息
serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
}
根据收到的服务信息,校验服务信息中的实例是否变更过,如果变更过则对本地缓存中的更新
javapublic ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
// 服务标识
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
// 旧的服务信息
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
// 新的服务信息
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 校验服务信息是否改变
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
// 监控服务信息
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
// 发送服务信息变更事件
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
执行所有订阅的事件监听器
java @Override
public void onEvent(InstancesChangeEvent event) {
// 服务标识
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
// 触发监听器
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
listener.onEvent(namingEvent);
}
}
}
在 Nacos 注册中心中会通过事件的发布订阅机制把相关的逻辑串联起来,比如服务注册会发布相关事件通知对应的服务订阅者更新相关服务等。所以我们来看看这块是怎么设计实现的。
在服务注册发布订阅的时候,注册完成会通过 NotifyCenter 来发布相关的事件,而 NotifyCenter 是统一事件通知中心,我们来看下其实现
java /**
* Publisher management container.
* 存储所有的事件发布者
* key :是事件类的class全限定类名
* values 是一个 EventPublisher接口(两个实现DefaultPublisher/NamingEventPublisher)的实现 简单来说就是一个发布者
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
/**
* Publisher management container.
* 存储所有的事件发布者
* key :是事件类的class全限定类名
* values 是一个 EventPublisher接口(两个实现DefaultPublisher/NamingEventPublisher)的实现 简单来说就是一个发布者
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
// 加载所有的 EventPublisher 接口的实现类
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = publishers.iterator(); // 获取 事件发布器 的迭代器
if (iterator.hasNext()) {
clazz = iterator.next().getClass(); // 如果有自定义的 发布器
} else {
clazz = DefaultPublisher.class; // 如果没有自定义的迭代器 就用默认的
}
// 定义了 EventPublishFactory 中定义的 apply() 方法的逻辑 在addXXX 方法中会触发
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
try {
// Create and init DefaultSharePublisher instance. 设置默认的事件发布者
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize); // 初始化事件发布器 执行的是上面的代码
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
}
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
在 static 静态代码块中主要的逻辑是判断是否有创建自定义的事件发布器,如果没有创建就使用默认的事件发布器 DefaultSharePublisher,同时执行对应的 init 方法完成初始化的处理。
javastatic {
// 内部 ArrayBlockingQueue 缓冲区大小。对于具有高写入吞吐量的应用程序,需要适当增加此值。默认值为16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
// 公共发布者的消息暂存队列缓冲区的大小
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
// 获取 EventPublisher 的实现类
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = publishers.iterator();
// 获取发布器类
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
// 定义默认的 EventPublisherFactory,创建 DefaultPublisher 实例
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
// 初始化
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
try {
// 创建并初始化 DefaultSharePublisher 实例
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
}
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
创建对应的事件发布器然后执行 init 方法初始化。
java@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);// 设置守护线程
setName("nacos.publisher-" + type.getName()); // 设置 发布器 的名称
this.eventType = type; // 记录发布器
this.queueMaxSize = bufferSize; // 设置阻塞队列的长度
// 集合是一个 阻塞队列
this.queue = new ArrayBlockingQueue<>(bufferSize);
start(); // 调用start方法 开启线程 等待系统调度执行 run 方法
}
@Override
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}
调用 start 方法等待调度执行 run方法
java @Override
public void run() {
openEventHandler();
}
进入到 openEventHandler 方法中
java void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60; // 默认的等待次数。主要是解决队列中消息积压的问题
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register 为了确保消息不丢失。需要等待第一个订阅者注册
while (!shutdown && !hasSubscriber() && waitTimes > 0) {
ThreadUtils.sleep(1000L);
waitTimes--;
}
while (!shutdown) {
// 从阻塞队列中获取对应的事件
final Event event = queue.take();
// 处理事件
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
这里的关键就是 queue.take() 方法。这个方法会阻塞执行。
当有事件添加到这个阻塞队列中的时候,这个方法就会获取到添加进来的事件,receiveEvent(event)就是对应的处理相关的事件的逻辑了
Javavoid receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) { // 如果没有 订阅者 那么就没有处理的意义了
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue; // 如果订阅者订阅的事件和触发的事件不匹配就结束这次循环
}
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes. 通知所有的订阅者处理事件
notifySubscriber(subscriber, event);
}
}
上面的方法的逻辑是根据当前的事件找到对应的订阅者。然后通过 notifySubscriber(xx) 方法来通知订阅者。
Java@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
// 声明了一个 Runnable 接口。指定了run方法的逻辑
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job); // 异步执行
} else {
try {
job.run(); // 同步执行
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
这里就可以看到找到对应的订阅者,调用订阅者的 run 方法,可同步或异步进行执行。
在订阅者中,Nacos 默认实现的 DistroClientDataProcessor #onEvent() 是基于 Distro 协议保证节点间数据最终一致性的

此处的事件类型是 ClientEvent.ClientChangedEvent
javaprivate void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// 只有 Distro 进行短暂的数据同步,持久客户端才应该通过 raft 进行同步。
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
// Distro 标识
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
通过执行上述源码中 DistroProtocol #sync 方法,构建 Distro 协议的延迟任务,添加到延迟任务引擎中,与之前派发任务的源码地方形成了闭环。
javapublic void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
// 同步到目标
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
// Distro 标识
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
// Distro 延迟任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// Distro 任务引擎持有者 - 添加任务
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
Distro 协议
Nacos 2.0 保留了对 Distro 协议的支持,主要用于处理 临时实例数据 的一致性问题,临时实例数据的分布式同步仍然依赖 Distro 协议机制
虽然保留了 Distro 协议,但在 Nacos 2.0 中对其进行了优化和改进
来看看 Nacos 服务端的事件类型有哪些,每个事件类型处理了什么逻辑。
| 事件名称 | 介绍 | 相关处理器 |
|---|---|---|
| ClientRegisterServiceEvent | 客户端注册服务事件 | ClientServiceIndexesManager |
| ClientDeregisterServiceEvent | 客户端取消注册服务事件 | ClientServiceIndexesManager |
| ClientSubscribeServiceEvent | 客户端订阅服务事件 | ClientServiceIndexesManager |
| ClientUnsubscribeServiceEvent | 客户端取消订阅服务事件 | ClientServiceIndexesManager |
| InstanceMetadataEvent | 元数据事件 | NamingMetadataManager |
| ServiceMetadataEvent | 服务元数据事件 | NamingMetadataManager |
| ClientDisconnectEvent | 客户端断开连接事件 | NamingMetadataManager |
| ClientChangedEvent | 客户端变化的事件 | DistroClientDataProcessor |
| ..... |
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!