Kafka Source Code Analysis (12): Producer Timeout Issues

Timeouts can occur while KafkaProducer is sending messages. In this chapter, I explain the problem from the internals of the Kafka client.

1. Timeout Scenarios

Let's first look at the situations in which a timeout can occur:

  • A RecordBatch sits in the BufferPool for a long time and is never picked up by the Sender thread;
  • The Sender thread has sent the messages but never receives a response, so the NetworkSend request piles up in InFlightRequests (see the configuration sketch below).
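Both scenarios are governed by producer-side configuration, most importantly request.timeout.ms. As a quick reference, here is a minimal configuration sketch; the class name, broker address, and values are only examples, not recommendations:

// TimeoutConfigExample.java (illustrative only, not from the Kafka source)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class TimeoutConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // example address
        // Involved in both timeout cases: how long a batch may sit waiting to be sent,
        // and how long a sent request may wait for a response (default 30s).
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // How long a non-full batch may linger in the accumulator waiting for more records.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Backoff between retries; retried batches get this much extra grace before expiring.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... producer.send(...) calls go here ...
        }
    }
}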

1.1 BufferPool Timeout

Let's look at the first case. The main run loop of the Sender thread contains this line of code:

// Sender.java

void run(long now) {
       //...

    // 5. Abort batches that have expired (request.timeout.ms, default 30s)
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
}

Now let's look at RecordAccumulator's abortExpiredBatches method. Its logic is as follows:

// RecordAccumulator.java

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
    List<RecordBatch> expiredBatches = new ArrayList<>();
    int count = 0;
    // 1. Iterate over the partitions to find expired RecordBatches
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        Deque<RecordBatch> dq = entry.getValue();
        TopicPartition tp = entry.getKey();
        if (!muted.contains(tp)) {
            synchronized (dq) {
                RecordBatch lastBatch = dq.peekLast();
                Iterator<RecordBatch> batchIterator = dq.iterator();
                while (batchIterator.hasNext()) {
                    RecordBatch batch = batchIterator.next();
                    boolean isFull = batch != lastBatch || batch.isFull();
                    // check whether this batch has expired
                    if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                        expiredBatches.add(batch);
                        count++;
                        batchIterator.remove();    // remove it from the deque
                    } else {
                        break;
                    }
                }
            }
        }
    }
    // 2. Trigger the callbacks
    if (!expiredBatches.isEmpty()) {
        log.trace("Expired {} batches in accumulator", count);
        for (RecordBatch batch : expiredBatches) {
            // invoke the callbacks with a TimeoutException
            batch.expirationDone();
            // return the allocated buffer to the BufferPool
            deallocate(batch);
        }
    }

    return expiredBatches;
}
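The per-batch expiry decision itself lives in RecordBatch's maybeExpire method, which is not shown here. The sketch below is my own simplified, standalone paraphrase of that decision for the client version discussed in this chapter (the method name and parameters are mine, not the real signature): a full batch expires request.timeout.ms after the last append, a batch still accumulating records gets linger.ms of extra grace, and a batch in retry gets retry.backoff.ms of extra grace measured from its last send attempt.

// Illustrative sketch only -- the real check is RecordBatch#maybeExpire.

static boolean isBatchExpired(long nowMs,
                              long createdMs,        // when the batch was created
                              long lastAppendMs,     // last time a record was appended
                              long lastAttemptMs,    // last (re)send attempt
                              boolean inRetry,       // has the batch been sent and re-enqueued?
                              boolean isFull,        // full, or not the last batch in its deque
                              long requestTimeoutMs, // request.timeout.ms
                              long lingerMs,         // linger.ms
                              long retryBackoffMs) { // retry.backoff.ms
    if (inRetry) {
        // A retried batch gets retry.backoff.ms of extra grace, measured from the last attempt.
        return nowMs - lastAttemptMs - retryBackoffMs > requestTimeoutMs;
    }
    if (isFull) {
        // A full batch should have been drained already; it expires
        // request.timeout.ms after the last append.
        return nowMs - lastAppendMs > requestTimeoutMs;
    }
    // A batch that is still accumulating records gets linger.ms of extra grace.
    return nowMs - createdMs - lingerMs > requestTimeoutMs;
}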

RecordBatch's expirationDone method ultimately calls the internal done method, which is what triggers the callbacks:

// RecordBatch.java

void expirationDone() {
    if (expiryErrorMessage == null)
        throw new IllegalStateException("Batch has not expired");
    this.done(-1L, Record.NO_TIMESTAMP,
              new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
}

public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
    //...
    // execute callbacks
    for (Thunk thunk : thunks) {
        try {
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                thunk.callback.onCompletion(metadata, null);
            } else {
                thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }
    produceFuture.done();
}
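From the application's point of view, an expired batch shows up as an org.apache.kafka.common.errors.TimeoutException passed to the send callback (or thrown when get() is called on the returned future). Below is a minimal sketch of handling it, assuming a producer like the one configured earlier and a hypothetical topic name:

// Application-side view of an expired batch (illustrative).

producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception instanceof org.apache.kafka.common.errors.TimeoutException) {
            // The batch expired in the accumulator, or the request timed out in flight;
            // decide here whether to log, resend, or drop the record.
            System.err.println("Send timed out: " + exception.getMessage());
        } else if (exception != null) {
            System.err.println("Send failed: " + exception);
        } else {
            System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition()
                    + " @ offset " + metadata.offset());
        }
    }
});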

1.2 InFlightRequests Timeout

Now for the other timeout scenario. The Sender thread's main loop calls NetworkClient's poll method:

// Sender.java

void run(long now) {
    //...
    this.client.poll(pollTimeout, now);
}

NetworkClient's poll method contains a piece of timeout-handling logic: if a request to a broker has received no response for longer than request.timeout.ms (30s by default), the client closes the connection to that broker and treats it as failed. At the same time, it cleans up the related in-memory data structures and marks the metadata as needing to be refetched:

// NetworkClient.java

public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // ... other response handlers elided ...

    // handle timed-out requests
    handleTimedOutRequests(responses, updatedNow);

    return responses;
}

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    // find the brokers that have timed-out requests
    List<String> nodeIds = this.inFlightRequests
        .getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
    for (String nodeId : nodeIds) {
        // close the connection to that broker
        this.selector.close(nodeId);
        processDisconnection(responses, nodeId, now);
    }

    // mark the metadata for refresh
    if (!nodeIds.isEmpty())
        metadataUpdater.requestUpdate();
}

private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
    connectionStates.disconnected(nodeId, now);
    nodeApiVersions.remove(nodeId);
    nodesNeedingApiVersionsFetch.remove(nodeId);
    // clear the requests cached in InFlightRequests for that broker
    for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {    
        if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
            metadataUpdater.handleDisconnection(request.destination);
        else
            responses.add(request.disconnected(now));
    }
}

The disconnected responses collected above are handed back to the Sender, which completes the affected batches: if retries remain they are re-enqueued, otherwise they are failed with a NetworkException, so the user callbacks fire in this path as well. The timeout check itself:

// InFlightRequests.java

public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
    List<String> nodeIds = new LinkedList<>();
    for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) {
        String nodeId = requestEntry.getKey();
        Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue();
        if (!deque.isEmpty()) {
            NetworkClient.InFlightRequest request = deque.peekLast();
            // the time since the request was sent (now - sendTimeMs) exceeds request.timeout.ms
            long timeSinceSend = now - request.sendTimeMs;
            if (timeSinceSend > requestTimeout)
                nodeIds.add(nodeId);
        }
    }
    return nodeIds;
}
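Only peekLast is checked for each node. That is sufficient because InFlightRequests adds new requests to the head of the per-node deque, so peekLast returns the oldest outstanding request: if even the oldest one has not timed out, none of the newer ones have either. Here is a tiny illustration of that deque behavior, with a plain ArrayDeque standing in for the real per-node deque:

// Why peekLast() sees the oldest request: new requests are added with addFirst().

java.util.Deque<String> inFlightForNode = new java.util.ArrayDeque<>();
inFlightForNode.addFirst("request-1"); // sent first (oldest)
inFlightForNode.addFirst("request-2");
inFlightForNode.addFirst("request-3"); // sent last (newest)

System.out.println(inFlightForNode.peekLast());  // request-1 -> the one checked for timeout
System.out.println(inFlightForNode.peekFirst()); // request-3 -> the most recently sent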

2. Summary

In this chapter, I explained the message timeout problems that can occur while KafkaProducer sends messages. Overall there are two cases:

  1. Batches backing up in the BufferPool;
  2. Requests backing up in InFlightRequests.

In either case, the timeout check involves the request.timeout.ms parameter, whose default is 30s. And in both cases, a timeout ultimately triggers the execution of the user callbacks.
