原创

透彻理解Java网络编程(二二)——Netty实战:通信协议设计

本章,我先对 RPC 请求调用和结果响应两个过程分别进行详细分析,然后基于Netty完成整个RPC框架的通信协议设计和开发。

RPC 请求的过程对于服务消费者来说是出站操作,对于服务提供者来说是入站操作:

  • 服务消费者将 RPC 请求信息封装成 Protocol 对象,然后通过编码器 Encoder 进行二进制编码,最后直接发送至远端即可;
  • 服务提供者收到请求后,将二进制数据交给解码器 Decoder,解码后再次生成 Protocol 对象,然后传递给 Handler 执行真正的请求执行。

RPC 响应的过程对于服务消费者来说是入站操作,对于服务提供者来说是出站操作:

  • 服务提供者将响应结果封装成 Protocol 对象,然后通过 Encoder 编码发送给服务消费者;
  • 服务消费者对响应结果进行解码,然后由 Handler 根据响应结果找到之前的对应请求,最后将响应结果返回。


一、协议设计

本节,我们来看RPC框架的协议设计,主流的 RPC 框架都会自定义通信协议,相比于 HTTP、HTTPS、JSON 等通用的协议,自定义协议可以实现更好的性能、扩展性以及安全性。

我们把协议分为协议头 Header 协议体 Body 两个部分:

  • 协议头 Header 包含魔数、协议版本号、序列化算法、报文类型、状态、消息 ID、数据长度;
  • 协议体 Body 只包含数据内容部分,数据内容的长度是不固定的。
+---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
+---------------------------------------------------------------+
| 状态 1byte |        消息 ID 8byte     |      数据长度 4byte     |
+---------------------------------------------------------------+
|                   数据内容 (长度不定)                          |
+---------------------------------------------------------------+

1.1 协议实体类

RPC 请求和响应都可以使用上述协议进行通信,对应协议实体类的定义如下:

public class RpcProtocol<T> implements Serializable {
    // 协议头
    private MsgHeader header; 
    // 协议体
    private T body; 
}

协议头

public class MsgHeader implements Serializable {
    // 魔数
    private short magic;     
    // 协议版本号
    private byte version; 
    // 序列化算法
    private byte serialization; 
    // 报文类型
    private byte msgType; 
    // 状态
    private byte status; 
    // 消息 ID
    private long requestId; 
    // 数据长度
    private int msgLen; 
}

协议体

在 RPC 请求调用的场景下,RpcProtocol 中泛型 T 对应 RpcRequest 类型,RpcRequest 主要包含 RPC 远程调用需要的必要参数,定义如下:

public class RpcRequest implements Serializable {
    /**
     * 服务版本号
     */
    private String serviceVersion;
    /**
     * 服务类全限定名
     */
    private String className;
    /**
     * 服务接口全限定名
     */
    private String methodName;
    /**
     * 接口参数值
     */
    private Object[] params;
    /**
     * 接口参数类型
     */
    private Class<?>[] parameterTypes;
}

在 RPC 结果响应的场景下,RpcProtocol 中泛型 T 对应 RpcResponse 类型,RpcResponse 实体类的定义如下:

public class RpcResponse implements Serializable {
    // 请求结果
    private Object data;
    // 错误信息
    private String message;
}

1.2 序列化

上述我们定义的都是Java Bean,数据在底层以字节形式传输,所以我们需要使用序列化工具完成Java Bean和二进制数据之间的转换。目前比较常用的序列化算法包括 Json、Kryo、Hessian、Protobuf 等。

首先,我们定义一个通用的序列化接口 RpcSerialization,所有序列化算法扩展都必须实现该接口,RpcSerialization 接口分别提供了序列化 serialize() 和反序列化 deserialize() 方法:

public interface RpcSerialization {
    <T> byte[] serialize(T obj) throws IOException;
    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}

接下来,我为 RpcSerialization 提供 Hessian 和 Json 两种序列化算法的实现类。以 HessianSerialization 为例,实现逻辑如下:

@Component
public class HessianSerialization implements RpcSerialization {

    @Override
    public <T> byte[] serialize(T object) {
        if (object == null) {
            throw new NullPointerException();
        }
        byte[] results;

        HessianSerializerOutput hessianOutput;
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            hessianOutput = new HessianSerializerOutput(os);
            hessianOutput.writeObject(object);
            hessianOutput.flush();
            results = os.toByteArray();
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return results;
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clz) {
        if (bytes == null) {
            throw new NullPointerException();
        }
        T result;

        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
            HessianSerializerInput hessianInput = new HessianSerializerInput(is);
            result = (T) hessianInput.readObject(clz);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return result;
    }
}

为了能够支持不同序列化算法,我采用简单工厂模式来实现不同序列化算法之间的切换,具体实现如下:

public class SerializationFactory {

    public static RpcSerialization getRpcSerialization(byte serializationType) {
        SerializationTypeEnum typeEnum = SerializationTypeEnum.findByType(serializationType);

        switch (typeEnum) {
            case HESSIAN:
                return new HessianSerialization();
            case JSON:
                return new JsonSerialization();
            default:
                throw new IllegalArgumentException("serialization type is illegal, " + serializationType);
        }
    }
}

二、编解码器

设计完协议之后,我们需要利用Netty中的Encoder和Decoder自动对字节数据进行编解码。

2.1 RpcEncoder

先来看编码器,RpcEncoder用于将Java Bean对象自动编码成字节数据,使用在两个场景:

  • 服务消费者组装完成RpcProtocol<RpcRequest>对象后,调用writeAndFlush()方法发起请求;
  • 服务提供者组装完成RpcProtocol<RpcResponse>对象后,调用writeAndFlush()方法返回响应。

RpcEncoder 继承 MessageToByteEncoder,并重写 encode() 方法:

public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
    @Override
    protected void encode(ChannelHandlerContext ctx,RpcProtocol<Object> msg, ByteBuf byteBuf) throws IOException {
        MsgHeader header = msg.getHeader();
        byteBuf.writeShort(header.getMagic());
        byteBuf.writeByte(header.getVersion());
        byteBuf.writeByte(header.getSerialization());
        byteBuf.writeByte(header.getMsgType());
        byteBuf.writeByte(header.getStatus());
        byteBuf.writeLong(header.getRequestId());

        RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(header.getSerialization());
        byte[] data = rpcSerialization.serialize(msg.getBody());
        byteBuf.writeInt(data.length);
        byteBuf.writeBytes(data);
    }
}

2.2 RpcDecoder

再来看解码器,RpcDecoder用于将字节数据自动解码成Java Bean对象,并传递给下一个 Inbound 处理器,使用在两个场景:

  • 服务消费者获取到响应后,将响应字节数据解码成RpcProtocol<RpcResponse>对象;
  • 服务提供者获取到请求后,将请求字节数据解码成RpcProtocol<RpcRequest>对象。

RpcDecoder 继承 MessageToByteDecoder,并重写 decode() 方法:

public class RpcDecoder extends ByteToMessageDecoder {
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
        // 保证读到完整协议头
        if (in.readableBytes() < ProtocolConstants.HEADER_TOTAL_LEN) {
            return;
        }
        // mark读指针
        in.markReaderIndex();
        // 魔数
        short magic = in.readShort();
        if (magic != ProtocolConstants.MAGIC) {
            throw new IllegalArgumentException("magic number is illegal, " + magic);
        }
        // 协议版本号
        byte version = in.readByte();
        // 序列号
        byte serializeType = in.readByte();
        // 报文类型
        byte msgType = in.readByte();
        // 状态
        byte status = in.readByte();
        // 请求ID
        long requestId = in.readLong();
        // 协议体长度
        int dataLength = in.readInt();

        // 协议体是否足够读取
        if (in.readableBytes() < dataLength) {
            // 重置读指针
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        MsgType msgTypeEnum = MsgType.findByType((int)msgType);
        if (msgTypeEnum == null) {
            return;
        }
        // 封装协议头对象
        MsgHeader header = new MsgHeader();
        header.setMagic(magic);
        header.setVersion(version);
        header.setSerialization(serializeType);
        header.setStatus(status);
        header.setRequestId(requestId);
        header.setMsgType(msgType);
        header.setMsgLen(dataLength);

        RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(serializeType);
        switch (msgTypeEnum) {
            case REQUEST:
                RpcRequest request = rpcSerialization.deserialize(data, RpcRequest.class);
                if (request != null) {
                    RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
                    protocol.setHeader(header);
                    protocol.setBody(request);
                    out.add(protocol);
                }
                break;
            case RESPONSE:
               RpcResponse response = rpcSerialization.deserialize(data, RpcResponse.class);
                if (response != null) {
                    RpcProtocol<RpcResponse> protocol = new RpcProtocol<>();
                    protocol.setHeader(header);
                    protocol.setBody(response);
                    out.add(protocol);
                }
                break;
            case HEARTBEAT:
                // TODO
                break;
        }
    }
}

三、请求/响应处理

3.1 请求处理

在 RPC 请求调用的场景下,服务提供者的 RpcDecoder 将二进制数据解码成 RpcProtocol<RpcRequest> 对象后,然后交给 RpcRequestHandler 执行 RPC 请求调用。

RpcRequestHandler 是一个 Inbound 处理器,继承 SimpleChannelInboundHandler 并重写 channelRead0() 方法,具体实现如下:

