原创

透彻理解Java网络编程(二一)——Netty实战:服务发布与订阅

从本章开始,我将正式进入 RPC 框架的原型开发阶段。我会从服务发布与订阅、通信协议设计、负载均衡机制、动态代理四个方面详细地讲解一个通用 RPC 框架的实现过程。

本章,我主要完成整个工程的搭建,以及服务发布与订阅功能的源码实现。

工程源码存放在Gitee,需要的童鞋请自行下载。

一、工程搭建

整个工程使用 Spring Boot 2.1.12.RELEASE + JDK 1.8.0_221 + Netty 4.1.65.Final 的技术栈,并使用 Zookeeper 3.4.14 作为注册中心。

Zookeeper的搭建我不再赘述,参考官网或我的专栏《分布式系统从理论到实战》

1.1 项目结构

整个工程划分为七个模块:



  • rpc-provider:服务提供者,负责发布 RPC 服务,接收和处理 RPC 请求;
  • rpc-consumer:服务消费者,使用动态代理发起 RPC 远程调用,帮助使用者来屏蔽底层网络通信的细节;
  • rpc-registry:注册中心模块,提供服务注册、服务发现、负载均衡的基本功能;
  • rpc-protocol:网络通信模块,包含 RPC 协议的编解码器、序列化和反序列化工具等;
  • rpc-core:基础类库,提供通用的工具类以及模型定义,例如 RPC 请求和响应类、RPC 服务元数据类等;
  • rpc-facade:包含RPC 服务接口的存根,即服务提供者需要对外暴露的接口;
  • rpc-test:测试工程,用于RPC框架的自测。

各个模块之间的依赖关系,如下图:



1.2 使用方式

参考Dubbo的使用方式,我们的RPC框架提供了两个核心注解:

  • @RpcService:rpc-provider模块中的服务提供方,通过@RpcService注解暴露 RPC 服务;
  • @RpcReference:rpc-consumer模块中的服务消费方,通过@RpcReference注解订阅 RPC 服务。

我接下来以一个示例,讲解该RPC框架的使用。

首先,服务提供方定义需要暴露的接口:

public interface HelloService {
    String hello(String name);
}

接着,服务提供方实现服务:

@RpcService(service = HelloService.class, version = "1.0.0")
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "hello" + name;
    }
}

最后,服务消费方引用接口存根,并订阅自己需要的服务:

@RestController
public class HelloController {
    @RpcReference(version = "1.0.0", timeout = 8000)
    private HelloService helloService;

    @RequestMapping(value = "/hello", method = RequestMethod.GET)
    public String sayHello() {
        return helloService.hello("hello world");
    }
}

二、服务发布

我们先从模块rpc-provider入手,实现服务发布的整体流程,部分细节实现我会在后续章节讲解。rpc-provider 一共包含四个核心流程:

  • 启动服务,并暴露服务端口;
  • 基于@RpcService注解,扫描需要对外发布的服务,并将服务元数据信息发布到注册中心;
  • 接收 RPC 请求,解码后得到请求消息;
  • 提交请求至自定义线程池进行处理,并将处理结果写回客户端。

2.1 注解定义

服务提供者,需要定义发布的服务类型、版本等属性,主流的 RPC 框架都支持以 XML 或者注解方式定义。这里,采用注解方式来实现。

@RpcService

首先,定义 @RpcService 注解,作用于代表服务提供方的类上:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface RpcService {

    Class<?> service() default Object.class;

    String version() default "1.0";
}

@RpcService 使用了@Component注解,所以可以将对象注入到Spring 容器中进行管理,那么 serviceversion的属性值怎么才能和 Bean 关联起来呢?这就需要我们对 Spring Bean 的生命周期以及 Bean 的可扩展点有所了解。

Spring 的 BeanPostProcessor 接口给提供了对 Bean 进行再加工的扩展点,常用于处理自定义注解。自定义的 Bean 可以通过实现 BeanPostProcessor 接口,在 Bean 实例化的前后加入自定义的逻辑处理。

所以,我们通过一个 RpcProviderInitializer 类,完成容器启动过程中的服务Bean装配,该类实现了 BeanPostProcessor 接口,可以对@RpcService注解的服务进行自定义处理:

/**
 * RPC服务初始化器
 */
