透彻理解Java网络编程(二三)——Netty实战:负载均衡机制
在分布式系统中,服务消费者和服务提供者都存在多个节点,所以 RPC 框架需要实现合理的负载均衡算法,控制流量能够均匀地分摊到每个服务提供者,避免单点故障导致的服务调用异常。所以,本章我就来实现 RPC 框架的负载均衡机制。
一、注册中心
服务消费者在发起 RPC 调用之前,需要知道服务提供者有哪些节点是可用的。由于服务提供者节点会存在上/下线的情况,所以服务消费者需要感知服务提供者节点的动态变化。
RPC 框架一般采用注册中心来实现服务的注册和发现,主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等。在本工程中,我使用 Zookeeper,并设计一个通用的注册中心接口,各类注册中心都可以按该接口规范进行扩展。
1.1 接口设计
注册中心会存储服务提供方的元数据信息,我们需要将服务元数据信息封装成一个对象,该对象包括服务全限定名称、服务版本、服务地址/端口:
/**
* 服务元数据
*/
public class ServiceMeta {
/**
* 服务类全限定名
*/
private String service;
/**
* 服务版本号
*/
private String version;
/**
* 服务实例地址
*/
private String address;
/**
* 服务端实例端口
*/
private Integer port;
//...省略get/set
}
接着,需要设计通用的注册中心管理接口,包含注册中心四个基本操作:
- 服务注册 register
- 服务注销 unRegister
- 服务发现 discovery
- 注册中心销毁 destroy
/**
* 服务注册管理
*/
public interface RegistryService {
void register(ServiceMeta serviceMeta) throws Exception;
void unRegister(ServiceMeta serviceMeta) throws Exception;
ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception;
void destroy() throws IOException;
}
来看具体的实现ZookeeperRegistryService:
public class ZookeeperRegistryService implements RegistryService {
/**
* 注册中心连接失败时的重试间隔
*/
public static final int BASE_SLEEP_TIME_MS = 1000;
/**
* 注册中心连接失败时的重试次数
*/
public static final int MAX_RETRIES = 3;
/**
* 注册中心中的RPC服务根节点路径
*/
public static final String ZK_BASE_PATH = "/rpc";
private final ServiceDiscovery<ServiceMeta> serviceDiscovery;
public ZookeeperRegistryService(String registryAddr) throws Exception {
// 1.连接Zookeeper注册中心
CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr,
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
client.start();
JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
// 2.基于Curator框架构建一个服务注册工具类
this.serviceDiscovery = ServiceDiscoveryBuilder
.builder(ServiceMeta.class) // 自定义数据
.client(client)
.serializer(serializer)
.basePath(ZK_BASE_PATH)
.build();
this.serviceDiscovery.start();
}
@Override
public void register(ServiceMeta serviceMeta) throws Exception {
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
// 构建一个服务实例对象
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
.name(serviceKey)
.address(serviceMeta.getAddress())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
// 注册服务实例
serviceDiscovery.registerService(serviceInstance);
}
@Override
public void unRegister(ServiceMeta serviceMeta) throws Exception {
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
.name(serviceKey)
.address(serviceMeta.getAddress())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
serviceDiscovery.unregisterService(serviceInstance);
}
@Override
public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
// 基于一致性Hash算法,选择一个服务节点
ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
if (instance != null) {
return instance.getPayload();
}
return null;
}
@Override
public void destroy() throws IOException {
serviceDiscovery.close();
}
}
上述代码,核心是创建了 ServiceDiscovery 对象, ServiceDiscovery 负责服务的注册和发现。
1.2 服务注册
服务注册通过ServiceDiscovery.register()
方法完成:
@Override
public void register(ServiceMeta serviceMeta) throws Exception {
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
// 构建一个服务实例对象
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
.name(serviceKey)
.address(serviceMeta.getAddress())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
// 注册服务实例
serviceDiscovery.registerService(serviceInstance);
}
ServiceInstance 对象代表一个服务实例,包含服务名称 name、唯一标识 id、地址 address、端口 port 以及用户自定义的可选属性 payload,ServiceInstance 在 Zookeeper 服务器中的存储形式如下:

