
Kafka Source Code Analysis (5) - Producer: The RecordAccumulator Buffer

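When KafkaProducer sends a message through its send method, it first appends the message to a component called RecordAccumulator. RecordAccumulator, also known as the message accumulator, can be thought of as KafkaProducer's message buffer. Its main job is to cache messages in batches so that the Sender thread can send them in bulk, which cuts network overhead and improves throughput.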

// KafkaProducer.java

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

In this chapter I will walk through the internal structure of RecordAccumulator and how it buffers messages in batches.

1. RecordAccumulator

Let's first look at how RecordAccumulator is constructed:

// RecordAccumulator.java

public final class RecordAccumulator {

    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final long lingerMs;
    private final long retryBackoffMs;
    // Buffer pool that hands out ByteBuffers
    private final BufferPool free;
    private final Time time;
    // Maps each partition to its queue of record batches
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final IncompleteRecordBatches incomplete;
    private final Set<TopicPartition> muted;
    private int drainIndex;

    public RecordAccumulator(int batchSize, long totalSize, CompressionType compression,
                             long lingerMs, long retryBackoffMs, Metrics metrics, Time time) {
        this.drainIndex = 0;
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<>();
        String metricGrpName = "producer-metrics";
        // Create the internal BufferPool
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.incomplete = new IncompleteRecordBatches();
        this.muted = new HashSet<>();
        this.time = time;
        registerMetrics(metrics, metricGrpName);
    }
}

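Two things stand out here:

  • BufferPool: a pool of ByteBuffers that caps how much memory buffered messages may use. Message data is ultimately written into one of its ByteBuffers;
  • CopyOnWriteMap: a "copy-on-write" Map that holds the partition-to-batches mapping <TopicPartition, Deque<RecordBatch>>. Lookups by partition are concurrent and happen on every append, while a new entry is added only when a partition is seen for the first time, so this read-heavy, write-light access pattern suits a copy-on-write scheme.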
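To make the copy-on-write idea concrete, here is a minimal sketch. It is not Kafka's CopyOnWriteMap; the class name CowMapSketch and its methods are hypothetical and only illustrate the technique: reads go through a volatile snapshot without locking, and each write copies the map and publishes the new copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified copy-on-write map; not Kafka's CopyOnWriteMap class.
final class CowMapSketch<K, V> {
    // Readers only ever see a fully built snapshot that is never mutated afterwards.
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    // Lock-free read: one volatile load, then a plain map lookup.
    V get(K key) {
        return snapshot.get(key);
    }

    // Write: copy the current snapshot, mutate the copy, then publish it.
    // Returns the existing value, or null if the new value was stored.
    synchronized V putIfAbsent(K key, V value) {
        V existing = snapshot.get(key);
        if (existing != null)
            return existing;
        Map<K, V> copy = new HashMap<>(snapshot);
        copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
        return null;
    }

    int size() {
        return snapshot.size();
    }
}
```

Every write pays for a full copy, which is acceptable only because a producer adds a partition entry rarely and reads the map on every send.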

1.1 BufferPool

Let's start with BufferPool. It is an in-memory buffer pool whose size defaults to 32MB and is controlled by the buffer.memory parameter:

// BufferPool.java

public final class BufferPool {
    // Total pool size, 32MB by default, controlled by buffer.memory
    private final long totalMemory;    

    // Batch size, i.e. the size of one pooled ByteBuffer, 16KB by default, controlled by batch.size
    private final int poolableSize;    
    private final ReentrantLock lock;
    // Pooled ByteBuffers that are free for reuse
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }
    //...
}

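Operating on BufferPool essentially means operating on the ByteBuffers inside it. BufferPool keeps a Deque of free ByteBuffers, that is, chunks of memory that can be handed out again. Each of them is 16KB, the default batch size.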

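The number of ByteBuffers in the Deque × 16KB is the memory held by pooled buffers that are waiting to be reused, not the memory in use. availableMemory is the part of the budget that is neither handed out nor pooled. It starts at 32MB, and each time a new batch buffer is created it drops by batchSize, for example from 32MB to 32MB - 16KB. The memory that can still be allocated at any moment is availableMemory plus the size of the free list.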
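The bookkeeping can be sketched as follows. This is a simplified, non-blocking, single-threaded model rather than Kafka's BufferPool: the class PoolAccountingSketch and its accessor methods are hypothetical, and it returns null where the real pool would block.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, non-blocking model of the pool's bookkeeping; not Kafka's BufferPool.
final class PoolAccountingSketch {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;

    PoolAccountingSketch(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    // Returns null instead of blocking when there is not enough memory.
    ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst(); // reuse a pooled buffer, budget unchanged
        // Drop pooled buffers until the unpooled budget can cover the request.
        while (availableMemory < size && !free.isEmpty())
            availableMemory += free.pollLast().capacity();
        if (availableMemory < size)
            return null;
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    // Standard-size buffers go back to the free list; others are returned to the budget.
    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);
        } else {
            availableMemory += buffer.capacity();
        }
    }

    long availableMemory() { return availableMemory; }

    long pooledMemory() { return (long) free.size() * poolableSize; }
}
```

With the defaults, the first allocate(16 * 1024) leaves availableMemory at 32MB - 16KB, and returning that buffer moves 16KB into the free list while availableMemory stays the same.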

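In addition, when a caller asks for a ByteBuffer but BufferPool does not have enough free memory, the calling thread blocks. The maximum blocking time is controlled by the max.block.ms parameter: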

// BufferPool.java

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");
    this.lock.lock();
    try {
        // A pooled buffer is free and the requested size is exactly poolableSize
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // Memory held by the free list; together with availableMemory it is the total free space
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            //...
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
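The elided else branch is where the caller blocks. The sketch below shows one way such a bounded wait can be written with a ReentrantLock and a Condition. It is an assumption-laden simplification, not the actual Kafka code: BlockingBudgetSketch, reserve and release are hypothetical names, it uses a single Condition where the source keeps a Deque<Condition> of waiters, and it throws java.util.concurrent.TimeoutException as a stand-in for whatever the producer throws.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of "block until memory is released or the timeout expires".
final class BlockingBudgetSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition memoryReleased = lock.newCondition();
    private long availableMemory;

    BlockingBudgetSketch(long totalMemory) {
        this.availableMemory = totalMemory;
    }

    // Reserves size bytes, waiting at most maxTimeToBlockMs for other threads to release memory.
    void reserve(int size, long maxTimeToBlockMs) throws InterruptedException, TimeoutException {
        long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
        lock.lock();
        try {
            while (availableMemory < size) {
                if (remainingNs <= 0)
                    throw new TimeoutException("Failed to reserve " + size
                            + " bytes within " + maxTimeToBlockMs + " ms");
                // awaitNanos returns the time left, so spurious wakeups do not extend the deadline.
                remainingNs = memoryReleased.awaitNanos(remainingNs);
            }
            availableMemory -= size;
        } finally {
            lock.unlock();
        }
    }

    // Returns memory to the budget and wakes every waiting thread so it can re-check.
    void release(int size) {
        lock.lock();
        try {
            availableMemory += size;
            memoryReleased.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

A single shared Condition keeps the sketch short, but it gives no ordering guarantee among waiting threads.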

1.2 RecordBatch

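RecordAccumulator groups messages by partition and packs messages for the same partition into RecordBatch objects. Each RecordBatch may hold several messages, laid out back to back in memory in a compact format: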

// RecordBatch.java

public final class RecordBatch {

    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;

    private final List<Thunk> thunks = new ArrayList<>();

    // In-memory records builder. This is the important part: it is what actually lays the messages out
    private final MemoryRecordsBuilder recordsBuilder;

    volatile int attempts;
    int recordCount;
    int maxRecordSize;
    long drainedMs;
    long lastAttemptMs;
    long lastAppendTime;
    private String expiryErrorMessage;
    private AtomicBoolean completed;
    private boolean retry;

    public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
        this.createdMs = now;
        this.lastAttemptMs = now;
        this.recordsBuilder = recordsBuilder;
        this.topicPartition = tp;
        this.lastAppendTime = createdMs;
        this.produceFuture = new ProduceRequestResult(topicPartition);
        this.completed = new AtomicBoolean();
    }

    // Append a message in memory
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value,
                                          Callback callback, long now) {
        // Not enough room
        if (!recordsBuilder.hasRoomFor(key, value)) {
            return null;
        } else {
            // Append the message to memory through MemoryRecordsBuilder
            long checksum = this.recordsBuilder.append(timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

    //...
}

As you can see, the append is ultimately carried out by MemoryRecordsBuilder. Each message is written into the allocated ByteBuffer in a format like crc|magic|attribute|timestamp...:



// MemoryRecordsBuilder.java

public long append(long timestamp, byte[] key, byte[] value) {
    return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
}

public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
    try {
        if (lastOffset >= 0 && offset <= lastOffset)
            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));

        int size = Record.recordSize(magic, key, value);

        // Write the LogEntry header; appendStream is a stream that wraps the ByteBuffer
        LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);

        if (timestampType == TimestampType.LOG_APPEND_TIME)
            timestamp = logAppendTime;
        long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
        recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
        return crc;
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}
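To visualize the layout, the following sketch writes one uncompressed record as offset | size | crc | magic | attributes | timestamp | key size | key | value size | value. It is only an illustration: RecordLayoutSketch is a hypothetical class, the field widths (8-byte offset, 4-byte size and crc, 1-byte magic and attributes, 8-byte timestamp, 4-byte lengths) and the CRC range are assumptions based on the older message format, and it requires a heap-backed ByteBuffer.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative encoder for the layout described above; field widths are assumptions.
final class RecordLayoutSketch {
    static final int LOG_OVERHEAD = 8 + 4;    // offset + size
    static final int HEADER = 4 + 1 + 1 + 8;  // crc + magic + attributes + timestamp

    // Size of the record itself, excluding the offset and size fields.
    static int recordSize(byte[] key, byte[] value) {
        return HEADER + 4 + (key == null ? 0 : key.length)
                + 4 + (value == null ? 0 : value.length);
    }

    // Appends one record at the buffer's current position (heap buffers only).
    static void write(ByteBuffer buf, long offset, long timestamp, byte[] key, byte[] value) {
        int size = recordSize(key, value);
        buf.putLong(offset);
        buf.putInt(size);
        int crcPos = buf.position();
        buf.putInt(0);             // placeholder, filled in once the body is written
        buf.put((byte) 1);         // magic
        buf.put((byte) 0);         // attributes: no compression
        buf.putLong(timestamp);
        writeBytes(buf, key);
        writeBytes(buf, value);
        // Checksum everything after the crc field, then patch the placeholder.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), buf.arrayOffset() + crcPos + 4, size - 4);
        buf.putInt(crcPos, (int) crc.getValue());
    }

    // A null key or value is encoded as length -1 with no payload.
    private static void writeBytes(ByteBuffer buf, byte[] bytes) {
        if (bytes == null) {
            buf.putInt(-1);
            return;
        }
        buf.putInt(bytes.length);
        buf.put(bytes);
    }
}
```

Because each record is written at the buffer's current position, calling write repeatedly packs records back to back, which is the compact layout a batch relies on.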

2. Message Buffering

Now that we know the key components inside RecordAccumulator, let's look at the overall flow of buffering a message.

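When KafkaProducer sends a message, it calls RecordAccumulator.append internally. The message is appended to one of the double-ended queues (Deque) inside RecordAccumulator, and multiple messages are packed into one batch, a RecordBatch.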



2.1 Overall Flow

The whole append flow is implemented by the RecordAccumulator.append() method:

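  1. First, look up the message's partition in the CopyOnWriteMap and get the existing Deque<RecordBatch>, or create a new one;
  2. The buffer available to each RecordBatch is 16KB by default. If a message is larger than that, it is enqueued on its own as a batch with a custom size;
  3. If the message is no larger than 16KB, it is packed together with other messages into one batch and enqueued.

// RecordAccumulator.java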

public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value,
                                 Callback callback, long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    try {
        // 1. Get or create the deque for this partition from the internal CopyOnWriteMap
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // Try to append to the Deque; this fails if there is no batch yet or the last batch has no room
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                // Append succeeded
                return appendResult;
        }

        // 2. Reaching here means the tail of the Deque has no usable batch, or the batch there is out of room
        // Compute the size of the batch to create
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // Take a ByteBuffer from the BufferPool; blocks if there is not enough memory
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // Try the append again; this fails if there is still no usable batch
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            // Double-checked under the lock: another thread may have created a batch while this one was allocating its buffer
            if (appendResult != null) {
                // Return the buffer to the pool
                free.deallocate(buffer);
                return appendResult;
            }

            // 3. Reaching here means there is still no batch with room, so create a new one
            // MemoryRecordsBuilder does the actual writing of messages into the ByteBuffer
            MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
            // Create a RecordBatch that holds the MemoryRecordsBuilder, and enqueue it
            RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}

// RecordAccumulator.java

// Get the existing Deque<RecordBatch> or create a new one
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
    // Look it up in the internal CopyOnWriteMap
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    // Create one if it does not exist
    d = new ArrayDeque<>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}


// Try to append a message to the Deque<RecordBatch>
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    // Take the batch at the tail of the queue
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
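Stripped of buffers, locking details and futures, the append path boils down to "fill the last batch of the partition's deque, otherwise open a new one". The sketch below models only that decision with sizes instead of real bytes. AccumulatorSketch and Batch are hypothetical names, partitions are plain strings, and ConcurrentHashMap stands in for the CopyOnWriteMap.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the append path; tracks sizes only, no real bytes or BufferPool.
final class AccumulatorSketch {
    static final class Batch {
        final int capacity;
        int used;
        int recordCount;

        Batch(int capacity) { this.capacity = capacity; }

        // Returns false when the record does not fit in the remaining space.
        boolean tryAppend(int recordSize) {
            if (used + recordSize > capacity)
                return false;
            used += recordSize;
            recordCount++;
            return true;
        }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) { this.batchSize = batchSize; }

    // Returns true when this call had to create a new batch.
    boolean append(String partition, int recordSize) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize))
                return false;
            // An oversized record gets a batch sized to fit exactly that record.
            Batch batch = new Batch(Math.max(batchSize, recordSize));
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return true;
        }
    }

    int batchCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null)
            return 0;
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

With batchSize set to 16384, two 6000-byte records share one batch and a third opens a second batch. A 20000-byte record always ends up alone in a batch of its own, because that batch is already full the moment it is created.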

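A RecordBatch holds one ByteBuffer, which may contain several messages. The batch size is controlled by the batch.size parameter and defaults to 16KB. So in a real production environment, the following parameters should all be tuned:

  • max.request.size: the maximum size of a single request, which also caps the size of a single message, 1MB by default;
  • batch.size: the size of each RecordBatch, 16KB by default;
  • buffer.memory: the size of the message buffer, 32MB by default.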

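You should set max.request.size and batch.size according to the size of the messages you actually send. If messages frequently exceed batch.size, every message ends up in a batch of its own and batching no longer improves throughput.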
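As a rough illustration, the three settings could be derived from the typical message size like this. The config keys are the producer's own, but the class ProducerTuningSketch, the method tunedProps and the sizing heuristic (typical record size times a target record count per batch) are hypothetical and only a starting point, not a recommendation.

```java
import java.util.Properties;

// Hypothetical helper; the sizing heuristic is an assumption, not an official guideline.
final class ProducerTuningSketch {
    static Properties tunedProps(int typicalRecordBytes, int recordsPerBatch) {
        // Make a batch large enough for several typical records, never below the 16KB default.
        int batchSize = Math.max(16 * 1024, typicalRecordBytes * recordsPerBatch);
        // A request must be able to carry at least one full batch; keep the 1MB default otherwise.
        int maxRequestSize = Math.max(1024 * 1024, batchSize);

        Properties props = new Properties();
        props.put("batch.size", Integer.toString(batchSize));
        props.put("max.request.size", Integer.toString(maxRequestSize));
        props.put("buffer.memory", Long.toString(32L * 1024 * 1024));
        return props;
    }
}
```

For example, tunedProps(4096, 16) yields a batch.size of 65536, so sixteen 4KB messages can share a batch instead of being split across batches four at a time.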

3. Summary

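In this chapter I explained the memory structure of RecordAccumulator and how message buffering works under the hood. To summarize:

  • RecordAccumulator buffers messages per partition and uses a CopyOnWriteMap to hold the partition-to-batches mapping;

  • RecordAccumulator packs multiple messages into a RecordBatch so that the Sender thread can later send messages batch by batch, which reduces network overhead and raises overall throughput;

  • RecordAccumulator contains a buffer pool, BufferPool, which manages fixed-size ByteBuffers. Each RecordBatch uses one ByteBuffer to hold multiple messages. If a single message is larger than the default ByteBuffer size (16KB), a ByteBuffer of a custom size is allocated for it;

  • Messages are ultimately written into the underlying ByteBuffer in a compact binary format: offset | size | crc | magic | attributes | timestamp | key size | key | value size | value.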
