Capabilities Nacos service registration must provide

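  • A service provider registers its protocol address with the Nacos Server
  • A service consumer queries the Nacos Server for provider addresses by service name
  • The Nacos Server detects when providers go online or offline
  • Service consumers are notified dynamically when the address list on the Nacos Server changes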

Nacos API

SDK (which itself calls the Open API underneath) / Open API (REST interface)

Official website

Service registration

void registerInstance(String serviceName, String ip, int port) throws NacosException;

void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;

void registerInstance(String serviceName, Instance instance) throws NacosException;

Request parameters

Name          Type                 Description
serviceName   String               Service name
ip            String               Instance IP
port          int                  Instance port
clusterName   String               Cluster name
instance      see code comments    Instance attributes

Request example

NamingService naming = NamingFactory.createNamingService(System.getProperty("serverAddr"));
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

Instance instance = new Instance();
instance.setIp("55.55.55.55");
instance.setPort(9999);
instance.setHealthy(false);
instance.setWeight(2.0);
Map<String, String> instanceMeta = new HashMap<>();
instanceMeta.put("site", "et2");
instance.setMetadata(instanceMeta);

Service service = new Service("nacos.test.4");
service.setApp("nacos-naming");
service.sethealthCheckMode("server");
service.setEnableHealthCheck(true);
service.setProtectThreshold(0.8F);
service.setGroup("CNCF");
Map<String, String> serviceMeta = new HashMap<>();
serviceMeta.put("symmetricCall", "true");
service.setMetadata(serviceMeta);
instance.setService(service);

Cluster cluster = new Cluster();
cluster.setName("TEST5");
AbstractHealthChecker.Http healthChecker = new AbstractHealthChecker.Http();
healthChecker.setExpectedResponseCode(400);
healthChecker.setCurlHost("User-Agent|Nacos");
healthChecker.setCurlPath("/xxx.html");
cluster.setHealthChecker(healthChecker);
Map<String, String> clusterMeta = new HashMap<>();
clusterMeta.put("xxx", "yyyy");
cluster.setMetadata(clusterMeta);

instance.setCluster(cluster);

naming.registerInstance("nacos.test.4", instance);

Service query

List<Instance> getAllInstances(String serviceName) throws NacosException;

List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException;

Request parameters

Name          Type     Description
serviceName   String   Service name
clusters      List     Cluster list

Request example

NamingService naming = NamingFactory.createNamingService(System.getProperty("serverAddr"));
System.out.println(naming.getAllInstances("nacos.test.3"));

Heartbeat mechanism

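  • Heartbeat send interval
  • Heartbeat timeout
    • Configure a heartbeat timeout threshold
    • Record the last update time of each service instance
    • If current time - the instance's last update time > heartbeat timeout threshold, the instance is considered timed out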
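The timeout rule above can be sketched in plain Java. This is a minimal illustration, not Nacos code. HeartbeatTracker, its methods, and the 15 s threshold used in the usage check are assumptions for the example. The caller passes the current time explicitly, which keeps the rule easy to test.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker illustrating the heartbeat-timeout rule (not a Nacos class).
class HeartbeatTracker {
    private final long timeoutMillis;                                      // heartbeat timeout threshold
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>(); // instance -> last update time

    HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record a heartbeat for an instance key such as "10.0.0.1:8080".
    void beat(String instanceKey, long nowMillis) {
        lastBeat.put(instanceKey, nowMillis);
    }

    // Timed out when: now - last update time > threshold.
    boolean isExpired(String instanceKey, long nowMillis) {
        Long last = lastBeat.get(instanceKey);
        if (last == null) {
            return true; // never seen: treat as expired
        }
        return nowMillis - last > timeoutMillis;
    }
}
```

For reference, Nacos 1.x defaults to a 5 s heartbeat interval for ephemeral instances. It marks an instance unhealthy after 15 s without a beat and removes it after 30 s.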

Nacos implementation diagram

Nacos source code analysis

Client-side registration flow

Dubbo services can be registered along two paths. The first is the same path we followed in the Eureka source analysis (see that article).

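The other path uses Dubbo's own auto-configuration. When Dubbo publishes a service, registration is driven by event listening in the DubboServiceRegistrationNonWebApplicationAutoConfiguration class. This class listens for ApplicationStartedEvent, an event introduced in Spring Boot 2.0 that is published once the application has finished starting. When the listener receives this event, it triggers registration.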

Spring Cloud Nacos integrating Dubbo

DubboServiceRegistrationNonWebApplicationAutoConfiguration

// Listen for ApplicationStartedEvent and register the service
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
    setServerPort();
    register();
}

private void register() {
    if (registered) {
        return;
    }
    serviceRegistry.register(registration);
    registered = true;
}

this.serviceRegistry is an injected instance: NacosServiceRegistry.
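The registered flag in register() ensures registration runs at most once. Below is a minimal sketch of the same guard using AtomicBoolean, which would also stay correct if the event were ever delivered concurrently. SimpleRegistry and RegisterOnStart are hypothetical stand-ins, not Spring Cloud types. As in the original, the flag is only left set when registration succeeds.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for a service registry (hypothetical, not Spring Cloud's interface).
interface SimpleRegistry {
    void register(String serviceId);
}

// Hypothetical listener that registers exactly once, however often it is triggered.
class RegisterOnStart {
    private final SimpleRegistry registry;
    private final String serviceId;
    private final AtomicBoolean registered = new AtomicBoolean(false);

    RegisterOnStart(SimpleRegistry registry, String serviceId) {
        this.registry = registry;
        this.serviceId = serviceId;
    }

    // Called when the "application started" event fires.
    void onApplicationStarted() {
        // compareAndSet flips false -> true atomically, so only the first caller proceeds.
        if (registered.compareAndSet(false, true)) {
            try {
                registry.register(serviceId);
            } catch (RuntimeException e) {
                registered.set(false); // failed: allow a later retry, then propagate
                throw e;
            }
        }
    }
}
```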

NacosServiceRegistry.register

@Override
public void register(Registration registration) {

    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    // serviceId is the current application's application.name
    String serviceId = registration.getServiceId();
    // group is the Nacos group configured for discovery
    String group = nacosDiscoveryProperties.getGroup();
    // instance holds the service instance information
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                 instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                  registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

NacosNamingService.registerInstance

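Registering an instance does two things:

  • If the instance is ephemeral, it builds a BeatInfo and hands it to beatReactor, which schedules the heartbeat task
  • It calls registerService to send the registration request to the server
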
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // If the instance is ephemeral, build its heartbeat info
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        // Hand the heartbeat info to beatReactor, which schedules the heartbeat task
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
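The heartbeat half of registerInstance can be outlined with a ScheduledExecutorService. The idea is one periodic task per key, cancelled on deregistration. MiniBeatReactor and its method names are hypothetical. To our reading of the 1.x client, the real BeatReactor sends each beat over HTTP and schedules the next BeatTask after every run rather than using a fixed-rate task, so treat this only as an outline of the idea.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified heartbeat scheduler (not the real Nacos BeatReactor).
class MiniBeatReactor {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "mini-beat-sender");
        t.setDaemon(true); // don't keep the JVM alive just for heartbeats
        return t;
    });
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    // Send a beat for key every periodMillis; a second call for the same key is ignored.
    void addBeatInfo(String key, long periodMillis, Runnable sendBeat) {
        tasks.computeIfAbsent(key, k ->
            executor.scheduleAtFixedRate(sendBeat, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
    }

    // Stop sending beats for key (e.g. when the instance is deregistered).
    void removeBeatInfo(String key) {
        ScheduledFuture<?> future = tasks.remove(key);
        if (future != null) {
            future.cancel(false);
        }
    }

    boolean isScheduled(String key) {
        return tasks.containsKey(key);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```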

NamingProxy.registerService

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
                       namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

NamingProxy.reqAPI

public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();
    // servers is the Nacos server list configured by spring.cloud.nacos.discovery.server-addr in application.properties
    if (servers != null && !servers.isEmpty()) {

        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());
        // Try each server once, starting from the randomly chosen index
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                // Call the selected server
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            // Advance to the next server (round-robin)
            index = (index + 1) % servers.size();
        }
    }

    if (StringUtils.isNotBlank(nacosDomain)) {
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
                        api, servers, exception.getErrCode(), exception.getErrMsg());

    throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
                             + exception.getMessage());

}
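The server-selection loop in reqAPI starts at a random index and walks the list round-robin until a call succeeds, so each server is tried at most once. Below is a stripped-down sketch of that loop. ServerCall and Failover are hypothetical names, and the HTTP call is abstracted into a lambda.

```java
import java.util.List;
import java.util.Random;

// Hypothetical abstraction of "send this request to that server".
interface ServerCall {
    String call(String server) throws Exception;
}

class Failover {
    // Try every server once, starting from a random one; return the first successful result.
    static String callWithFailover(List<String> servers, ServerCall call, Random random) throws Exception {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        Exception last = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.call(server);
            } catch (Exception e) {
                last = e; // remember the failure and move on
            }
            index = (index + 1) % servers.size(); // round-robin to the next server
        }
        throw new Exception("all servers failed: " + servers, last);
    }
}
```

The random start spreads load across servers, and the bounded loop guarantees the call gives up after one full pass.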

NamingProxy.callServer

public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
    throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    // Add security/signature info to the parameters
    injectSecurityInfo(params);
    // Build the request headers
    List<String> headers = builderHeaders();

    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
            curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
        }
        url = HttpClient.getPrefix() + curServer + api;
    }
    // Send the request through HttpClient (here: the registration call)
    HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
    end = System.currentTimeMillis();
    // Report the request latency to the metrics monitor
    MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
        .observe(end - start);

    if (HttpURLConnection.HTTP_OK == result.code) {
        return result.content;
    }

    if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
        return StringUtils.EMPTY;
    }

    throw new NacosException(result.code, result.content);
}
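The URL logic at the top of callServer works as follows. An address that already has a scheme is used as-is. Otherwise, the default port is appended if none is given, and a scheme is prepended. The sketch below assumes "http://" as that prefix; the real code asks HttpClient.getPrefix(), which can switch to HTTPS. UrlBuilder is a hypothetical helper name, and 8848 in the check is just an example port.

```java
// Hypothetical helper mirroring the URL-building branch of callServer.
class UrlBuilder {
    static String buildUrl(String curServer, String api, int defaultPort) {
        // Address already carries a scheme: use it directly.
        if (curServer.startsWith("https://") || curServer.startsWith("http://")) {
            return curServer + api;
        }
        String address = curServer;
        // No explicit port: append the default one.
        if (!address.contains(":")) {
            address = address + ":" + defaultPort;
        }
        // Assumed prefix; the real code uses HttpClient.getPrefix().
        return "http://" + address + api;
    }
}
```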

HttpClient.request

public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
    HttpURLConnection conn = null;
    try {
        String encodedContent = encodingParams(paramValues, encoding);
        url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent);

        conn = (HttpURLConnection) new URL(url).openConnection();

        setHeaders(conn, headers, encoding);
        conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
        conn.setReadTimeout(TIME_OUT_MILLIS);
        conn.setRequestMethod(method);
        conn.setDoOutput(true);
        if (StringUtils.isNotBlank(body)) {
            byte[] b = body.getBytes();
            conn.setRequestProperty("Content-Length", String.valueOf(b.length));
            conn.getOutputStream().write(b, 0, b.length);
            conn.getOutputStream().flush();
            conn.getOutputStream().close();
        }
        conn.connect();
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("Request from server: " + url);
        }
        return getResult(conn);
    } catch (Exception e) {
        try {
            if (conn != null) {
                NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from "
                                   + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
            }
        } catch (Exception e1) {
            NAMING_LOGGER.error("[NA] failed to request ", e1);
            //ignore
        }

        NAMING_LOGGER.error("[NA] failed to request ", e);

        return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
    } finally {
        IoUtils.closeQuietly(conn);
    }
}
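request appends the URL-encoded parameters to the URL as a query string, even for POST. Below is a minimal sketch of that encoding step with java.net.URLEncoder. QueryEncoder is a hypothetical name. Skipping null or empty values is an assumption of this sketch, not a documented contract of encodingParams.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: turn a parameter map into "k1=v1&k2=v2".
class QueryEncoder {
    static String encode(Map<String, String> params, String encoding) throws UnsupportedEncodingException {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            String value = e.getValue();
            if (value == null || value.isEmpty()) {
                continue; // assumption: empty parameters are omitted
            }
            joiner.add(URLEncoder.encode(e.getKey(), encoding) + "=" + URLEncoder.encode(value, encoding));
        }
        return joiner.toString();
    }
}
```

Encoding matters here because grouped service names contain "@@", and the metadata parameter is a JSON string.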

Server-side registration flow

The server exposes the registration-related APIs in the InstanceController class. When a client registers, it calls: [POST] /nacos/v1/ns/instance
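Because registration is a plain HTTP call, it can also be made without the SDK. The sketch below only builds such a request with the JDK 11 java.net.http client. The server address 127.0.0.1:8848, the service name, and the instance address are example values. Only the required serviceName, ip, and port parameters are set, whereas the SDK's registerService sends several more.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds (but does not send) a registration request against the v1 Open API.
class OpenApiRegister {
    static HttpRequest buildRegisterRequest(String server, String serviceName, String ip, int port) {
        // Example values are URL-safe; real values should be URL-encoded first.
        String url = "http://" + server + "/nacos/v1/ns/instance"
                + "?serviceName=" + serviceName
                + "&ip=" + ip
                + "&port=" + port;
        return HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

To send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Per InstanceController.register below, a successful registration returns the body "ok".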

InstanceController.register

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // serviceName: the client's service name
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // namespaceId: the Nacos namespace
    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Parse the Instance from the request
    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

ServiceManager.registerInstance

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty Service if it does not exist yet (this is what the Nacos console's service list shows); it is stored in serviceMap, a ConcurrentHashMap
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Look up the Service in serviceMap by namespaceId and serviceName
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Call addInstance to add the instance to the service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

ServiceManager.createServiceIfAbsent (called by createEmptyService)

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
    throws NacosException {
    // Look up the Service by namespaceId and serviceName (group@@serviceName)
    Service service = getService(namespaceId, serviceName);
    // A null service means this is the first registration: build a new Service
    if (service == null) {

        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Store the service in the map and initialize it
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

ServiceManager.getService

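Storage structure:

  • key: namespaceId → value: Map<String, Service>
    • key: grouped service name (e.g. DEFAULT_GROUP@@fireweed-manager) → value: Service
      • each Service holds its instances, organized by cluster in clusterMap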

/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}

ServiceManager.putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
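    // Add the service to serviceMap
    putService(service);
    // Initialize health checking for this service's instances (client heartbeat checks)
    service.init();
    // Listen for data changes through the consistency service for both keys: ephemeral=true uses the Distro protocol, ephemeral=false uses Raft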
    consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

ServiceManager.putService

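putService adds the newly created Service to the inner map for its namespaceId. If that inner map does not exist yet, it is created first using double-checked locking.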

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
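The two-level map behind getService and putService can be sketched with computeIfAbsent. This gives the same create-once behavior for the inner map as the double-checked locking in putService, without an explicit lock. ServiceRegistryMap is a hypothetical name, and the type parameter S stands in for Service.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level registry: namespaceId -> (groupedServiceName -> service).
class ServiceRegistryMap<S> {
    private final Map<String, Map<String, S>> serviceMap = new ConcurrentHashMap<>();

    // Like putService: the inner map is created at most once per namespace.
    void putService(String namespaceId, String groupedName, S service) {
        serviceMap.computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>(16))
                  .put(groupedName, service);
    }

    // Like getService: null when either the namespace or the service is unknown.
    S getService(String namespaceId, String groupedName) {
        Map<String, S> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(groupedName);
    }
}
```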

ServiceManager.addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // e.g. Service{name='DEFAULT_GROUP@@fireweed-manager', protectThreshold=0.0, appName='null', groupName='DEFAULT_GROUP', metadata={}}
    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        // Add the instances to the list, then sync the data through the consistency protocol
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }
}
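The essence of addInstance is this: while holding the Service's lock, merge the new instances into the current list and hand a fresh Instances snapshot to consistencyService. Below is a hypothetical sketch of that merge-then-publish step; InstanceStore is not a Nacos class. Instances are reduced to ip:port -> description strings, and re-registering an address replaces its old entry (an assumption made for the sketch). Assigning a new immutable map stands in for consistencyService.put.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-service instance store (not Nacos code).
class InstanceStore {
    // Latest published snapshot: ip:port -> instance description.
    private volatile Map<String, String> snapshot = Collections.emptyMap();

    // Merge new instances under a lock, then publish a new immutable snapshot.
    synchronized void addInstances(Map<String, String> newInstances) {
        Map<String, String> merged = new LinkedHashMap<>(snapshot);
        merged.putAll(newInstances);                    // same ip:port replaces the old entry
        snapshot = Collections.unmodifiableMap(merged); // stands in for consistencyService.put(key, instances)
    }

    List<String> addresses() {
        return new ArrayList<>(snapshot.keySet());
    }

    String get(String address) {
        return snapshot.get(address);
    }
}
```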

Service lookup on the consumer side

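At startup, the consumer does two things:

  1. Reads the instance list for the given service name from the Nacos Server and caches it in local memory

  2. Starts a scheduled task that polls the service list every 10 s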

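Once a service is registered, consumers can fetch the provider addresses from the Nacos Server and invoke the service. On the consumer side, the core class NacosDiscoveryClient talks to Nacos to obtain provider addresses. The preceding steps are not repeated here, since the subscription process was already covered in the earlier Dubbo source analysis. NacosDiscoveryClient provides a getInstances method that returns the provider instances (their URL addresses) for a given provider service name.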

1. Fetch the service list at client startup (read the instance list for the given service name from the Nacos Server and cache it in local memory)

NacosDiscoveryClient.getInstances

public List<ServiceInstance> getInstances(String serviceId) {
    try {
        List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, true);
        return hostToServiceInstanceList(instances, serviceId);
    } catch (Exception var3) {
        throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);
    }
}

getInstances calls NamingService to fetch the instance list by serviceId and group, then converts each Instance into a ServiceInstance.

NacosNamingService.selectInstances

selectInstances first obtains a ServiceInfo from hostReactor. It then removes from serviceInfo.getHosts() any instance that is not healthy, not enabled, or has weight <= 0, and returns the rest. If subscribe is true, it obtains the ServiceInfo via hostReactor.getServiceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer.

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // Subscribe? To track server-side changes, a subscription is required
    if (subscribe) {
        // hostReactor sets up the subscription, which has two parts: (1) server push, (2) client polling every 10 s
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
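The filtering step described above can be sketched as a stream filter. Inst and InstanceFilter are hypothetical minimal types, not Nacos classes. The healthy argument mirrors the healthy parameter of selectInstances, and the sketch keeps instances whose health matches it, that are enabled, and whose weight is positive.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal instance type for the sketch.
class Inst {
    final String address;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    Inst(String address, boolean healthy, boolean enabled, double weight) {
        this.address = address;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

class InstanceFilter {
    // Keep instances matching the requested health flag that are enabled and have weight > 0.
    static List<Inst> select(List<Inst> hosts, boolean healthy) {
        return hosts.stream()
                .filter(i -> i.healthy == healthy)
                .filter(i -> i.enabled)
                .filter(i -> i.weight > 0)
                .collect(Collectors.toList());
    }
}
```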

HostReactor.getServiceInfo

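It does two things:

  • updateServiceNow: immediately loads the service addresses from the Nacos Server
  • scheduleUpdateIfAbsent: starts a scheduled update task (first run after about 1 s; later runs are spaced by the cacheMillis value the server returns)
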
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // Key = service name + cluster names (clusters are empty by default)
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // Look up the local cache by serviceName and clusters first
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // Not cached: create a new ServiceInfo, put it into serviceInfoMap and updatingMap, call updateServiceNow, then remove it from updatingMap
    if (null == serviceObj) {
        // Build a ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        // Store it in the local cache serviceInfoMap
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        // Query the Nacos server for the instances
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {
        // The cached entry is still being updated (present in updatingMap): wait up to UPDATE_HOLD_INTERVAL
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // In all cases, make sure a scheduled update task exists, then return the ServiceInfo from serviceInfoMap
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
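scheduleUpdateIfAbsent must guarantee a single polling task per service key, even when many threads call getServiceInfo at once. Below is a minimal sketch of that guarantee using hypothetical names (UpdateScheduler, scheduleIfAbsent) and a caller-supplied refresh action. ConcurrentHashMap.computeIfAbsent creates the scheduled task at most once per key. For simplicity the sketch uses a fixed-delay schedule, whereas the real UpdateTask, as far as we can tell from the 1.x client, reschedules itself after each run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler guaranteeing one polling task per service key.
class UpdateScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "mini-updater");
        t.setDaemon(true);
        return t;
    });
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Returns true only for the call that actually scheduled the task.
    boolean scheduleIfAbsent(String key, Runnable refresh, long firstDelayMillis, long periodMillis) {
        boolean[] created = {false};
        futureMap.computeIfAbsent(key, k -> {
            created[0] = true; // mapping function runs at most once per absent key
            return executor.scheduleWithFixedDelay(refresh, firstDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```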

HostReactor.getServiceInfo0

private Map<String, ServiceInfo> serviceInfoMap;

private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    // Build the cache key
    String key = ServiceInfo.getKey(serviceName, clusters);
    // Look it up in the local cache
    return serviceInfoMap.get(key);
}

HostReactor.updateServiceNow

private NamingProxy serverProxy;

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
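        // Fetch the address list from the server via NamingProxy. The query also passes pushReceiver.getUDPPort(): when the server-side address list changes, the server pushes the update to this UDP port, and the client, listening on it, updates its local cache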
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            // Parse the result and save it to the local cache
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

HostReactor.processServiceJSON

public ServiceInfo processServiceJSON(String json) {
    // Parse the JSON
    ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
    // Look up the current ServiceInfo in the local cache by key
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }

    boolean changed = false;
    // oldService exists: diff old and new hosts and update the local cache
    if (oldService != null) {

        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                + ", new-t: " + serviceInfo.getLastRefTime());
        }

        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }

        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
            newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }

            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }

        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }

            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }

        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
        }

        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
        }

        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
        }

        serviceInfo.setJsonFromServer(json);

        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }

    } else {
        // First time this service is seen: cache it directly
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON
            .toJSONString(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }

    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
            " -> " + JSON.toJSONString(serviceInfo.getHosts()));
    }

    return serviceInfo;
}
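The middle of processServiceJSON is a three-way diff between the old and new host lists keyed by ip:port. Keys only in the new map are added, keys only in the old map are removed, and keys in both whose string form differs are modified. Below is a compact sketch of that diff. HostDiff is a hypothetical class, and hosts are reduced to address -> description strings as a simplification of Instance.toString().

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical result holder for the old/new host diff.
class HostDiff {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> modified = new TreeSet<>();

    boolean changed() {
        return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
    }

    // oldHosts/newHosts: ip:port -> string form of the instance.
    static HostDiff compute(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiff diff = new HostDiff();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            if (!oldHosts.containsKey(e.getKey())) {
                diff.added.add(e.getKey());                       // only in the new list
            } else if (!Objects.equals(oldHosts.get(e.getKey()), e.getValue())) {
                diff.modified.add(e.getKey());                    // same address, different attributes
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                diff.removed.add(key);                            // only in the old list
            }
        }
        return diff;
    }
}
```

In the real method, any change triggers eventDispatcher.serviceChanged and DiskCache.write, as shown above.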

NamingProxy.queryList

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}

Server-side handling of the instance list query

InstanceController.list

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // namespaceId
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String agent = WebUtils.getUserAgent(request);
    // Cluster names
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    // UDP port on which the client listens for pushes
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    // isCheck: whether to report protect-threshold status (comparable to Eureka's self-preservation)
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    // healthyOnly: return only healthy instances
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                     healthyOnly);
}

InstanceController.doSrvIpxt

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the Service by namespaceId and serviceName
    Service service = serviceManager.getService(namespaceId, serviceName);
    // If it does not exist, return an empty host list
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }

    checkIfDisabled(service);

    long cacheMillis = switchDomain.getDefaultCacheMillis();
	
    // If udpPort > 0 and the client supports push, enable push
    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // Register the client with PushService, which pushes to the client over UDP whenever the server-side address list changes
            pushService
                .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
            .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    List<Instance> srvedIPs;

    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }

    if (CollectionUtils.isEmpty(srvedIPs)) {

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }

        if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }

        result.put("hosts", JacksonUtils.createEmptyArrayNode());
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.put("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    // TRUE: healthy instances, FALSE: unhealthy instances
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    // Partition the instances into ipMap by health status
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    // If isCheck is set, report whether the protect threshold was reached
    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    double threshold = service.getProtectThreshold();
    // Protect threshold (similar to self-preservation): triggered when healthy instances / total instances <= threshold
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        // Return all instances, healthy and unhealthy, in the healthy list
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);

        return JacksonUtils.createEmptyJsonNode();
    }

    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();

        if (healthyOnly && !entry.getKey()) {
            continue;
        }

        for (Instance instance : ips) {

            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }

            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.put("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }

            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);

        }
    }
    // Assemble the response
    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
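The protect-threshold rule above can be isolated as a small pure function. This is a minimal sketch, not Nacos code: ProtectThresholdSketch, SimpleInstance and selectInstances are hypothetical names, and the sketch leaves out the healthyOnly filter and the disabled-instance check that the real method also applies.

```java
import java.util.ArrayList;
import java.util.List;

class ProtectThresholdSketch {

    // Hypothetical stand-in for a Nacos Instance: only the fields this sketch needs.
    static final class SimpleInstance {
        final String ip;
        final boolean healthy;

        SimpleInstance(String ip, boolean healthy) {
            this.ip = ip;
            this.healthy = healthy;
        }
    }

    // Returns only the healthy instances, unless the healthy ratio is at or below the
    // threshold, in which case every instance is returned (self-protection).
    static List<SimpleInstance> selectInstances(List<SimpleInstance> all, double threshold) {
        List<SimpleInstance> healthy = new ArrayList<>();
        for (SimpleInstance instance : all) {
            if (instance.healthy) {
                healthy.add(instance);
            }
        }
        if (all.isEmpty()) {
            return healthy; // avoid dividing by zero
        }
        double healthyRatio = (double) healthy.size() / all.size();
        if (healthyRatio <= threshold) {
            // Spread traffic over unhealthy instances too, so the few healthy ones are not overwhelmed
            return new ArrayList<>(all);
        }
        return healthy;
    }

    public static void main(String[] args) {
        List<SimpleInstance> all = List.of(
                new SimpleInstance("10.0.0.1", true),
                new SimpleInstance("10.0.0.2", false),
                new SimpleInstance("10.0.0.3", false));
        System.out.println(selectInstances(all, 0.8).size()); // 3: ratio 1/3 <= 0.8
        System.out.println(selectInstances(all, 0.2).size()); // 1: ratio 1/3 > 0.2
    }
}
```

With a threshold of 0.8 (the value set via setProtectThreshold(0.8F) in the registration example), losing more than 20% of a service's instances makes consumers see all of them again, trading some failed calls for not overloading the survivors.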

Service subscription: dynamically updating the service list

2. Start a scheduled task that polls the service list every 10 s

NacosNamingService.init

public NacosNamingService(Properties properties) {
    init(properties);
}

private void init(Properties properties) {
    namespace = InitUtils.initNamespaceForNaming(properties);
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);

    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
    // BeatReactor: sends heartbeats from this client to the server
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    // HostReactor: polls the server for service-list updates on a schedule
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
                                  initPollingThreadCount(properties));
}

HostReactor constructor

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                   boolean loadCacheAtStart, int pollingThreadCount) {
    // Create the scheduled thread pool that runs the polling (UpdateTask) jobs
    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });

    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }

    this.updatingMap = new ConcurrentHashMap<String, Object>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    // Create the PushReceiver, which listens for service changes pushed by the server
    this.pushReceiver = new PushReceiver(this);
}

PushReceiver constructor

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        udpSocket = new DatagramSocket();
        // Single-thread pool that keeps receiving data pushed by the server
        executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
        // Submit this Runnable; its run() method loops on the UDP socket
        executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

@Override
public void run() {
    // Loop forever, receiving data pushed by the Nacos server (the push mechanism)
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Block until a packet pushed by the Nacos server arrives
            udpSocket.receive(packet);

            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJSON(pushPacket.data);

                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                                              ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
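The receive-then-ack loop in PushReceiver.run relies only on java.net.DatagramSocket. The sketch below replays a single push/ack round trip on the loopback interface so the flow can be run in isolation. It is a simplification under stated assumptions: no gzip decompression, no JSON parsing, a fixed ack string instead of the lastRefTime-bearing JSON, and hypothetical names (UdpPushAckSketch, receiveAndAck, pushAndAwaitAck, demo). It also reads packet.getLength() bytes instead of trimming a zero-filled buffer.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

class UdpPushAckSketch {

    // Client side: receive one push packet, then reply with an ack to the sender's address.
    static String receiveAndAck(DatagramSocket client) throws Exception {
        byte[] buffer = new byte[64 * 1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        client.receive(packet); // blocks until the "server" pushes something
        String body = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);

        byte[] ack = "{\"type\":\"push-ack\"}".getBytes(StandardCharsets.UTF_8);
        client.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
        return body;
    }

    // Server side: push one payload to the client's UDP port and wait for the ack.
    static String pushAndAwaitAck(int clientPort, String payload) throws Exception {
        try (DatagramSocket server = new DatagramSocket()) {
            server.setSoTimeout(2000);
            byte[] data = payload.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(data, data.length, InetAddress.getLoopbackAddress(), clientPort));
            byte[] buf = new byte[1024];
            DatagramPacket ack = new DatagramPacket(buf, buf.length);
            server.receive(ack);
            return new String(ack.getData(), 0, ack.getLength(), StandardCharsets.UTF_8);
        }
    }

    // Runs one round trip; returns {payload seen by the client, ack seen by the server}.
    static String[] demo() {
        try (DatagramSocket client = new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            client.setSoTimeout(2000);
            String[] received = new String[1];
            Thread receiver = new Thread(() -> {
                try {
                    received[0] = receiveAndAck(client);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            receiver.start();
            String ack = pushAndAwaitAck(client.getLocalPort(), "{\"type\":\"service\"}");
            receiver.join();
            return new String[] {received[0], ack};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println(result[0] + " / " + result[1]);
    }
}
```

The datagram queues in the client socket's buffer even if the receiver thread has not reached receive() yet, so the send/receive ordering in demo() does not need extra synchronization.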

HostReactor.updateServiceNow

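When the client queries the service list for the first time, it sends its own IP and UDP port to the Nacos server. Whenever the server-side address list changes, the server pushes the change to that port, which is how the udpSocket.receive(packet) call above receives address-list updates.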

NacosNamingService.getAllInstances ---> HostReactor.getServiceInfo ---> HostReactor.updateServiceNow

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {

        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
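The key detail in updateServiceNow is that the pull request carries the client's UDP port (pushReceiver.getUDPPort()), which the server later uses as the push target. Below is a hedged sketch of assembling such a query. The udpPort and healthyOnly parameters mirror the queryList arguments above; the other parameter names (serviceName, clusters, clientIP) and the /nacos/v1/ns/instance/list path follow the Open API instance-list endpoint as I understand it, and should be treated as assumptions. ListQuerySketch and buildListQuery are hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class ListQuerySketch {

    // Builds the URL of an instance-list request.
    // udpPort tells the server where to push later address-list changes.
    static String buildListQuery(String serviceName, String clusters, int udpPort,
                                 String clientIp, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("serviceName", serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));

        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return "/nacos/v1/ns/instance/list?" + query;
    }

    public static void main(String[] args) {
        System.out.println(buildListQuery("nacos.test.3", "TEST1", 54321, "10.0.0.5", false));
    }
}
```

Because every pull carries the port, the server can (re)register the client as a push target on each poll, not only on the first query.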

HostReactor.getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
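    // Schedule the polling task (an UpdateTask) if it is not scheduled yet: it first runs after
    // DEFAULT_DELAY (1 s) and then roughly every 10 s (the cacheMillis returned by the server)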
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
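getServiceInfo and updateServiceNow cooperate through the cached ServiceInfo used as a monitor: a reader that finds the service in updatingMap calls wait(UPDATE_HOLD_INTERVAL), and updateServiceNow calls notifyAll() in its finally block. A minimal sketch of that hand-off; UpdateHoldSketch and its methods are hypothetical. Because there is no condition loop, the hold is best effort: a notifyAll() that happens before the reader starts waiting is missed, and the reader simply waits out the timeout before returning the cached value.

```java
class UpdateHoldSketch {
    private final Object serviceObj = new Object(); // stands in for the cached ServiceInfo monitor
    private volatile String hosts = "old-hosts";

    // Like updateServiceNow: refresh the cache, then always wake waiting readers in finally.
    void updateNow(String freshHosts) {
        try {
            hosts = freshHosts; // the real code queries the server and parses the JSON here
        } finally {
            synchronized (serviceObj) {
                serviceObj.notifyAll();
            }
        }
    }

    // Like the updatingMap branch of getServiceInfo: wait at most holdMillis, then read the cache.
    String getWithHold(long holdMillis) throws InterruptedException {
        synchronized (serviceObj) {
            serviceObj.wait(holdMillis);
        }
        return hosts;
    }

    // Demo: a reader holds while another thread finishes the update.
    static String demo() {
        UpdateHoldSketch sketch = new UpdateHoldSketch();
        String[] seen = new String[1];
        Thread reader = new Thread(() -> {
            try {
                seen[0] = sketch.getWithHold(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        try {
            Thread.sleep(200); // give the reader time to start waiting (best effort)
            sketch.updateNow("new-hosts");
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```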

HostReactor.scheduleUpdateIfAbsent

private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // Schedule an UpdateTask for this service/cluster key
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
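scheduleUpdateIfAbsent uses double-checked locking so that each service/cluster key gets at most one UpdateTask. As a hedged alternative (not how this version of Nacos writes it), ConcurrentHashMap.computeIfAbsent performs the check and the insert atomically per key, which removes the unsynchronized read of a plain HashMap. OncePerKeyScheduler and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OncePerKeyScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Schedules `task` for `key` only if nothing is registered for that key yet.
    // computeIfAbsent applies the mapping function at most once per absent key,
    // so concurrent callers with the same key all get the same future.
    ScheduledFuture<?> scheduleIfAbsent(String key, Runnable task, long delayMillis) {
        return futureMap.computeIfAbsent(key,
                k -> executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
    }

    int scheduledKeys() {
        return futureMap.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        OncePerKeyScheduler scheduler = new OncePerKeyScheduler();
        ScheduledFuture<?> first = scheduler.scheduleIfAbsent("svcA", () -> { }, 1000);
        ScheduledFuture<?> second = scheduler.scheduleIfAbsent("svcA", () -> { }, 1000);
        System.out.println(first == second); // same future: the second call scheduled nothing
        scheduler.shutdown();
    }
}
```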

UpdateTask

public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private String clusters;
    private String serviceName;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    @Override
    public void run() {
        try {
            // First look up the service in the local cache
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            // null means it is not cached locally, so fetch it from the server
            if (serviceObj == null) {
                // Pull the address list from the server
                updateServiceNow(serviceName, clusters);
                // Reschedule this task so that polling continues
                executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                return;
            }
            // Stale check: the cached lastRefTime is not newer than the one this task saw last round
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // Pull the address list from the server and update the local cache
                updateServiceNow(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                // so only send a refresh request (refreshOnly) and keep the pushed data
                refreshOnly(serviceName, clusters);
            }
            // Remember the lastRefTime seen in this round
            lastRefTime = serviceObj.getLastRefTime();
            // Stop polling if the service is neither subscribed nor tracked in futureMap
            if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task:
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }
            // Reschedule after cacheMillis (about 10 s) to keep polling
            executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);


        } catch (Throwable e) {
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        }

    }
}
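UpdateTask does not use scheduleAtFixedRate: each run reschedules itself with executor.schedule, so the next delay can come from the server's cacheMillis, and the loop stops simply by returning without rescheduling. A minimal sketch of that pattern; PollingTask, the maxRuns stop condition and the fixed delayMillis are assumptions standing in for the subscription check and cacheMillis.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PollingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final int maxRuns;       // hypothetical stop condition ("no longer subscribed")
    private final long delayMillis;  // stands in for serviceObj.getCacheMillis()
    final AtomicInteger runs = new AtomicInteger();
    final CountDownLatch stopped = new CountDownLatch(1);

    PollingTask(ScheduledExecutorService executor, int maxRuns, long delayMillis) {
        this.executor = executor;
        this.maxRuns = maxRuns;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        int round = runs.incrementAndGet();
        // A real task would pull or refresh the service list here.
        if (round >= maxRuns) {
            stopped.countDown(); // abort the polling loop by not rescheduling
            return;
        }
        // Reschedule this same instance; the delay could be recomputed every round.
        executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Starts the loop and returns how many rounds ran before it stopped itself.
    static int runUntilStopped(int maxRuns, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            PollingTask task = new PollingTask(executor, maxRuns, delayMillis);
            executor.schedule(task, 0, TimeUnit.MILLISECONDS);
            task.stopped.await(5, TimeUnit.SECONDS);
            return task.runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runUntilStopped(3, 10));
    }
}
```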

3. Push: the server pushes changes to the client

When the server handles a registration request, the call chain is: InstanceController.registerInstance ---> ServiceManager.registerInstance ---> ServiceManager.createEmptyService ---> ServiceManager.createServiceIfAbsent ---> ServiceManager.putServiceAndInit

ServiceManager.putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // Start the scheduled health-check task that checks the health of each instance
    service.init();
    consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Service.init

The init method sets up heartbeat checking for the provider instances of this service; the check runs every 5 s.

private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

public void init() {
    // Schedule the client-beat health-check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

HealthCheckReactor.scheduleCheck

The first check runs 5 s after scheduling; after that it runs again with a 5 s delay between runs.

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

ClientBeatCheckTask.run

public class ClientBeatCheckTask implements Runnable {

    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    @JsonIgnore
    public DistroMapper getDistroMapper() {
        return ApplicationUtils.getBean(DistroMapper.class);
    }

    public GlobalConfig getGlobalConfig() {
        return ApplicationUtils.getBean(GlobalConfig.class);
    }

    public SwitchDomain getSwitchDomain() {
        return ApplicationUtils.getBean(SwitchDomain.class);
    }

    public String taskKey() {
        return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());
    }

    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            // Check the heartbeat of every instance
            for (Instance instance : instances) {
                // If no heartbeat has arrived within the heartbeat timeout, treat the instance as down
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // Mark the instance as unhealthy
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                      instance.getIp(), instance.getPort(), instance.getClusterName(),
                                      service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                      instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // Push the service change to subscribers and publish a heartbeat-timeout event
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                         JacksonUtils.toJson(instance));
                    // Delete the expired instance
                    deleteIp(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }

    private void deleteIp(Instance instance) {

        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());

            String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG
                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                                   instance.toJson(), response.getResponseBody(), response.getStatusCode());
                    }
                    return null;
                }
            });

        } catch (Exception e) {
            Loggers.SRV_LOG
                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}
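ClientBeatCheckTask applies two thresholds to each instance: past the heartbeat timeout a healthy instance is marked unhealthy (and a push is triggered); past the IP delete timeout it is removed. A hedged sketch of that decision as a pure function. The 15 s and 30 s values are the commonly cited Nacos 1.x defaults behind getInstanceHeartBeatTimeOut() and getIpDeleteTimeout(); treat them as assumptions. BeatCheckSketch, BeatAction and decide are hypothetical, the sketch returns the most severe action that applies, and it ignores the isExpireInstance() switch.

```java
class BeatCheckSketch {
    enum BeatAction { NONE, MARK_UNHEALTHY, DELETE }

    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // assumed default
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // assumed default

    // Decides what the check task would do with one instance at time nowMs.
    static BeatAction decide(long nowMs, long lastBeatMs, boolean healthy, boolean marked) {
        if (marked) {
            return BeatAction.NONE; // manually marked instances are skipped by both loops
        }
        long silence = nowMs - lastBeatMs;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return BeatAction.DELETE;
        }
        if (silence > HEART_BEAT_TIMEOUT_MS && healthy) {
            return BeatAction.MARK_UNHEALTHY;
        }
        return BeatAction.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(20_000, 0, true, false));  // MARK_UNHEALTHY
        System.out.println(decide(31_000, 0, false, false)); // DELETE
    }
}
```

Keeping the two timeouts apart means a briefly silent instance first stops receiving traffic and is only deleted if it stays silent, which gives a slow heartbeat a chance to recover before deregistration.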

PushService.serviceChanged

public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap
        .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    // Publish a ServiceChangeEvent
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

PushService.onApplicationEvent (the ServiceChangeEvent listener)

public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                                      client.getServiceName(), client.getAddrStr(), client.getAgent(),
                                      (ackEntry == null ? null : ackEntry.key));
                    // Push the packet to the client
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }
}
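Together, serviceChanged and onApplicationEvent merge bursts of changes: the push runs 1 s after the first change, any change that arrives while a push for the same service is pending in futureMap is dropped, and the finally block removes the entry so the next change schedules a new push. A minimal sketch of that debounce under stated assumptions: PushDebounceSketch and burstDemo are hypothetical, the push itself is reduced to a counter, and computeIfAbsent replaces the separate containsKey check and put so that the check and the insert are atomic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PushDebounceSketch {
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // Pending push per service key; plays the role of PushService.futureMap.
    final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    final AtomicInteger pushes = new AtomicInteger();

    // Called on every change; while a push for the key is pending, further changes merge into it.
    void serviceChanged(String serviceKey, long delayMillis) {
        pending.computeIfAbsent(serviceKey, key -> executor.schedule(() -> {
            try {
                pushes.incrementAndGet(); // the real code UDP-pushes to every subscribed client here
            } finally {
                pending.remove(key); // let the next change schedule a new push
            }
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    // Fires five changes in a burst, waits for the delayed push, returns how many pushes ran.
    static int burstDemo() {
        PushDebounceSketch sketch = new PushDebounceSketch();
        try {
            for (int i = 0; i < 5; i++) {
                sketch.serviceChanged("demo-service", 200);
            }
            Thread.sleep(600);
            return sketch.pushes.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            sketch.executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(burstDemo());
    }
}
```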

PushService.udpPush

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // Send the push over the UDP socket
        udpSocket.send(ackEntry.origin);
        
        ackEntry.increaseRetryTime();
        
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        
        return null;
    }
}
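udpPush checks the retry counter before sending and increments it after each send, then schedules a Retransmitter ACK_TIMEOUT_NANOS later; presumably the Retransmitter only calls udpPush again while the entry is still in ackMap (its body is not shown here). Assuming the counter starts at 0, one push can be sent up to MAX_RETRY_TIMES + 1 times. The sketch collapses the timer-driven retransmission into a synchronous loop; AckRetrySketch, pushWithRetry and the ackedOnAttempt predicate (which simulates the client's ack) are hypothetical.

```java
import java.util.function.IntPredicate;

class AckRetrySketch {

    // Sends a push and retransmits until it is acked or maxRetries is exceeded.
    // ackedOnAttempt simulates whether the client acks the given 1-based attempt.
    // Returns the number of packets sent, or -1 if the push was given up.
    static int pushWithRetry(int maxRetries, IntPredicate ackedOnAttempt) {
        int retryTimes = 0;
        while (true) {
            if (retryTimes > maxRetries) {
                return -1; // "max re-push times reached": drop the entry, count a failed push
            }
            // udpSocket.send(...) would happen here
            retryTimes++;
            if (ackedOnAttempt.test(retryTimes)) {
                return retryTimes; // ack received: the real Receiver removes the entry from ackMap
            }
            // the real code waits ACK_TIMEOUT before the Retransmitter re-pushes
        }
    }

    public static void main(String[] args) {
        System.out.println(pushWithRetry(1, attempt -> attempt == 2)); // acked on the second send
        System.out.println(pushWithRetry(1, attempt -> false));        // never acked: gives up
    }
}
```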

The client listens for this push

PushReceiver.run

@Override
public void run() {
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            udpSocket.receive(packet);

            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // Parse the pushed data and update the local cache
                hostReactor.processServiceJSON(pushPacket.data);

                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                                              ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

HostReactor.processServiceJSON

public ServiceInfo processServiceJSON(String json) {
    ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }

    boolean changed = false;

    if (oldService != null) {

        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                               + ", new-t: " + serviceInfo.getLastRefTime());
        }

        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }

        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
            newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                                                                   oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }

            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }

        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }

            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }

        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                               + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
        }

        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                               + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
        }

        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                               + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
        }

        serviceInfo.setJsonFromServer(json);

        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }

    } else {
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON
                           .toJSONString(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }

    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
                           " -> " + JSON.toJSONString(serviceInfo.getHosts()));
    }

    return serviceInfo;
}
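processServiceJSON keys the old and new host lists by toInetAddr() (ip:port), classifies each host as new, removed or modified (same key, different toString()), and only fires eventDispatcher.serviceChanged and rewrites the disk cache when one of the three sets is non-empty. A minimal sketch of that three-way diff; HostDiffSketch and Diff are hypothetical, and each instance is reduced to a string description standing in for Instance.toString().

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class HostDiffSketch {

    // Result of comparing two host snapshots keyed by "ip:port".
    static final class Diff {
        final Set<String> added = new TreeSet<>();
        final Set<String> removed = new TreeSet<>();
        final Set<String> modified = new TreeSet<>();

        boolean changed() {
            return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
        }
    }

    // Values describe an instance (weight, health, metadata...); a differing value means "modified".
    static Diff diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        Diff d = new Diff();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            if (!oldHosts.containsKey(e.getKey())) {
                d.added.add(e.getKey());
            } else if (!oldHosts.get(e.getKey()).equals(e.getValue())) {
                d.modified.add(e.getKey());
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                d.removed.add(key);
            }
        }
        return d;
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = Map.of("10.0.0.1:8080", "weight=1", "10.0.0.2:8080", "weight=1");
        Map<String, String> newHosts = Map.of("10.0.0.1:8080", "weight=2", "10.0.0.3:8080", "weight=1");
        Diff d = diff(oldHosts, newHosts);
        System.out.println(d.added + " " + d.removed + " " + d.modified);
    }
}
```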

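Copyright notice: this is an original article by xingxinggua9620, licensed under CC 4.0 BY-SA. When reposting, include a link to the original source and this notice.
Original link: https://blog.csdn.net/xingxinggua9620/article/details/113403062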