RpcProviderInitializer 在启动过程中,根据 @RpcService
注解识别需要发布的服务,然后调用 RegistryService 接口的 register() 方法,将服务发布到注册中心:
// RpcProviderInitializer.java
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 如果类注解了@RpcService,则执行服务注册
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
if (rpcService != null) {
// 服务类的全限定名
String serviceName = rpcService.service().getName();
// 服务版本号
String serviceVersion = rpcService.version();
try {
// 创建服务元数据对象
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setAddress(serverAddress);
serviceMeta.setPort(serverPort);
serviceMeta.setService(serviceName);
serviceMeta.setVersion(serviceVersion);
// 注册服务
serviceRegistry.register(serviceMeta);
// 缓存服务
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
rpcServiceMap.put(serviceKey, bean);
} catch (Exception e) {
LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
}
}
return bean;
}
至此,服务提供者在启动后就可以将 @RpcService
注解修饰的服务发布到注册中心了。
二、负载均衡
服务消费者在发起 RPC 调用之前,需要感知有多少服务端节点可用,然后从中选取一个进行调用。之前我提到了几种常用的负载均衡策略:Round-Robin 轮询、Weighted Round-Robin 权重轮询、Least Connections 最少连接数、一致性 Hash 等。
本节,我针对一致性 Hash 算法进行实现,一致性 Hash 算法可以保证每个服务节点分摊的流量尽可能均匀,而且能够把服务节点伸缩带来的影响降到最低。
一致性Hash算法的原理请参见我的专栏《分布式系统从理论到实战》。
2.1 一致性Hash实现
首先,定义一个通用的负载均衡接口,Round-Robin 、一致性 Hash 等负载均衡算法都需要实现该接口,接口定义如下:
/**
* 负载均衡器
* @param <T>
*/
public interface ServiceLoadBalancer<T> {
T select(List<T> servers, int hashCode);
}
上述的select() 方法的传入参数是一批服务节点以及客户端对象的 hashCode,针对 Zookeeper 的场景,我们可以实现一个比较通用的一致性 Hash 算法:
public class ZKConsistentHashLoadBalancer implements ServiceLoadBalancer<ServiceInstance<ServiceMeta>> {
/**
* 每个实节点对应的虚拟节点数
*/
private final static int VIRTUAL_NODE_SIZE = 10;
private final static String VIRTUAL_NODE_SPLIT = "#";
@Override
public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> servers, int hashCode) {
// 构造一个Hash环
TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = makeConsistentHashRing(servers);
// 选择一个节点
return allocateNode(ring, hashCode);
}
private ServiceInstance<ServiceMeta> allocateNode(TreeMap<Integer, ServiceInstance<ServiceMeta>> ring, int hashCode) {
// 获取第一个大于等于该hashCode的KEY对应的节点
Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = ring.ceilingEntry(hashCode);
if (entry == null) {
entry = ring.firstEntry();
}
return entry.getValue();
}
private TreeMap<Integer, ServiceInstance<ServiceMeta>> makeConsistentHashRing(List<ServiceInstance<ServiceMeta>> servers) {
TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();
// 遍历每一个服务实例
for (ServiceInstance<ServiceMeta> instance : servers) {
// 每个实例映射10个虚拟节点
for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
ring.put((buildHashKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);
}
}
return ring;
}
private String buildHashKey(ServiceInstance<ServiceMeta> instance) {
ServiceMeta payload = instance.getPayload();
return String.join(":", payload.getAddress(), String.valueOf(payload.getPort()));
}
}
JDK 提供了 TreeMap 数据结构,可以非常方便地构造哈希环。TreeMap 底层基于红黑树实现,可以维持Key的有序性,所以我们构造Hash环的思路就是:
- 遍历所有服务节点,为每个服务节点创建10个虚拟节点,每个虚拟节点的唯一标识是:
服务IP:服务端口#[虚拟节点编号]
,比如192.168.1.1:28083#1
; - 计算虚拟节点唯一标识的hash值,将该值作为Key,服务实例作为Value存入TreeMap;
- 当客户端请求一个服务节点时,以客户端hashcode作为KEY,去TreeMap查找,
TreeMap.ceilingEntry()
方法会返回大于或等于给定KEY的最小键值对,即为客户端对应要调用的服务节点。
2.2 服务发现
服务发现的实现思路比较简单,首先找出被调用服务所有的节点列表,然后通过 ServiceLoadBalancer
提供的负载均衡算法,找出一个相应的服务节点。具体代码实现如下:
@Override
public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
// 基于一致性Hash算法,选择一个服务节点
ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
if (instance != null) {
return instance.getPayload();
}
return null;
}
三、总结
本章,我对RPC框架的负载均衡机制进行了讲解,主要涉及服务注册和服务发现的负载均衡机制的讲解,我基于TreeMap实现了一致性Hash算法,事实上在生成hash值时,可以使用 Google Guava 工具库中的MurmurHash,它具有更好的防Hash碰撞的特性。
服务消费者通过服务发现接口获取到可调用的服务节点后,是通过动态代理机制完成请求调用的,我会在下一章详细讲解。
感谢赞赏~