public class RpcProviderInitializer implements InitializingBean, BeanPostProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(RpcProviderInitializer.class);

    private final Integer serverPort;
    private final String serverAddress;
    private final RegistryService serviceRegistry;
    /**
     * 服务实例缓存<服务Key,服务实例对象>
     */
    private final Map<String, Object> rpcServiceMap = new ConcurrentHashMap<>();

    public RpcProviderInitializer(Integer serverPort, RegistryService serviceRegistry) {
        try {
            this.serverAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException("unknown host", e);
        }
        this.serverPort = serverPort;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public void afterPropertiesSet() {
        new Thread(() -> {
            try {
                startRpcServer();
            } catch (Exception e) {
                LOG.error("start rpc server error.", e);
            }
        }).start();
    }

    private void startRpcServer() throws Exception {
        // 基于Netty启动RPC服务Server
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline()
                                    .addLast(new RpcEncoder())
                                    .addLast(new RpcDecoder())
                                    .addLast(new RpcRequestHandler(rpcServiceMap));
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
            LOG.info("server addr {} started on port {}", this.serverAddress, this.serverPort);
            channelFuture.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 如果类注解了@RpcService,则执行服务注册
        RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
        if (rpcService != null) {
            // 服务类的全限定名
            String serviceName = rpcService.service().getName();
            // 服务版本号
            String serviceVersion = rpcService.version();

            try {
                // 创建服务元数据对象
                ServiceMeta serviceMeta = new ServiceMeta();
                serviceMeta.setAddress(serverAddress);
                serviceMeta.setPort(serverPort);
                serviceMeta.setService(serviceName);
                serviceMeta.setVersion(serviceVersion);
                // 注册服务
                serviceRegistry.register(serviceMeta);
                // 缓存服务
                String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
                rpcServiceMap.put(serviceKey, bean);
            } catch (Exception e) {
                LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
            }
        }
        return bean;
    }
}

上述 RpcProviderInitializer 类重写了 BeanPostProcessor.postProcessAfterInitialization 方法,对所有初始化完成后的 Bean 进行扫描。如果 Bean 包含 @RpcService 注解,那么获取注解上的信息,然后创建 ServiceMeta 对象,将服务元数据发布至注册中心,注册中心的实现我先暂且跳过,后续章节讲解。

RpcProviderInitializer 维护了一个 rpcServiceMap,缓存服务对象,在处理 RPC 请求时可以直接通过 rpcServiceMap 拿到对应的服务对象进行调用。

2.2 参数配置

服务提供方需要配置参数,我们不应该把这些参数写死在代码里,一般通过配置文件方式进行输入。我们定义三个参数,分别为:RPC服务端口 servicePort、注册中心地址 registryAddr 和注册中心类型 registryType,然后使用 Spring Boot 的 @ConfigurationProperties注解实现配置项的加载:

@ConfigurationProperties(prefix = "rpc")
public class RpcConfigProperties {

    /**
     * RPC服务端口
     */
    private int servicePort;

    /**
     * 服务注册中心地址
     */
    private String registryAddr;

    /**
     * 服务注册中心类型
     */
    private String registryType;

    //...省略get/set
}

@ConfigurationProperties通过 prefix 属性指定配置参数的前缀,默认会与全局配置文件 application.properties 或者 application.yml 中的参数进行一一绑定。如果你想自定义一个配置文件,可以通过@PropertySource注解指定配置文件的位置。

下面我在 rpc-provider 模块的 resources 目录下创建全局配置文件 application.properties,并配置以上三个参数:

rpc.service.port=2781
rpc.registry.type=ZOOKEEPER
rpc.registry.addr=127.0.0.1:2181

注意,只配置 @ConfigurationProperties 注解,Spring 容器并不能获取配置文件的内容并映射为对象,需要与@EnableConfigurationProperties 注解配合使用,该注解的作用是将声明了 @ConfigurationProperties 注解的类注入为 Spring 容器中的 Bean。具体用法如下:

@Configuration
@EnableConfigurationProperties(RpcConfigProperties.class)
public class RpcProviderAutoConfiguration {

    @Resource
    private RpcConfigProperties rpcProperties;

    @Bean
    public RpcProviderInitializer init() throws Exception {
        RegistryType type = RegistryType.valueOf(rpcProperties.getRegistryType());
        RegistryService serviceRegistry = RegistryFactory.getInstance(rpcProperties.getRegistryAddr(), type);
        return new RpcProviderInitializer(rpcProperties.getServicePort(), serviceRegistry);
    }
}

上述的@Configuration注解主要用于定义配置类,配置类内部可以包含多个@Bean注解的方法,用于创建Bean并注入到Spring容器中,这样就替换了传统的 XML 定义Bean方式。

三、服务订阅

服务消费者的实现要复杂一些,对于声明了 @RpcReference 注解的成员变量,我们需要通过动态代理,构造出一个可以真正可以进行 RPC 调用的 Bean,然后将它注册到 Spring 容器中。

3.1 注解定义

@RpcReference

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {

    String version() default "1.0";

    String registryType() default "ZOOKEEPER";

    String registryAddr() default "127.0.0.1:2181";

    long timeout() default 5000;
}

@RpcReference 注解提供了服务版本 version、注册中心类型 registryType、注册中心地址 registryAddr 和超时时间 timeout 四个属性。接下来,我们需要使用这些属性,构造出一个自定义的 Bean,然后对该 Bean 的所有方法调用进行拦截。

