原创

透彻理解Spring Cloud系列(十五)——服务注册发现:心跳续租(renew)机制

本章,我将讲解Eureka的心跳(renew)机制。通过前面章节,我们已经知道 Eureka-Client 向 Eureka-Server 注册应用实例成功后,会获得租约 ( Lease )。所以,Eureka-Client 向 Eureka-Server 发送心跳,其实就是进行续租(Lease Renewal),避免租约过期,这样才能让Eureka-Server知道自己还活着。

默认情况下,租约有效期为 90 秒,心跳(续租)频率为 30 秒。两者比例为 1 : 3 ,也就是说,在网络异常等情况下,有三次重试机会。

一、Eureka-Client发起续租

Eureka-Client发起心跳(续租)的整体流程如下:

  1. DiscoveryClient初始化时,会去调度一堆定时任务,其中有一个就是HeartbeatThread——心跳任务;
  2. 心跳任务,默认每隔30秒执行一次发送心跳的逻辑,底层是调用了EurekaHttpClient#sendHeartbeat()
  3. 心跳请求是一个HTTP/PUT请求,URL类似http://localhost:8080/v2/apps/ServiceA/i-000000-1。

1.1 初始化定时任务

Eureka-Client 在构造和初始化时,会创建一个心跳(续租)线程——HeartbeatThread,每隔一段时间发送一次心跳:

/**
* DiscoveryClient.java
*/

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {
    // ...

    // 定时任务线程池,用于定时触发心跳( 续租 )
    scheduler = Executors.newScheduledThreadPool(2,
               new ThreadFactoryBuilder()
                       .setNameFormat("DiscoveryClient-%d")
                       .setDaemon(true)
                       .build());

    // 心跳任务执行线程池
    heartbeatExecutor = new ThreadPoolExecutor(
              1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(),
              new ThreadFactoryBuilder()
                      .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                      .setDaemon(true)
                      .build()
    ); 

    // ...

    // 初始化定时任务
    initScheduledTasks();
}

initScheduledTasks,会开始进行心跳任务的调度,默认每隔30秒调度一次:

private void initScheduledTasks() {
    // 向 Eureka-Server 发送心跳(续租)执行器
    if (clientConfig.shouldRegisterWithEureka()) {
        // 心跳间隔,默认30s
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 超时重试因子
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // 开始调度
        scheduler.schedule(
            new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()    // HeartbeatThread线程
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);

        //...
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

1.2 HeartbeatThread

上述com.netflix.discovery.DiscoveryClient.HeartbeatThread,是一个包含心跳请求逻辑的Runnable任务,实现执行 Eureka-Client 向 Eureka-Server 发起续租( renew )请求。实现代码如下:

/**
* DiscoveryClient.java
*/

// 最后成功向 Eureka-Server 心跳时间戳
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

private class HeartbeatThread implements Runnable {

   public void run() {
       if (renew()) {
           // 心跳发送成功后,更新时间戳
           lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
       }
   }
}

调用 #renew 方法,执行续租逻辑:

boolean renew() {
   EurekaHttpResponse<InstanceInfo> httpResponse;
   try {
       // 1.利用EurekaTransport通信组件发送HTTP/PUT请求
       httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
       logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());

       // 2.404表示 Eureka-Server 不存在租约,则重新发起注册
       if (httpResponse.getStatusCode() == 404) {
           REREGISTER_COUNTER.increment();
           logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
           long timestamp = instanceInfo.setIsDirtyWithTime();
           // 新发起注册
           boolean success = register();
           if (success) {
               instanceInfo.unsetIsDirty(timestamp);
           }
           return success;
       }
       return httpResponse.getStatusCode() == 200;
   } catch (Throwable e) {
       logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
       return false;
   }
}

1.3 EurekaHttpClient

renew方法内部其实调用了 AbstractJerseyEurekaHttpClient#sendHeartBeat(...) 方法,发起续租请求,实现代码如下:

/**
* AbstractJerseyEurekaHttpClient.java
*/

public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    // URL,例如http://localhost:8080/v2/apps/ServiceA/i-000000-1
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        // 发送http/put请求
        WebResource webResource = jerseyClient.resource(serviceUrl)
            .path(urlPath)
            .queryParam("status", info.getStatus().toString())
            .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);    //PUT Request
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

二、Eureka-Server接受续租

Eureka-Server接受心跳(续租)的整体流程如下:

  1. 负责接受心跳请求的Controller是InstanceResource#renewLease(),相当于Jersey的一个MVC框架;
  2. 调用AbstractInstanceRegistry的renew()方法,完成应用实例的续租;
  3. 实际续租的处理流程是:根据应用名称和实例ID,从内部的Map中获取一个Lease,更新一下lastUpdateTimestamp时间戳。

2.1 接受续租请求

com.netflix.eureka.resources.InstanceResource,是处理单个应用实例的请求的 Resource ( Controller ):

/**
* InstanceResource.java
*/

@PUT
public Response renewLease(
    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
    @QueryParam("overriddenstatus") String overriddenStatus,
    @QueryParam("status") String status,
    @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);

    // 1.执行续租
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // 2.续租失败,返回 404 响应。当 Eureka-Client 收到 404 响应后,会重新发起注册
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // 比较请求的lastDirtyTimestamp和Server端的InstanceInfo的lastDirtyTimestamp的差异
    // 需要配置 eureka.syncWhenTimestampDiffers = true ( 默认开启 )。
    Response response = null;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        // 比较 lastDirtyTimestamp 的差异
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
            && (overriddenStatus != null)
            && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
            && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
    return response;
}

2.2 执行续租

实际续租逻辑的执行是在Server端的注册表对象的AbstractInstanceRegistry#renew(...) 方法,本质就是根据应用名称和实例ID,在一个内部的Map中找到租约对象,并更新它的时间戳:

/**
* AbstractInstanceRegistry.java
*/

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);

    // key是实例id,value是Lease对象
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    // 1.如果没找到租约,直接返回失败
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } 
    // 2.找到了旧的租约
    else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // 获取应用实例的最终状态
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                instanceInfo, leaseToRenew, isReplication);

            // 若应用实例的最终状态为UNKNOWN,则返回失败
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }

            // 若应用实例的状态与最终状态不相等,则以最终状态覆盖掉应用实例的状态
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                Object[] args = {
                    instanceInfo.getStatus().name(),
                    instanceInfo.getOverriddenStatus().name(),
                    instanceInfo.getId()
                };
                logger.info(
                    "The instance status {} is different from overridden instance status {} for instance {}. "
                    + "Hence setting the status to overridden status", args);
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }

        // 增加续租计数,Netflix Servo监控用
        renewsLastMin.increment();
        // 设置 租约最后更新时间戳
        leaseToRenew.renew();
        return true;
    }
}
/**
* Lease.java
*/
public void renew() {
    // duration是租约有效期,默认90秒
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

lastUpdateTimestamp就是最新一次接受到心跳的时间戳,其实这里不需要加这个租约有效期duration,是一个Eureka自身的一个Bug,后面我讲解服务剔除时会讲到。

三、总结

总结,本章我讲解了Eureka的心跳续租机制,本质就是Eureka-Client通过定时调度任务,每隔一段时间发送一次HTTP/PUT请求到Eureka-Server端。Eureka-Server端接受到请求后,在自己的注册表中找到应用实例对应的租约,然后更新租约的时间戳。

正文到此结束

感谢赞赏~

本文目录