原创

透彻理解Java网络编程(二三)——Netty实战:负载均衡机制

在分布式系统中,服务消费者和服务提供者都存在多个节点,所以 RPC 框架需要实现合理的负载均衡算法,控制流量能够均匀地分摊到每个服务提供者,避免单点故障导致的服务调用异常。所以,本章我就来实现 RPC 框架的负载均衡机制。

一、注册中心

服务消费者在发起 RPC 调用之前,需要知道服务提供者有哪些节点是可用的。由于服务提供者节点会存在上/下线的情况,所以服务消费者需要感知服务提供者节点的动态变化。

RPC 框架一般采用注册中心来实现服务的注册和发现,主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等。在本工程中,我使用 Zookeeper,并设计一个通用的注册中心接口,各类注册中心都可以按该接口规范进行扩展。

1.1 接口设计

注册中心会存储服务提供方的元数据信息,我们需要将服务元数据信息封装成一个对象,该对象包括服务全限定名称服务版本服务地址/端口

/**
 * 服务元数据
 */
public class ServiceMeta {

    /**
     * 服务类全限定名
     */
    private String service;

    /**
     * 服务版本号
     */
    private String version;

    /**
     * 服务实例地址
     */
    private String address;

    /**
     * 服务端实例端口
     */
    private Integer port;

    //...省略get/set
}

接着,需要设计通用的注册中心管理接口,包含注册中心四个基本操作:

  • 服务注册 register
  • 服务注销 unRegister
  • 服务发现 discovery
  • 注册中心销毁 destroy
/**
 * 服务注册管理
 */
public interface RegistryService {

    void register(ServiceMeta serviceMeta) throws Exception;

    void unRegister(ServiceMeta serviceMeta) throws Exception;

    ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception;

    void destroy() throws IOException;
}

来看具体的实现ZookeeperRegistryService:

public class ZookeeperRegistryService implements RegistryService {
    /**
     * 注册中心连接失败时的重试间隔
     */
    public static final int BASE_SLEEP_TIME_MS = 1000;

    /**
     * 注册中心连接失败时的重试次数
     */
    public static final int MAX_RETRIES = 3;

    /**
     * 注册中心中的RPC服务根节点路径
     */
    public static final String ZK_BASE_PATH = "/rpc";

    private final ServiceDiscovery<ServiceMeta> serviceDiscovery;

    public ZookeeperRegistryService(String registryAddr) throws Exception {
        // 1.连接Zookeeper注册中心
        CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr,
                new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
        client.start();

        JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);

        // 2.基于Curator框架构建一个服务注册工具类
        this.serviceDiscovery = ServiceDiscoveryBuilder
                .builder(ServiceMeta.class) // 自定义数据
                .client(client)
                .serializer(serializer)
                .basePath(ZK_BASE_PATH)
                .build();
        this.serviceDiscovery.start();
    }

    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());

        // 构建一个服务实例对象
        ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
                .name(serviceKey)
                .address(serviceMeta.getAddress())
                .port(serviceMeta.getPort())
                .payload(serviceMeta)
                .build();
        // 注册服务实例
        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unRegister(ServiceMeta serviceMeta) throws Exception {
        String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
        ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
                .name(serviceKey)
                .address(serviceMeta.getAddress())
                .port(serviceMeta.getPort())
                .payload(serviceMeta)
                .build();
        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
        Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
        ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
        // 基于一致性Hash算法,选择一个服务节点
        ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
        if (instance != null) {
            return instance.getPayload();
        }
        return null;
    }

    @Override
    public void destroy() throws IOException {
        serviceDiscovery.close();
    }
}

上述代码,核心是创建了 ServiceDiscovery 对象, ServiceDiscovery 负责服务的注册和发现。

1.2 服务注册

服务注册通过ServiceDiscovery.register()方法完成:

@Override
public void register(ServiceMeta serviceMeta) throws Exception {
    String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());

    // 构建一个服务实例对象
    ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
        .name(serviceKey)
        .address(serviceMeta.getAddress())
        .port(serviceMeta.getPort())
        .payload(serviceMeta)
        .build();
    // 注册服务实例
    serviceDiscovery.registerService(serviceInstance);
}

ServiceInstance 对象代表一个服务实例,包含服务名称 name、唯一标识 id、地址 address、端口 port 以及用户自定义的可选属性 payload,ServiceInstance 在 Zookeeper 服务器中的存储形式如下:



RpcProviderInitializer 在启动过程中,根据 @RpcService 注解识别需要发布的服务,然后调用 RegistryService 接口的 register() 方法,将服务发布到注册中心:

// RpcProviderInitializer.java

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // 如果类注解了@RpcService,则执行服务注册
    RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
    if (rpcService != null) {
        // 服务类的全限定名
        String serviceName = rpcService.service().getName();
        // 服务版本号
        String serviceVersion = rpcService.version();

        try {
            // 创建服务元数据对象
            ServiceMeta serviceMeta = new ServiceMeta();
            serviceMeta.setAddress(serverAddress);
            serviceMeta.setPort(serverPort);
            serviceMeta.setService(serviceName);
            serviceMeta.setVersion(serviceVersion);
            // 注册服务
            serviceRegistry.register(serviceMeta);
            // 缓存服务
            String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
            rpcServiceMap.put(serviceKey, bean);
        } catch (Exception e) {
            LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
        }
    }
    return bean;
}

至此,服务提供者在启动后就可以将 @RpcService 注解修饰的服务发布到注册中心了。

二、负载均衡

服务消费者在发起 RPC 调用之前,需要感知有多少服务端节点可用,然后从中选取一个进行调用。之前我提到了几种常用的负载均衡策略:Round-Robin 轮询、Weighted Round-Robin 权重轮询、Least Connections 最少连接数、一致性 Hash 等。

本节,我针对一致性 Hash 算法进行实现,一致性 Hash 算法可以保证每个服务节点分摊的流量尽可能均匀,而且能够把服务节点伸缩带来的影响降到最低

一致性Hash算法的原理请参见我的专栏《分布式系统从理论到实战》

2.1 一致性Hash实现

首先,定义一个通用的负载均衡接口,Round-Robin 、一致性 Hash 等负载均衡算法都需要实现该接口,接口定义如下:

/**
 * 负载均衡器
 * @param <T>
 */
public interface ServiceLoadBalancer<T> {
    T select(List<T> servers, int hashCode);
}

上述的select() 方法的传入参数是一批服务节点以及客户端对象的 hashCode,针对 Zookeeper 的场景,我们可以实现一个比较通用的一致性 Hash 算法:

public class ZKConsistentHashLoadBalancer implements ServiceLoadBalancer<ServiceInstance<ServiceMeta>> {
    /**
     * 每个实节点对应的虚拟节点数
     */
    private final static int VIRTUAL_NODE_SIZE = 10;
    private final static String VIRTUAL_NODE_SPLIT = "#";

    @Override
    public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> servers, int hashCode) {
        // 构造一个Hash环
        TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = makeConsistentHashRing(servers);
        // 选择一个节点
        return allocateNode(ring, hashCode);
    }

    private ServiceInstance<ServiceMeta> allocateNode(TreeMap<Integer, ServiceInstance<ServiceMeta>> ring, int hashCode) {
        // 获取第一个大于等于该hashCode的KEY对应的节点
        Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = ring.ceilingEntry(hashCode);
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    private TreeMap<Integer, ServiceInstance<ServiceMeta>> makeConsistentHashRing(List<ServiceInstance<ServiceMeta>> servers) {
        TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();
        // 遍历每一个服务实例
        for (ServiceInstance<ServiceMeta> instance : servers) {
            // 每个实例映射10个虚拟节点
            for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                ring.put((buildHashKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);
            }
        }
        return ring;
    }

    private String buildHashKey(ServiceInstance<ServiceMeta> instance) {
        ServiceMeta payload = instance.getPayload();
        return String.join(":", payload.getAddress(), String.valueOf(payload.getPort()));
    }
}

JDK 提供了 TreeMap 数据结构,可以非常方便地构造哈希环。TreeMap 底层基于红黑树实现,可以维持Key的有序性,所以我们构造Hash环的思路就是:

  1. 遍历所有服务节点,为每个服务节点创建10个虚拟节点,每个虚拟节点的唯一标识是:服务IP:服务端口#[虚拟节点编号],比如192.168.1.1:28083#1
  2. 计算虚拟节点唯一标识的hash值,将该值作为Key,服务实例作为Value存入TreeMap;
  3. 当客户端请求一个服务节点时,以客户端hashcode作为KEY,去TreeMap查找,TreeMap.ceilingEntry()方法会返回大于或等于给定KEY的最小键值对,即为客户端对应要调用的服务节点。

2.2 服务发现

服务发现的实现思路比较简单,首先找出被调用服务所有的节点列表,然后通过 ServiceLoadBalancer 提供的负载均衡算法,找出一个相应的服务节点。具体代码实现如下:

@Override
public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
    Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
    ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
    // 基于一致性Hash算法,选择一个服务节点
    ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
    if (instance != null) {
        return instance.getPayload();
    }
    return null;
}

三、总结

本章,我对RPC框架的负载均衡机制进行了讲解,主要涉及服务注册和服务发现的负载均衡机制的讲解,我基于TreeMap实现了一致性Hash算法,事实上在生成hash值时,可以使用 Google Guava 工具库中的MurmurHash,它具有更好的防Hash碰撞的特性。

服务消费者通过服务发现接口获取到可调用的服务节点后,是通过动态代理机制完成请求调用的,我会在下一章详细讲解。

正文到此结束

感谢赞赏~

本文目录