原创

透彻理解Spring Cloud系列(七)——Eureka启动:EurekaClient初始化

Eureka-Server注册中心启动时,会构造一个EurakaClient对象:

eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);

EurakaClient对象的主要作用如下:

  1. 提供多种方法获取应用集合(com.netflix.discovery.shared.Applications) 和 应用实例( com.netflix.appinfo.InstanceInfo );
  2. 提供方法获取本地客户端信息,例如,应用管理器( com.netflix.appinfo.ApplicationInfoManager )和 Eureka-Client 配置( com.netflix.discovery.EurekaClientConfig );
  3. 提供方法注册本地客户端的健康检查和 Eureka 事件监听器。

LookupService,顾名思义,是查找服务,这个接口提供了一些简单单一的方式获取应用集合(com.netflix.discovery.shared.Applications) 和 应用实例信息( com.netflix.appinfo.InstanceInfo )。

一、DiscoveryClient

DiscoveryClient是EurakaClient的具体实现,用于与 Eureka-Server 交互,它在构造时需要ApplicationInfoManager对象和EurekaClientConfig对象:

DiscoveryClient提供了如下功能:

  • 向 Eureka-Server 注册自身服务;
  • 向 Eureka-Server 续约自身服务;
  • 向 Eureka-Server 取消自身服务;
  • 从 Eureka-Server 查询应用集合和应用实例信息。

1.1 构造

DiscoveryClient对象的构造最终是调用了如下构造器,大家可以先顺着我的注释了解整个流程:

// Applications 在Eureka-Client本地的缓存
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

// 拉取注册表次数
private final AtomicLong fetchRegistryGeneration;

// 最后一次成功拉取注册表的时间戳
private volatile long lastSuccessfulRegistryFetchTimestamp = -1;

// 最后一次成功发送心跳(续约)的时间戳
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

// 心跳(续约)的监控
private final ThresholdLevelsMetric heartbeatStalenessMonitor;

// 注册表拉取的监控
private final ThresholdLevelsMetric registryStalenessMonitor;

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
    // 注:BackupRegistry是备份注册表接口,没有具体实现;
    // AbstractDiscoveryClientOptionalArgs是可选参数抽象基类,这两个都可以忽略

    // 1.设置各种健康检查处理器、监听器
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        // Eureka 事件监听器
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }

    // 2.将ApplicationInfoManager、EurekaClientConfig、InstanceInfo保存到自身的字段中
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    // 设置当前应用实例对象
    instanceInfo = myInfo;
    if (myInfo != null) {
        // 无实际用途,用于打印logger
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    // 设置备份注册表,目前没有默认实现,忽略
    this.backupRegistryProvider = backupRegistryProvider;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);

    // 3.设置Applications在本地的缓存,后续定时任务间隔从 Eureka-Server 拉取注册实例信息到本地缓存
    localRegionApps.set(new Applications());

    // 拉取注册表的次数
    fetchRegistryGeneration = new AtomicLong(0);
    // 获取哪些区域( Region )集合的注册信息
    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    // 获取哪些区域( Region )集合的注册信息
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // 4.设置是否拉取注册表
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // 5.设置是否向注册中心注册自身
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 6.如果当前应用实例是单体部署,也就是既不向注册中心注册,也不拉取注册表
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

        // 直接返回,不需要启动任何调度任务
        return;  
    }

    try {
        // 7.创建一个支持定时调度任务的线程池
        scheduler = Executors.newScheduledThreadPool(2,
                                                     new ThreadFactoryBuilder()
                                                     .setNameFormat("DiscoveryClient-%d")
                                                     .setDaemon(true)
                                                     .build());

        // 8.创建一个用于发送心跳(续约)的线程池,注意这里用了SynchronousQueue,且keepAliveTime=0,
        // 那么当线程数量达到maxsize时,由于SynchronousQueue没有空间,所以非核心线程会直接退出
        heartbeatExecutor = new ThreadPoolExecutor(
            1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
            .setDaemon(true)
            .build()
        );

        // 9.创建一个支持本地注册表缓存刷新的线程池
        cacheRefreshExecutor = new ThreadPoolExecutor(
            1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
            .setDaemon(true)
            .build()
        );

        // 10.创建一个支持eureka client跟eureka server进行网络通信的底层组件
        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        // 忽略AWS相关逻辑...
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    // 11.如果配置了需要拉取注册表,则这里就会进行初始拉取
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // 拉取失败则从备份注册表拉取,没有具体实现,无意义
        fetchRegistryFromBackup();
    }

    // 12.执行注册前的预处理
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    // 15.启动调度任务
    initScheduledTasks();

    try {
        // 向 Netflix Servo 注册监控,可忽略
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    //...

    // 初始化完成时间戳
    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
}

上述DiscoveryClient的构造流程, 核心其实就三点:

  1. 根据应用实例信息和客户端配置信息,设置DiscoveryClient对象自身的各种属性;
  2. 初始化两个定时任务涉及的线程池:拉取注册表定时任务心跳(续租)定时任务
  3. 向线程池中提交任务,开始周期性的进行注册表拉取和心跳。

后续章节,我会详细讲解服务注册和发现,本章我们先掌握整体流程。

1.2 任务调度

本节,我们来重点看下initScheduledTasks方法,其实就是启动了与Eureka-Client相关的所有调度任务:

private InstanceInfoReplicator instanceInfoReplicator;

private void initScheduledTasks() {
    // 1.需要拉取注册表
    if (clientConfig.shouldFetchRegistry()) {
        // 客户端拉取注册表的间隔,默认30s
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // 拉取注册表超时重试延期最大因子
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // 开始调度任务
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // 2.需要向注册中心注册
    if (clientConfig.shouldRegisterWithEureka()) {
        // 心跳间隔,默认30s
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 心跳超时重试延期最大因子
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // 开始调度任务
        scheduler.schedule(
            new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);

        // 创建一个应用实例副本传播器,用于Eureka集群之间的数据同步
        instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

        // 应用实例状态表更监听器,用于监听应用实例的状态变更事件
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                    InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // 3.开始调度
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

上述方法核心一共启动了三个调度任务,并增加了一个监听器:

  1. Eureka-Client周期性地拉取Eureka-Server的注册表;
  2. Eureka-Client周期性向Eureka-Server发送心跳;
  3. 应用实例副本传播器InstanceInfoReplicator,会定时进行Eureka集群间的应用实例信息同步;
  4. 监听对应用实例状态的变更事件。

二、总结

本章,我讲解了EurekaClient的构造和初始化过程,虽然从代码上看挺复杂的,但是整体流程还是非常清晰的,就是配置EurekaClient实例对象需要的各种属性,然后创建一些线程池,最后开始定时任务的调度。

EurekaClient初始化过程中的核心是它内部的那些调度任务:包括拉取注册表、心跳续约等等,这些我会在后续章节分别一一讲解。

正文到此结束

感谢赞赏~

本文目录