原创

透彻理解Spring Cloud系列(九)——Eureka Server:PeerEurekaNodes集群节点集合构造

既然Eureka-Server支持集群部署,那么在启动过程中必然涉及对集群的一些操作,所以Eureka抽象出一个PeerEurekaNodes的概念,这就代表了整个Eureka-Server集群节点集合。在Eureka-Server的启动过程中,会创建一个PeerEurekaNodes对象:

PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
    registry,
    eurekaServerConfig,
    eurekaClient.getEurekaClientConfig(),
    serverCodecs,
    applicationInfoManager
);

本章,我就来讲解PeerEurekaNodes到底是个什么东西?

一、PeerEurekaNodes

PeerEurekaNodes代表了整个Eureka-Server集群,它的构造函数没什么特殊的,就是对各种属性进行赋值。PeerEurekaNodes内部保存了一个PeerEurekaNode列表,表示该集群中的所有Eureka-Server节点:

public class PeerEurekaNodes {

    // 应用实例注册表
    protected final PeerAwareInstanceRegistry registry;

    // Eureka-Server 配置
    protected final EurekaServerConfig serverConfig;

    // Eureka-Client 配置
    protected final EurekaClientConfig clientConfig;

    // Eureka-Server 编解码
    protected final ServerCodecs serverCodecs;

    // 应用实例信息管理器
    private final ApplicationInfoManager applicationInfoManager;

    // Eureka-Server 集群节点数组
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();

    // Eureka-Server 服务地址数组
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

    // 定时调度线程池
    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager) {
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
}

1.1 start启动

PeerEurekaNodes包含了一个start方法,整体的逻辑很简单:

public void start() {
    // 1.创建一个线程池,只有一个工作线程
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                thread.setDaemon(true);
                return thread;
            }
        }
    );
    try {
        // 2.初始化集群节点信息
        updatePeerEurekaNodes(resolvePeerUrls());
        // 3.创建一个任务,用于定时更新集群节点信息,默认10 分钟
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // 4.提交任务
        taskExecutor.scheduleWithFixedDelay(
            peersUpdateTask,
            serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
            serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
            TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    // 打印集群节点信息
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

1.2 更新集群节点信息

我们来看下上面的resolvePeerUrls() 方法,这个方法返回一个 Eureka-Server 集群服务地址的数组:

protected List<String> resolvePeerUrls() {
    // 1.根据当前应用实例信息,解析出整个Eureka-Server集群的服务地址
    // 涉及EndPoint和解析器,后续章节会讲解
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

    // 2.移除当前应用实例(自己),避免向自己同步
    int idx = 0;
    while (idx < replicaUrls.size()) {
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}

然后,我们看下updatePeerEurekaNodes这个更新集群节点信息的方法,它主要做两件事情:

  • 添加新增的集群节点
  • 关闭删除的集群节点
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }

    // 1.计算删除的集群节点地址
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);

    // 2.计算新增的集群节点地址
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);

    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // 3.关闭删除的集群节点
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // 4.添加新增的集群节点
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    // 5.重新给属性赋值
    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}

// PeerEurekaNodes#createPeerEurekaNode
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
    // 根据服务地址,创建一个集群节点

    HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
    String targetHost = hostFromUrl(peerEurekaNodeUrl);
    if (targetHost == null) {
        targetHost = "host";
    }
    return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}

二、总结

本章,我简单介绍了PeerEurekaNodes,它代表的其实就是整个Eureka-Server集群。PeerEurekaNodes在构造成功后,可以调用start方法解析集群中其它节点(创建一些PeerEurekaNode对象),然后启动一个定时任务更新PeerEurekaNode信息。

正文到此结束

感谢赞赏~

本文目录