原创

Kafka源码分析(六)——Producer:Sender线程——Batch筛选

KafkaProducer通过组件RecordAccumulator将消息按照批次缓存后,就立马返回了,也就是说消息的发送是异步的。那么,到底是谁负责从缓存中获取各个batch消息,然后通过网络组件发送给Broker呢?

没错,就是Sender线程。本章,我就来讲解Sender线程是如何完成消息发送的。

注:前面章节我讲解过KafkaProdcer的初始化流程,我们应该已经知道Sender本质是一个Runnable任务,实际创建的线程是“KafkaThread”,我这里习惯叫做”Sender线程“。

一、整体流程

我们先来回顾下Sender线程的整体处理流程,然后再逐节分析内部的细节。Sender线程启动后,会在一个Loop循环中不断执行run方法:

// Sender.java

void run(long now) {
    // Cluster包含了元数据
    Cluster cluster = metadata.fetch();

    // 1.获取已经有消息就绪的Partition对应的Broker
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // 2.如果有Partition对应的元数据都没拉取到,就标识一下,后续需要尝试拉取元数据
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // 3.检查与Broker的连接状态
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // 没有建立好连接就移除
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // 4.按照Broker维度,对Partition进行分组
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                                                                     this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // 5.剔除超时的batch(默认60s)
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);

    // 6.计算超时时间
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }

    // 7.创建ClientReqeust对象,包括了多个Batch,形成一个请求
    sendProduceRequests(batches, now);

    // 8.负责真正的网络请求发送
    this.client.poll(pollTimeout, now);
}

我来解释下上述的各个步骤:

  1. 首先,获取Cluster对象,里面包含了缓存的Broker集群的元数据信息(也可能没有元数据);
  2. 接着,需要判断哪些Broker上的Partition已经准备好发送数据了,比如:
    • 已经有写满的batch(16kb)的Partition;
    • batch创建时间已经超过了linger.ms的Partition。
  3. 第二步筛选完后,可能有一些Partition不知道Leader信息,对于这种情况,后面需要重新拉取元数据;
  4. 接着,检查这些数据已经就绪的Broker,看看客户端是否已经与它们建立了长连接,如果没有则建立连接;
  5. 接着,还需要对数据重新按照Broker维度分组:<Integer, List<RecordBatch>>,一个Broker可以对应多个Partition的Batch;
  6. 剔除超时的batch(默认60s),也就是说如果有batch在内存缓冲区里停留超过60s,就丢弃掉;
  7. 然后,针对每个要发送的Broker,创建一个ClientReqeust对象,里面包括了多个Batch,形成一个请求,后面会将它发送给Broker;
  8. 最后,通过NetWorkClient网络通信组件,发送实际的网络I/O通信请求,同时也读取响应结果;

以上就是Sender线程运行的整体流程,接下来我们分步骤来看内部实现细节。

二、就绪Batch筛选

首先来看RecordAccumulator是如何筛选哪些Broker上的Partition已经准备好发送数据了:

// Sender.java

// 1.获取已经可以发送消息的Partition
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

核心就是RecordAccumulator的ready方法,该方法会筛选出就绪的Leader Partition所在的Broker节点,ReadyCheckResult本质是一个包装Bean对象,封装了以下内容:

public final static class ReadyCheckResult {
    // 就绪Partition Leader所在的Broker
    public final Set<Node> readyNodes;
    // 下一次进行就绪筛选要等待的时间
    public final long nextReadyCheckDelayMs;
    // 未知Leader的Topic
    public final Set<String> unknownLeaderTopics;
}

2.1 流程

我们再来看RecordAccumulator.ready()的处理流程,它的核心思路就是:

  1. 遍历缓存的map<TopicPartition, Deque<RecordBatch>>,针对每一个Partition,获取队列头的batch,检查该batch是否符合就绪条件
  2. 如果符合就绪条件,就把Partition对应的Leader Partition所在的Broker加入结果集;
  3. 如果不符合就绪条件,就计算出下一次处理的时间nextReadyCheckDelayMs,那么Sender线程后续会等待该时间之后再来检查。
// RecordAccumulator.java

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // 定义各类返回对象
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    // 判断BufferPool可用内存是否已经耗尽
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        // Leader Partition
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            // 不存在Leader,说明要重新拉取元数据
            if (leader == null && !deque.isEmpty()) {
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                // 从队列头获取batch,因为入队是从队尾入的
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    // 判断是否有就绪的Batch
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // 计算下一次ready check的时间
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

上述流程中,还需要关注的一点是:判断缓冲池BufferPool是否已经耗尽。当BufferPool没用可用空间时,条件等待队列waiters不为空:

// BufferPool.java

private final Deque<Condition> waiters;

public int queued() {
    lock.lock();
    try {
        return this.waiters.size();
    } finally {
        lock.unlock();
    }
}

2.2 筛选条件

那么,哪些batch才算是已经就绪的batch呢?判断逻辑在如下代码中,每个变量我都加了详尽注释:

// RecordAccumulator.java

// 是否属于重试batch
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;

// 表示这个batch目前已经等待了多久:waitedTimeMs = 当前时间 - 上一次重试时间,默认情况下batch.lastAttemptMs为创建时间
long waitedTimeMs = nowMs - batch.lastAttemptMs;

// 表示这个batch最多要等待多久才被发送:timeToWaitMs = 重试时间间隔 或 最大可逗留时间,
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

// 表示这个batch的剩余等待时间
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);

// 表示batch空间是否已经满了
boolean full = deque.size() > 1 || batch.isFull();

// 表示batch是否到期,到期就是就绪了
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();

if (sendable && !backingOff) {
    // 把就绪batch所在的Leader Partition加入readyNodes
    readyNodes.add(leader);
}

从上述代码可以看出,以下情况就会认为已经就绪:

  1. BufferPool缓冲区满了,默认32MB;
  2. Batch空间满了,默认16KB;
  3. RecordAccumulator已经关闭,因为当客户端关闭时,必须立马把内存缓冲区中的Batch发送出去;
  4. Batch的缓存时间超过了linger.ms

三、总结

本章,我对Sender线程Loop处理流程中,就绪Batch消息筛选的底层原理进行了讲解。我们需要主要关注的是,在哪些情况下,缓冲区中的消息会被立马发送。

正文到此结束

感谢赞赏~

本文目录