/**
 * RPC请求处理Handler
 */
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    private static final Logger LOG = LoggerFactory.getLogger(RpcRequestHandler.class );

    private final Map<String, Object> rpcServiceMap;

    public RpcRequestHandler(Map<String, Object> rpcServiceMap) {
        this.rpcServiceMap = rpcServiceMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) {

        // 异步处理RPC请求
        RpcRequestProcessor.submitRequest(() -> {
            // 封装响应
            RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();

            MsgHeader header = protocol.getHeader();
            header.setMsgType(MsgType.RESPONSE.getType().byteValue());
            resProtocol.setHeader(header);

            RpcResponse response = new RpcResponse();
            resProtocol.setBody(response);

            try {
                // 处理请求(同步调用)
                Object result = handle(protocol.getBody());
                response.setData(result);
                header.setStatus(MsgStatus.SUCCESS.getCode().byteValue());
            } catch (Throwable throwable) {
                header.setStatus( MsgStatus.FAIL.getCode().byteValue());
                response.setMessage(throwable.toString());
                LOG.error("process request {} error", header.getRequestId(), throwable);
            }
            ctx.writeAndFlush(resProtocol);
        });
    }

    /**
     * RPC请求处理
     */
    private Object handle(RpcRequest request) throws Throwable {
        String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
        // 获取服务类
        Object serviceBean = rpcServiceMap.get(serviceKey);

        if (serviceBean == null) {
            throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));
        }

        // 通过CGLIB的FastClass机制,执行方法
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParams();

        FastClass fastClass = FastClass.create(serviceClass);
        int methodIndex = fastClass.getIndex(methodName, parameterTypes);
        return fastClass.invoke(methodIndex, serviceBean, parameters);
    }
}

RPC 请求调用是比较耗时的,所以将 RPC 请求提交到自定义的业务线程池中执行。上述的 handle() 方法是真正执行 RPC 调用的地方,我会在后续章节讲解。根据 handle() 的执行情况,RpcProtocol<RpcResponse> 最终会被设置成功或者失败的状态,以及相应的请求结果或者错误信息,最终通过 writeAndFlush() 方法将数据写回服务消费者。

3.2 响应处理

在 RPC 响应处理的场景下,服务消费者的 RpcDecoder 将二进制数据解码成 RpcProtocol<RpcResponse> 对象后,然后交给 RpcResponseHandler 处理。

/**
 * RPC响应处理Handler
 */
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) {
        long requestId = msg.getHeader().getRequestId();
        // 获取与请求关联的响应对象
        RpcFuture<RpcResponse> future = RpcServiceHelper.REQUEST_MAP.remove(requestId);
        // 设置结果
        future.getPromise().setSuccess(msg.getBody());
    }
}
/**
 * RPC服务工具类
 */
public class RpcServiceHelper {
    /**
     * 请求ID生成器
     */
    public static final AtomicLong REQUEST_ID_GEN = new AtomicLong(0);

    /**
     * 请求/响应映射缓存
     */
    public static final Map<Long, RpcFuture<RpcResponse>> REQUEST_MAP = new ConcurrentHashMap<>();

    /**
     * 根据服务名和版本号生成Key
     *
     * @param serviceName    服务名称
     * @param serviceVersion 服务版本号
     * @return
     */
    public static String buildServiceKey(String serviceName, String serviceVersion) {
        return String.join("#", serviceName, serviceVersion);
    }
}

服务消费者在发起调用时,维护了请求编号 requestIdRpcFuture<RpcResponse> 的映射关系,RpcResponseHandler 会根据请求的 requestId 找到对应发起调用的 RpcFuture,然后为 RpcFuture 设置响应结果。

/**
 * RPC请求异步封装
 *
 * @param <T>
 */
public class RpcFuture<T> {
    private Promise<T> promise;
    private long timeout;

    public RpcFuture(Promise<T> promise, long timeout) {
        this.promise = promise;
        this.timeout = timeout;
    }

    public Promise<T> getPromise() {
        return promise;
    }

    public void setPromise(Promise<T> promise) {
        this.promise = promise;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }
}

上述代码,我采用了 Netty 提供的 Promise 工具实现 RPC 请求的同步等待,Promise 模式本质是一种异步编程模型,我们可以先拿到一个查看任务执行结果的凭证,不必等待任务执行完毕,当我们需要获取任务执行结果时,再使用凭证提供的相关接口进行获取。

四、总结

本章,我对我们的RPC 框架的协议设计以及请求响应的编解码进行了讲解,自定义协议、编解码、序列化/反序列化都是实现远程通信的必备基础知识。下一章,我将对 RPC 框架的负载均衡机制进行讲解。

正文到此结束

感谢赞赏~

本文目录