3.2 Bean定义信息

由于@RpcReference注解是作用在类的字段上的,所以我们就得思考下,如何创建服务接口类对应的对象呢?比如,下面的HelloService接口:

public class HelloController {

    @RpcReference(version = "1.0.0", timeout = 8000)
    private HelloService helloService;

    //...
}

Spring 的 FactoryBean 接口,可以看成是创建Bean的工厂,可以帮助我们实现自定义 Bean 的创建,它的 getObject()方法用于返回一个对象。所以,我们可以将代理类对象的创建逻辑放在FactoryBean实现类中:

public class RpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;

    private String serviceVersion;

    private String registryType;

    private String registryAddr;

    private long timeout;

    private Object object;

    @Override
    public Object getObject() {
        return object;
    }

    /**
     * FactoryBean创建Bean对象时,会调用该方法对对象进行初始化
     * @throws Exception
     */
    public void init() throws Exception {
        // 创建服务提供方接口的代理对象
        RegistryService registryService = RegistryFactory.getInstance(this.registryAddr, RegistryType.valueOf(this.registryType));
        this.object = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RpcInvokerProxy(serviceVersion, timeout, registryService));
    }
    //...省略get/set
}

我们可以使用 Spring 的 BeanFactoryPostProcessor 扫描所有引用@RpcReference注解的类字段,生成自定义的RpcReferenceBean对象,注入到容器中,然后Spring容器会根据Bean定义信息创建对象。

BeanFactoryPostProcessor 是 Spring 容器加载 Bean 的定义之后以及 Bean 实例化之前执行,所以 BeanFactoryPostProcessor 可以在 Bean 实例化之前获取 Bean 的配置元数据,并允许用户对其修改。而 BeanPostProcessor 是在 Bean 初始化 前后执行,它并不能修改 Bean 的配置信息。

@Component
public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(RpcConsumerPostProcessor.class);

    private ApplicationContext context;

    private ClassLoader classLoader;

    /**
     * Bean定义信息缓存:<服务提供方的类全限定名,Bean定义对象>
     */
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new LinkedHashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // 遍历Spring容器中所有的类
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                ReflectionUtils.doWithFields(clazz, new ReflectionUtils.FieldCallback() {
                    @Override
                    public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                        // 对类的字段引用进行处理
                        RpcConsumerPostProcessor.this.parseRpcReference(field);
                    }
                });
            }
        }

        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
            if (context.containsBean(beanName)) {
                throw new IllegalArgumentException("spring context already has a bean named " + beanName);
            }
            registry.registerBeanDefinition(beanName,beanDefinition);
            LOG.info("registered RpcReferenceBean {} success.", beanName);
        });
    }

    private void parseRpcReference(Field field) {
        // 如果字段使用了@RpcReference注解,则进行处理
        RpcReference annotation = AnnotationUtils.getAnnotation(field, RpcReference.class);
        if (annotation != null) {
            // 创建Bean定义对象
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);
            builder.setInitMethodName("init");
            builder.addPropertyValue("interfaceClass", field.getType());
            builder.addPropertyValue("serviceVersion", annotation.version());
            builder.addPropertyValue("registryType", annotation.registryType());
            builder.addPropertyValue("registryAddr", annotation.registryAddr());
            builder.addPropertyValue("timeout", annotation.timeout());

            BeanDefinition beanDefinition = builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
        }
    }
}

RpcConsumerPostProcessor 类重写了 BeanFactoryPostProcessor.postProcessBeanFactory 方法,从 beanFactory 中获取所有 Bean 定义信息,然后对每个 Bean 的所有 field 进行检测,如果 field 声明了 @RpcReference 注解,则进行如下处理:

  1. 通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义,并为 RpcReferenceBean 的成员变量赋值,同时执行了创建对象时的初始化方法init,我们就是在该方法中创建代理对象的;
  2. 构造完 RpcReferenceBean 的定义之后,将RpcReferenceBean 的 BeanDefinition 重新注册到 Spring 容器中。

四、总结

本章,我对RPC框架的工程结构进行了讲解,着重介绍了服务提供者使用@RpcService注解是如何发布服务的,服务消费者使用@RpcReference注解是如何订阅服务的,特别要注意 @RpcReference 注解,被该注解修饰的field变量都会被构造成 RpcReferenceBean,然后为该RpcReferenceBean生成BeanDefinition对象注入到Spring容器中。

本章关于@RpcReference和@RpcService这两个注解的开发其实是很有参考意义的,我们的应用可以基于Spring框架开发各类自定义注解,封装出一套自己的开发框架,这在实际项目开发过程中是很常见的。

正文到此结束

感谢赞赏~

本文目录