Spring Cloud Eureka in Depth

1). Eureka's core classes

1. InstanceInfo

InstanceInfo encapsulates the information about a service instance.

public class InstanceInfo {
    ....
    // Instance ID
    private volatile String instanceId;
    // Application name
    private volatile String appName;
    // Group the application belongs to
    private volatile String appGroupName;
    // IP address
    private volatile String ipAddr;
    // Default port: 7001
    public static final int DEFAULT_PORT = 7001;
    // Port
    private volatile int port = DEFAULT_PORT;
    // Default HTTPS port
    public static final int DEFAULT_SECURE_PORT = 7002;
    // HTTPS port
    private volatile int securePort = DEFAULT_SECURE_PORT;
    // Home page URL of the application instance
    private volatile String homePageUrl;
    // Status page URL of the application instance
    private volatile String statusPageUrl;
    // Health check URL of the application instance
    private volatile String healthCheckUrl;
    // HTTPS health check URL of the application instance
    private volatile String secureHealthCheckUrl;
    // Virtual address (VIP)
    private volatile String vipAddress;
    // HTTPS virtual address
    private volatile String secureVipAddress;
    // Defaults to US (country ID 1)
    private volatile int countryId = DEFAULT_COUNTRY_ID;
    // Data center info: Netflix, Amazon, or MyOwn
    private volatile DataCenterInfo dataCenterInfo;
    // Host name
    private volatile String hostName;
    // Instance status: UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN.
    // OUT_OF_SERVICE marks the service as stopped; instances in this state receive no traffic,
    // which is often used during upgrade deployments.
    private volatile InstanceStatus status = InstanceStatus.UP;
    // Status value forced from outside; defaults to UNKNOWN
    private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
    // Lease information
    private volatile LeaseInfo leaseInfo;
    // First, marks whether this is a discovery server; second, whether that discovery server
    // is the instance responding to your request
    private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
    // Metadata of the application instance
    private volatile Map<String, String> metadata;
    // Time the status information was last updated
    private volatile Long lastUpdatedTimestamp;
    // Time the instance information was last marked dirty (changed on the client side)
    private volatile Long lastDirtyTimestamp;
    // Action Eureka Server performed on this instance: ADDED, MODIFIED, or DELETED
    private volatile ActionType actionType;
    // Name of the AWS auto scaling group
    private volatile String asgName;
}

As you can see, InstanceInfo holds metadata and dataCenterInfo, and also an important LeaseInfo that describes the lease of the application instance.

2. LeaseInfo

Eureka uses LeaseInfo (com.netflix.appinfo.LeaseInfo) to describe the lease of an application instance.

public class LeaseInfo {

    // Both defaults are in seconds
    public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
    public static final int DEFAULT_LEASE_DURATION = 90;

    // Client settings: interval between lease renewals
    private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL;
    // Client settings: how long the lease stays valid
    private int durationInSecs = DEFAULT_LEASE_DURATION;

    // Server populated
    // Time the lease was first registered, set by the server
    private long registrationTimestamp;
    // Time the lease was last renewed, set by the server
    private long lastRenewalTimestamp;
    // Time the lease was evicted, set by the server
    private long evictionTimestamp;
    // Time the service instance was marked UP, set by the server
    private long serviceUpTimestamp;

}

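These fields mainly describe the heartbeat of an application instance: the agreed heartbeat interval, the lease duration, and the time of the most recent renewal.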
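
To make the role of these fields concrete, here is a minimal sketch of a lease check. `SimpleLease` and its explicit clock parameter are hypothetical and are not Eureka's own `Lease` class; the sketch only assumes that a lease counts as expired once the last renewal is older than the lease duration.

```java
// Hypothetical illustration; not part of Eureka.
class SimpleLease {
    private final long durationMs;
    private long lastRenewalTimestamp;

    SimpleLease(int durationInSecs, long nowMs) {
        this.durationMs = durationInSecs * 1000L;
        this.lastRenewalTimestamp = nowMs;
    }

    // A heartbeat from the client refreshes the lease.
    void renew(long nowMs) {
        lastRenewalTimestamp = nowMs;
    }

    // Expired when no renewal arrived within the lease duration.
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalTimestamp + durationMs;
    }
}

class SimpleLeaseDemo {
    public static void main(String[] args) {
        SimpleLease lease = new SimpleLease(90, 0L);    // 90 s lease, created at t = 0
        System.out.println(lease.isExpired(60_000L));   // still within the lease
        lease.renew(60_000L);                           // heartbeat at t = 60 s
        System.out.println(lease.isExpired(150_001L));  // more than 90 s after the renewal
    }
}
```

With the default 30-second renewal interval and 90-second duration, a client can miss two consecutive heartbeats before its lease runs out.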

3. ServiceInstance

ServiceInstance is the interface Spring Cloud uses to abstract an instance in service discovery; it specifies the information every discoverable instance has in common.

public interface ServiceInstance {

   /**
    * Returns the service ID.
    */
   String getServiceId();

   /**
    * Host of the service instance.
    */
   String getHost();

   /**
    * Port of the service instance.
    */
   int getPort();

   /**
    * Whether the service uses HTTPS.
    */
   boolean isSecure();

   /**
    * URI of the instance.
    */
   URI getUri();

   /**
    * Metadata of the service instance.
    */
   Map<String, String> getMetadata();

   /**
    * Scheme of the instance.
    */
   default String getScheme() {
      return null;
   }
}
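
As an illustration of what an implementation of this contract might look like, the sketch below uses a hypothetical `SimpleServiceInstance` class. It deliberately does not implement the Spring Cloud interface so that it compiles on its own, and the rule that the URI is built as scheme://host:port from the secure flag is an assumption made for the example.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in that mirrors the accessors above without depending on Spring Cloud.
final class SimpleServiceInstance {
    private final String serviceId;
    private final String host;
    private final int port;
    private final boolean secure;
    private final Map<String, String> metadata;

    SimpleServiceInstance(String serviceId, String host, int port,
                          boolean secure, Map<String, String> metadata) {
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
        this.secure = secure;
        this.metadata = Collections.unmodifiableMap(metadata);
    }

    String getServiceId() { return serviceId; }
    String getHost() { return host; }
    int getPort() { return port; }
    boolean isSecure() { return secure; }
    Map<String, String> getMetadata() { return metadata; }

    // Assumption: the scheme follows the secure flag.
    String getScheme() { return secure ? "https" : "http"; }

    // Assumption: the URI is simply scheme://host:port.
    URI getUri() { return URI.create(getScheme() + "://" + host + ":" + port); }
}
```

A caller would then resolve a target address with something like `new SimpleServiceInstance("order-service", "10.0.0.5", 8443, true, Collections.emptyMap()).getUri()`.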

2). Core service operations

For service discovery, a few operations on service instances matter most:

  • Register (register)

  • Take offline (cancel)

  • Renew the lease (renew)

  • Evict (evict)

Eureka designs several core classes around these operations:

    com.netflix.eureka.lease.LeaseManager

    com.netflix.discovery.shared.LookupService

    com.netflix.eureka.registry.InstanceRegistry

    com.netflix.eureka.registry.AbstractInstanceRegistry

    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl

Building on the Netflix code, Spring Cloud Eureka abstracts and defines the following core classes:

    org.springframework.cloud.netflix.eureka.server.InstanceRegistry

    org.springframework.cloud.client.serviceregistry.ServiceRegistry

    org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry

    org.springframework.cloud.netflix.eureka.serviceregistry.EurekaRegistration

    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration

    org.springframework.cloud.netflix.eureka.EurekaClientConfigBean

    org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean

LeaseManager and LookupService are Eureka's interfaces for service discovery operations: LeaseManager defines the write operations, and LookupService defines the query operations.

1. LeaseManager

public interface LeaseManager<T> {

    /*
     * Registers a service instance.
     */
    void register(T r, int leaseDuration, boolean isReplication);

    /*
     * Removes a service instance.
     */
    boolean cancel(String appName, String id, boolean isReplication);

    /*
     * Heartbeat with the Eureka server to keep the lease alive.
     */
    boolean renew(String appName, String id, boolean isReplication);

    /**
     * Removes service instances whose lease has expired.
     */
    void evict();
}
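
Before reading the real implementation, it may help to see the four operations on a toy registry. The sketch below is a simplified, hypothetical `InMemoryLeaseRegistry`: it keeps the same two-level layout (application name, then instance ID) but stores only an expiry time per instance and takes the current time as a parameter. Replication, status overrides and change queues are left out on purpose.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory registry; names and the explicit clock parameter are assumptions.
class InMemoryLeaseRegistry {
    // appName -> (instanceId -> lease expiry time in ms)
    private final Map<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long leaseDurationMs, long nowMs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, nowMs + leaseDurationMs);
    }

    boolean cancel(String appName, String id) {
        Map<String, Long> leases = registry.get(appName);
        return leases != null && leases.remove(id) != null;
    }

    boolean renew(String appName, String id, long leaseDurationMs, long nowMs) {
        Map<String, Long> leases = registry.get(appName);
        if (leases == null) {
            return false;
        }
        // computeIfPresent returns null when the instance ID is unknown.
        return leases.computeIfPresent(id, (k, v) -> nowMs + leaseDurationMs) != null;
    }

    // Removes every lease whose expiry time has passed and returns how many were removed.
    int evict(long nowMs) {
        int evicted = 0;
        for (Map<String, Long> leases : registry.values()) {
            for (Map.Entry<String, Long> e : leases.entrySet()) {
                if (e.getValue() < nowMs && leases.remove(e.getKey(), e.getValue())) {
                    evicted++;
                }
            }
        }
        return evicted;
    }

    int size() {
        int total = 0;
        for (Map<String, Long> leases : registry.values()) {
            total += leases.size();
        }
        return total;
    }
}
```

Unlike this sketch, the real registry returns `false` from `renew` to tell the client that it has to register again, and it limits how many leases one eviction pass may remove.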

The concrete implementation lives in AbstractInstanceRegistry:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // Acquire the read lock
            read.lock();
            // Look up the instances already registered under this application name
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // A lease already exists: compare the last dirty timestamps
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // The existing dirty timestamp is newer than the incoming one, so keep the local copy
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // This is a newly registered instance
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // A new client is registering, so raise the expected renewals
                        // (1 per 30 seconds, i.e. 2 per minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Build the new lease; for an existing instance, carry over the previous service-up timestamp.
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
// cancel(String, String, boolean) is overridden by PeerAwareInstanceRegistry, which also sends the cancel request to the other peers
protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // Remove the lease
                leaseToCancel = gMap.remove(id);
            }
            // Record it in the recently cancelled queue
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    // Mark the instance info as DELETED
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }
 public boolean renew(String appName, String id, boolean isReplication) {
     // Increment the renew counter in EurekaMonitors
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // Fetch the lease
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            // Check the overridden status of the instance
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            // Refresh the last renewal timestamp
            leaseToRenew.renew();
            return true;
        }
    }
 public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // Collect the leases that need to be evicted
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    // isExpired is true when the current time is later than the last update time plus the lease duration plus additionalLeaseMs
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

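        // registrySize is the base for the eviction limit, so a single pass never evicts
        // more than registrySize * (1 - renewalPercentThreshold) instances. This relates to
        // self-preservation (evicting nothing): while running, Eureka Server checks whether
        // the share of heartbeats received over 15 minutes has dropped below 85%. If it has,
        // the server protects the instances so that they do not expire. If a provider goes
        // down abnormally during the protection period, consumers get a stale instance and
        // the call fails, so consumers need fault tolerance such as retries and circuit breakers.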
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
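
The eviction cap and the random pick used in `evict` can be isolated into a small standalone sketch. `EvictionSketch` and its method names are hypothetical; the arithmetic follows the listing above, and the selection is a partial Fisher-Yates (Knuth) shuffle that stops after the first `toEvict` positions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper that isolates two steps of the eviction logic.
class EvictionSketch {
    // Upper bound on evictions per pass, following the arithmetic in evict().
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks up to 'limit' random elements with a partial Fisher-Yates shuffle.
    static <T> List<T> pickRandom(List<T> expired, int limit, Random random) {
        List<T> copy = new ArrayList<>(expired);
        int toEvict = Math.max(0, Math.min(copy.size(), limit));
        for (int i = 0; i < toEvict; i++) {
            // Swap a random element from the untouched tail into position i.
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, toEvict));
    }
}
```

With 100 registered instances and a threshold of 0.85, `evictionLimit` returns 15, so even if 40 leases have expired, a single pass removes at most 15 of them. Picking at random spreads the evictions across applications instead of wiping out one application first.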

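A word on the purpose of Eureka's self-preservation mode. Once it is activated, the server stops evicting services whose leases have expired because no heartbeat arrived for a long time. It waits for recovery instead, and leaves self-preservation mode automatically once heartbeats return to normal. The mode is meant to keep services available during a network partition. For example, suppose two client instances C1 and C2 can reach each other without problems, but a network fault prevents C2 from renewing its heartbeat with Eureka in time. Eureka should not simply remove C2 from the registry: if it did, C1 could no longer obtain the service registered by C2 from the Eureka server, even though C2 is still available.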
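
The numbers behind this mode can be worked through in a few lines. The sketch below uses a hypothetical `SelfPreservationSketch` class: the threshold arithmetic mirrors the `register` listing (two expected renewals per instance per minute, scaled by the renewal percent threshold), while the comparison in `leaseExpirationEnabled` is an assumption about how the server decides, based on the description above.

```java
// Hypothetical helper; the decision rule is an assumption, not copied from Eureka.
class SelfPreservationSketch {
    // Each instance is expected to renew twice per minute (once every 30 seconds).
    static int renewsPerMinThreshold(int instances, double renewalPercentThreshold) {
        int expectedNumberOfRenewsPerMin = instances * 2;
        return (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
    }

    // Eviction stays enabled only while enough renewals arrived in the last minute.
    static boolean leaseExpirationEnabled(int renewsLastMin, int threshold) {
        return threshold > 0 && renewsLastMin > threshold;
    }

    public static void main(String[] args) {
        int threshold = renewsPerMinThreshold(10, 0.85);
        System.out.println("threshold = " + threshold);
        System.out.println("healthy   = " + leaseExpirationEnabled(20, threshold));
        System.out.println("partition = " + leaseExpirationEnabled(15, threshold));
    }
}
```

For 10 instances the server expects 20 renewals per minute and the threshold is 17. If only 15 renewals arrive, the check fails and, under the assumption above, the server keeps all leases instead of evicting them.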

2. LookupService

public interface LookupService<T> {

    /**
     * Returns the Application with the given name (it holds the list of instances registered under that name).
     */
    Application getApplication(String appName);

    /**
     * Returns all Applications.
     */
    Applications getApplications();

    /**
     * Returns the instances with the given ID.
     */
    List<InstanceInfo> getInstancesById(String id);

    /**
     * Returns the next server to use.
     * @param virtualHostname
     *            the virtual host name used to communicate with the server
     * @param secure
     *            whether HTTPS is used
     */
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

The LookupService implementation is fairly simple, so it is not covered here.
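
Still, the idea behind a "next server" lookup can be sketched as a round-robin pick over the instances registered under one virtual host name. `RoundRobinPicker` is a hypothetical class that works on plain strings; that Eureka rotates through instances in this way is an assumption made here, not something shown in the code above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin selector over a list of instance addresses.
class RoundRobinPicker {
    private final AtomicLong counter = new AtomicLong();

    String next(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalStateException("no instances available");
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = (int) Math.floorMod(counter.getAndIncrement(), (long) instances.size());
        return instances.get(index);
    }
}
```

Calling `next` repeatedly on a three-element list walks through the elements in order and then starts over, which spreads requests evenly as long as the list does not change.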

3. EurekaClient

LeaseManager and LookupService above are both server-side operations in Eureka; the client side is covered next.

public class DiscoveryClient implements EurekaClient {

	......
        /**		
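         ApplicationInfoManager: initializes the instance information needed for registration; the information is provided by CloudInstanceConfig.
         EurekaClientConfig: the Eureka client configuration; in Spring Cloud it is provided by EurekaClientConfigBean.
         AbstractDiscoveryClientOptionalArgs: arguments the Eureka client needs at runtime; in Spring Cloud they are provided by RestTemplateDiscoveryClientOptionalArgs, whose main job is to replace Jersey, Eureka's default for REST operations, with Spring's own RestTemplate.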
        */
        @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        // Initialize and set properties
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;

        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            // Create the heartbeat thread pool, used mainly for lease renewal.
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            // Initialize EurekaTransport
            eurekaTransport = new EurekaTransport();
            // Set up the EurekaTransport properties, with RestTemplateEurekaHttpClient as the client for REST requests
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // Call beforeRegistration before registering
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
        // Register during initialization only when the configuration enforces it
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                // Register
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // Start the scheduled tasks (heartbeat, registry cache refresh, and replication of the local instance info to the server)
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }
        
        
        
   /**
     * Sends a REST registration request to the Eureka server: the instanceInfo is handed to the EurekaHttpClient (RestTemplateEurekaHttpClient in Spring Cloud).
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
    /**
        renew();
        unregister();
        refreshRegistry();
        These work much like register(): each hands the instanceInfo to the EurekaHttpClient, which sends the request to the server.
    */
}

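As the code above shows, the Eureka client goes through the following steps during initialization:

  • Initialize and set properties.
  • Create the scheduled tasks used for communication between services.
  • Register.
  • Start the scheduled tasks.
  • Finish setting up the instance information.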
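
The "start the scheduled tasks" step can be pictured with a plain `ScheduledExecutorService`. The sketch below is hypothetical: `HeartbeatSketch` only counts heartbeats, whereas a real client would send a renew request to the server at each tick. The daemon thread mirrors the thread factories in the constructor above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical heartbeat loop; the counter stands in for the renew request.
class HeartbeatSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "heartbeat-sketch");
                t.setDaemon(true);  // do not keep the JVM alive for heartbeats
                return t;
            });
    private final AtomicInteger heartbeats = new AtomicInteger();

    // Schedules one heartbeat every intervalMs milliseconds.
    void start(long intervalMs) {
        scheduler.scheduleWithFixedDelay(
                () -> heartbeats.incrementAndGet(),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    int heartbeatCount() {
        return heartbeats.get();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

With Eureka's default renewal interval the equivalent call would be `start(30_000L)`; the short intervals used when trying this out are only there to see the counter move.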

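So far we have covered the core server-side and client-side operations of Netflix Eureka and how they are implemented (readers are encouraged to explore the remaining details themselves). The next section introduces how Spring Cloud Eureka extends Netflix Eureka.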


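Copyright notice: this is an original article by qq_38165752, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.
Original link: https://blog.csdn.net/qq_38165752/article/details/85783461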