
Kafka Source Code Analysis (11): Producer NIO Communication, Packet Splitting and Sticking When Reading Responses

In the previous chapter, I explained how Kafka handles packet splitting when sending requests. The Kafka client establishes a persistent TCP connection to each Broker, and a request is fully written before the next one is sent, so packet sticking is generally not an issue on the send side.

However, things are different when the Kafka client reads responses from a Broker. Two situations can arise:

  1. A single read contains the complete responses to multiple requests, i.e. packet sticking;
  2. A single read contains only part of one request's response, so multiple reads are needed and their results merged, i.e. packet splitting.

In this chapter, I will first walk through how the Kafka client handles OP_READ events, and then analyze how packet splitting and sticking are dealt with when reading responses.

1. Reading Responses

Let's first review how the Kafka client reads response data under the hood. Essentially it all goes through the Selector.poll() method, which should be very familiar by now:

// Selector.java

private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;    // completed responses staged per Broker connection
private final List<NetworkReceive> completedReceives;                    // one completed response per Broker per poll (the oldest staged one)
public void poll(long timeout) throws IOException {
    //...

    // 1. Process the ready SelectionKeys
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    // 2. Move each channel's oldest fully-read response into completedReceives
    addToCompletedReceives();
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    // ...
    // The channel has an OP_READ event and no completed response is already staged for it
    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
        NetworkReceive networkReceive;    // holds the data being read
        // Keep reading from the channel; a non-null networkReceive means one complete response was read
        while ((networkReceive = channel.read()) != null)
            // Stage the completed response in the per-channel receive queue
            addToStagedReceives(channel, networkReceive);
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    // Create the per-channel deque on first use
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    // Append the completed response to the channel's queue
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // Take the oldest fully-read response from the staged queue
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}

1.1 The Read Flow

The KafkaChannel.read() method invoked from the main flow of Selector.poll() reads bytes from the underlying SocketChannel and wraps them into a NetworkReceive object, cached in KafkaChannel's receive field, which represents the response currently being read. Once a response has been read in full, read() returns it and clears the receive cache. This logic is similar to the request-sending logic covered in the previous chapter, and exists mainly because a response can be split across multiple reads:

// KafkaChannel.java

private NetworkReceive receive;        // caches the response object currently being read
public NetworkReceive read() throws IOException {
    NetworkReceive result = null;

    // Initialize the NetworkReceive; maxReceiveSize caps the size of a single response, id identifies the Broker
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id);
    }

    // Read bytes from the channel
    receive(receive);

    // If the response has been read in full, return it and clear the receive cache
    if (receive.complete()) {
        receive.payload().rewind();    // reset the payload's position to 0
        result = receive;
        receive = null;    // clear the cached receive
    }
    return result;
}

// Delegate to NetworkReceive to read bytes from the underlying SocketChannel
private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}

As the code above shows, the actual byte reading is delegated to NetworkReceive.readFrom(). So what exactly is NetworkReceive? Let's keep going.

1.2 NetworkReceive

NetworkReceive represents one complete response. It consists of a 4-byte size header followed by a payload, where the header holds the payload's size in bytes. It is the counterpart of the NetworkSend object used for sending requests, covered in the previous chapter:

// NetworkReceive.java

public class NetworkReceive implements Receive {

    public final static String UNKNOWN_SOURCE = "";
    public final static int UNLIMITED = -1;
    // Broker ID
    private final String source;
    // buffer for the 4-byte size header
    private final ByteBuffer size;
    // maximum allowed payload size
    private final int maxSize;
    // buffer for the payload
    private ByteBuffer buffer;

    public NetworkReceive(int maxSize, String source) {
        this.source = source;
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
    }

    @Override
    public boolean complete() {
        // A response is complete only when both the 4-byte header and the payload buffer are full
        return !size.hasRemaining() && !buffer.hasRemaining();
    }

    public long readFrom(ScatteringByteChannel channel) throws IOException {
        return readFromReadableChannel(channel);
    }

    public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {    // the size header is not yet fully read
            // Read into the header buffer; with a split packet this may yield fewer than 4 bytes
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {    // the header buffer is now full (all 4 bytes read)
                // Decode the payload size from the header
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                // Allocate the payload buffer
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }

        if (buffer != null) {
            // Read the payload
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }

        return read;
    }

    // Return the payload
    public ByteBuffer payload() {
        return this.buffer;
    }
}

The code above solves both the splitting and the sticking problem neatly.

Let's first look at the "sticking" case, where one read from the channel contains the responses to several different requests (see the sketch after this list):

  1. The channel reads the 4-byte header buffer full in a single read;
  2. The header is decoded to obtain the payload size;
  3. Exactly that many payload bytes are read and no more, because the payload ByteBuffer's remaining capacity caps the read, so the bytes of any following response stay in the channel for the next iteration;
  4. A complete response has now been read, so channel.read() returns non-null; the while loop in Selector.pollSelectionKeys() stages it and loops again, draining any remaining stuck responses one by one before they are promoted to completedReceives.
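To make this concrete, here is a minimal, self-contained sketch (my own illustration, not Kafka code; the class name and frame contents are made up) of why a 4-byte size header lets a reader cleanly separate two responses that arrive back to back:

// StickyFrameDemo.java (illustrative)

import java.nio.ByteBuffer;

// Two length-prefixed "responses" stuck together in one buffer. The size
// header tells the reader exactly where the first frame ends, so the bytes
// of the second frame are left untouched for the next iteration.
public class StickyFrameDemo {
    public static void main(String[] args) {
        ByteBuffer wire = ByteBuffer.allocate(64);
        for (String body : new String[]{"resp-1", "resp-2"}) {
            byte[] bytes = body.getBytes();
            wire.putInt(bytes.length).put(bytes);    // 4-byte header + payload
        }
        wire.flip();    // switch to read mode

        while (wire.hasRemaining()) {
            int size = wire.getInt();                // decode the header
            byte[] payload = new byte[size];
            wire.get(payload);                       // take exactly `size` bytes
            System.out.println("decoded: " + new String(payload));
        }
        // prints: decoded: resp-1, then decoded: resp-2
    }
}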

Now let's look at the "splitting" case, where a single read does not yield a complete response, perhaps only part of the header or part of the payload (again, see the sketch after the list):

  1. The channel tries to fill the 4-byte header buffer in one read but falls short, because the packet was split;
  2. channel.read() therefore returns null, so the while loop in Selector.pollSelectionKeys() exits for now and nothing is staged;
  3. KafkaChannel keeps the partially-read response in its receive field; when the next OP_READ event fires, read() reuses this cached object and readFromReadableChannel() continues filling it;
  4. Once the response has been read in full, channel.read() returns it, the loop stages it, and it is eventually added to completedReceives.
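Here is a matching sketch for the splitting case. TrickleChannel is a made-up channel that returns at most a few bytes per read(), simulating a response arriving in fragments; the accumulation loop mirrors the logic of NetworkReceive.readFromReadableChannel():

// SplitFrameDemo.java (illustrative)

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class SplitFrameDemo {
    // A channel that yields at most maxPerRead bytes per read(), simulating split packets
    static class TrickleChannel implements ReadableByteChannel {
        private final ByteBuffer data;
        private final int maxPerRead;
        TrickleChannel(byte[] bytes, int maxPerRead) {
            this.data = ByteBuffer.wrap(bytes);
            this.maxPerRead = maxPerRead;
        }
        public int read(ByteBuffer dst) {
            if (!data.hasRemaining()) return -1;
            int n = Math.min(maxPerRead, Math.min(dst.remaining(), data.remaining()));
            for (int i = 0; i < n; i++) dst.put(data.get());
            return n;
        }
        public boolean isOpen() { return true; }
        public void close() {}
    }

    public static void main(String[] args) throws IOException {
        byte[] body = "hello".getBytes();
        ByteBuffer frame = ByteBuffer.allocate(4 + body.length);
        frame.putInt(body.length).put(body);

        ReadableByteChannel channel = new TrickleChannel(frame.array(), 3);
        ByteBuffer size = ByteBuffer.allocate(4);    // header buffer
        ByteBuffer payload = null;                   // allocated once the size is known
        int reads = 0;
        // The same accumulation logic as NetworkReceive.readFromReadableChannel()
        while (payload == null || payload.hasRemaining()) {
            reads++;
            if (size.hasRemaining()) {
                channel.read(size);
                if (!size.hasRemaining()) {          // header complete
                    size.rewind();
                    payload = ByteBuffer.allocate(size.getInt());
                }
            }
            if (payload != null)
                channel.read(payload);
        }
        System.out.println("reads=" + reads + ", body=" + new String(payload.array()));
        // prints: reads=3, body=hello
    }
}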

2. Handling Responses

After responses have been read via NetworkReceive, the Selector caches each Broker's completed responses in a Map; it then iterates over this Map and, for each Broker, moves the oldest staged response into the completedReceives list (one per Broker per poll).

2.1 Caching Completed Responses

The completed responses are stored in the Selector's completedReceives field:

// Selector.java

private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;    // completed responses staged per Broker connection
private final List<NetworkReceive> completedReceives;                    // one completed response per Broker per poll (the oldest staged one)
public void poll(long timeout) throws IOException {
    //...

    // 1. Process the ready SelectionKeys
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    // 2. Move each channel's oldest fully-read response into completedReceives
    addToCompletedReceives();
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // Take the oldest fully-read response from the staged queue
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}

2.2 Parsing Responses

Now that all fully-read responses have been moved into the completedReceives list, let's look at how the Kafka client processes them:

// NetworkClient.java

public List<ClientResponse> poll(long timeout, long now) {
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // Handle the completed receives
    handleCompletedReceives(responses, updatedNow);

    // Invoke the response callbacks
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    // Iterate over the completed responses
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();    //Broker ID
        // 1. Remove the oldest in-flight request cached for this Broker
        InFlightRequest req = inFlightRequests.completeNext(source);

        // 2. Parse the response body
        AbstractResponse body = parseResponse(receive.payload(), req.header);

        if (req.isInternalRequest && body instanceof MetadataResponse)    // metadata update response
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)    // API versions response
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // 3. Add the result to the responses list
            responses.add(req.completed(body, now));
    }
}

public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
    // Parse the response header
    ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
    short apiKey = requestHeader.apiKey();
    short apiVer = requestHeader.apiVersion();
    // Parse the response body using the schema for this API key and version
    Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
    // Match the response against the original request
    correlate(requestHeader, responseHeader);
    // Build the response object and return it
    return AbstractResponse.getResponse(apiKey, responseBody);
}

// Check that the request and response are correctly paired
private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
    // The correlation_id uniquely identifies a request, so response and request can be matched by it
    if (requestHeader.correlationId() != responseHeader.correlationId())
        throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
                                        + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
}

The logic of the handleCompletedReceives method above is as follows:

  1. Iterate over the completedReceives list and process each response with the steps below;
  2. Determine which Broker the response came from, and remove the oldest in-flight request cached for that Broker from the in-flight queue;
  3. Parse the response body, matching the correlationId in the response header against the one in the request header; a mismatch throws an exception;
  4. Build a ClientResponse from the parsed body and add it to the responses list.

The key point to understand here: because all requests to a Broker travel over one ordered TCP connection and the Broker processes and answers them in order, the oldest in-flight request cached for that Broker always pairs with the next response received from it (a small sketch of this pairing follows).
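A minimal sketch of this pairing idea (my own illustration, not Kafka's InFlightRequests class): requests enter a per-broker FIFO queue when sent, and each arriving response is checked against the queue head:

// InFlightSketch.java (illustrative)

import java.util.ArrayDeque;
import java.util.Deque;

// Per-broker FIFO pairing of requests and responses. Because all requests to a
// broker travel over one ordered TCP connection and the broker replies in order,
// the oldest unanswered request always matches the next response that arrives.
class InFlightSketch {
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    void onSend(int correlationId) {
        inFlight.addLast(correlationId);         // newest request at the tail
    }

    void onResponse(int responseCorrelationId) {
        Integer expected = inFlight.pollFirst(); // oldest request at the head
        if (expected == null || expected != responseCorrelationId)
            throw new IllegalStateException("Correlation id for response ("
                    + responseCorrelationId + ") does not match request (" + expected + ")");
    }
}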

2.3 Handling Callbacks

After the responses are parsed, the ClientResponse list is traversed and each response's RequestCompletionHandler callback is invoked:

// NetworkClient.java

public List<ClientResponse> poll(long timeout, long now) {
    //...

    // Invoke the completion handlers
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}

A ClientResponse holds the request header and the response body:

public class ClientResponse {
    private final RequestHeader requestHeader;            // request header
    private final RequestCompletionHandler callback;      // completion callback
    private final String destination;                     // Broker ID
    private final long receivedTimeMs;
    private final long latencyMs;
    private final boolean disconnected;
    private final RuntimeException versionMismatch;
    private final AbstractResponse responseBody;          // response body

    public void onComplete() {
        if (callback != null)
            callback.onComplete(this);
    }
}

The RequestCompletionHandler is created inside the Sender thread. Its onComplete() method essentially walks the response, associates each partition's result with the RecordBatch that was sent, and finally triggers our user-defined callback:

// Sender.java

private void sendProduceRequest(long now, int destination, short acks, 
                                int timeout, List<RecordBatch> batches) {
    // The callback is attached to the outgoing ClientRequest (construction elided)
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // ...
}

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches,
                                   long now) {
    // Dispatch on the response's state
    if (response.wasDisconnected()) {
        //...
    } else if (response.versionMismatch() != null) {
        //...        
    } else {
        // normal response
        if (response.hasResponse()) {    
            ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
            // Iterate over the per-partition responses
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry :
                 produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();        // topic partition
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                // batches was assembled when the request was sent, so the RecordBatch can be looked up by partition
                RecordBatch batch = batches.get(tp);
                // Complete the batch
                completeBatch(batch, partResp, correlationId, now);
            }
        }
        // no response expected (acks == 0)
        else {
            for (RecordBatch batch : batches.values()) {
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
            }
        }
    }
}

private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response,
                           long correlationId, long now) {
    Errors error = response.error;
    // 1. Error case where the batch can still be retried
    if (error != Errors.NONE && canRetry(batch, error)) {
        // Put the batch back into the accumulator (at the head of its queue) so it is re-sent later
        this.accumulator.reenqueue(batch, now);
    } 
    else {
        RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            exception = new TopicAuthorizationException(batch.topicPartition.topic());
        else
            exception = error.exception();
        // 2. Otherwise complete the batch and fire the callbacks
        batch.done(response.baseOffset, response.logAppendTime, exception);
        // Release the batch's buffer back to the accumulator
        this.accumulator.deallocate(batch);
        if (error != Errors.NONE)
            this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    }
    // 3. Metadata exception: the cached cluster metadata is stale
    if (error.exception() instanceof InvalidMetadataException) {
        if (error.exception() instanceof UnknownTopicOrPartitionException)
            log.warn("Received unknown topic or partition error in produce request on partition {}", batch.topicPartition);
        // Request a metadata refresh
        metadata.requestUpdate();
    }

    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition);
}
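A side note on the retry branch above: whether canRetry() passes depends on the producer's retries setting (and the error must be retriable), and re-enqueuing interacts with message ordering. A minimal configuration sketch (the broker address is a placeholder, and the exact retry semantics depend on the client version):

// RetryConfigDemo.java (illustrative)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class RetryConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // canRetry() only passes while the batch's attempt count is below this value
        props.put("retries", "3");
        // Allowing one in-flight request per connection preserves ordering when a
        // failed batch is re-enqueued at the head of the accumulator queue
        props.put("max.in.flight.requests.per.connection", "1");
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}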

Finally, let's look at RecordBatch.done(), which simply records the result and then triggers our user-defined callbacks:

// RecordBatch.java

public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
    log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
              topicPartition, baseOffset, exception);

    if (completed.getAndSet(true))
        throw new IllegalStateException("Batch has already been completed");

    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    produceFuture.set(baseOffset, logAppendTime, exception);

    // Execute the user callbacks
    for (Thunk thunk : thunks) {
        try {
            // 1. successful request
            if (exception == null) {
                // metadata of the appended record
                RecordMetadata metadata = thunk.future.value();
                // invoke the user-defined callback
                thunk.callback.onCompletion(metadata, null);
            } 
            // 2. an exception occurred (possibly after all retries failed)
            else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }
    produceFuture.done();
}
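For completeness, here is what the user-defined callback invoked by thunk.callback.onCompletion() looks like from the application side, as a minimal usage sketch (topic name and broker address are placeholders):

// SendCallbackDemo.java (illustrative)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendCallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // The second argument is the Callback that RecordBatch.done() ultimately invokes
            producer.send(new ProducerRecord<>("my-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();     // e.g. all retries failed
                        else
                            System.out.printf("partition=%d, offset=%d%n",
                                    metadata.partition(), metadata.offset());
                    });
        }
    }
}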

3. Summary

In this chapter, I analyzed in detail how the Kafka client processes responses and the mechanics underneath. The key points to focus on are how the client handles packet splitting and sticking when reading responses, and what happens after a response is parsed: callbacks, error handling, buffer management, and so on.

The client's KafkaChannel caches the NetworkReceive object for the response currently being read; that cache is not cleared until the complete response has been read in full.
