原创

Kafka源码分析(二)——Producer:初始化

Kafka Producer的源码在clients模块下。本章,我们先来看下KafkaProducer在构造的时候,会涉及到哪些内部的核心组件,然后我将讲解KafkaProducer的初始化流程。



一、整体流程

Kafka Producer的核心原理我在前面章节已经详细讲解过了,回顾下这张图,其实就是一条消息从创建到发送的整体流程:



上图中有几个核心组件我在后续分析KafkaProducer的源码时会重点讲解:

  • Partitioner:分区器,用来决定发送的每条消息是路由到Topic的哪个分区里;
  • MetaData:Producer发送消息时,需要根据Topic的元数据信息确认发送到哪个Broker。所以,Producer会从Broker集群去拉取元数据,元数据包括Topic信息、分区信息;
  • RecordAccumulator:缓冲区,负责消息的复杂缓冲机制,发送到每个分区的消息会被打包成ProducerBatch,最后一个Broker上的多个分区对应的多个ProducerBatch又会被打包成一个Request;
  • NetworkClient:网络通信组件,负责Kafka Producer与Broker之间的通信;
  • Sender线程:负责从缓冲区里获取消息并发送到Broker;

Kafka Producer还有拦截器和序列化器,这两个了解一下就可以了,不是我们分析的重点。

二、初始化

KafkaProducer就是Kafka中的生产者,回顾下我们使用Kafka生产者的方式,就是创建KafkaProducer对象,然后传入必要的生产者参数:

public class ProducerDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();

        // 配置Broker地址,配置几台即可,Producer会Broker拉取Topic的元数据并缓存
        props.put("bootstrap.servers", "ressmix01:9092,ressmix02:9092,ressmix03:9092");  
        // key序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 要求所有ISR中的副本写入成功
        props.put("acks", "-1");
        props.put("retries", 3);
        // 每个批量发送
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        // 发送消息的缓冲区大小,32MB
        props.put("buffer.memory", 33554432);
        // 发送消息的缓冲区满时,阻塞超时时间
        props.put("max.block.ms", 3000);

        // 创建一个Producer实例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // 创建一个条消息
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "test-topic", "test-key", "test-value");

        // 采用异步发送模式
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception == null) {
                    // 消息发送成功
                    System.out.println("消息发送成功");  
                } else {
                    // 消息发送失败,需要重新发送
                }
            }

        });

        Thread.sleep(10 * 1000); 

        // 采用同步发送模式
        // producer.send(record).get(); 

        // 关闭Producer
        producer.close();
    }    
}

2.1 构造

我们来看下KafkaProducer的构造,我省略了一些重载构造器,只留下最核心的部分。其实,整个构造过程比较简单,就是给KafkaProducer组装各类核心组件和配置:

public class KafkaProducer<K, V> implements Producer<K, V> {
    // 生产者编号,自增序号
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private String clientId;

    // 分区器
    private final Partitioner partitioner;
    // 单个请求最大大小
    private final int maxRequestSize;
    // 发送缓冲区大小
    private final long totalMemorySize;

    // 元数据
    private final Metadata metadata;

    // 缓冲区(消息累加器)
    private final RecordAccumulator accumulator;

    // Sender线程
    private final Sender sender;
    private final Thread ioThread;

    // 消息压缩类型
    private final CompressionType compressionType;
    private final Sensor errors;
    private final Time time;

    // K/V序列化器
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    // 生产者参数
    private final ProducerConfig producerConfig;
    // 最大请求阻塞时间
    private final long maxBlockTimeMs;
    // 请求超时时间
    private final int requestTimeoutMs;

    // 拦截器
    private final ProducerInterceptors<K, V> interceptors;

    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, 
                          Serializer<V> valueSerializer) {
        try {
            log.trace("Starting the Kafka producer");
            // 生产者配置
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = Time.SYSTEM;

            // 设置Producer的ID
            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            // 设置分区器:默认为DefaultPartitioner
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            // 重试发送间隔
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

            // 设置K/V序列化器
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // 设置拦截器
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            // 设置元数据
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
            // 单个请求最大大小
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            // 发送缓存区大小
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            // 消息压缩方式,默认不压缩
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

            // 设置消息发送阻塞时间;当缓存区满或Producer获取不到元数据时,会进入阻塞,超过阻塞时间后抛出异常
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);

            // 设置缓冲区(消息累加器)
            // batch.size: 消息会被按批次封装成ProducerBatch对象,这个参数用来设置batch的大小
            // linger.ms: 消息在缓存区的最大逗留时间,默认值为0,即消息不过缓冲区立即被发送出去
            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time);

            // 解析配置的Broker地址
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            // 更新元数据
            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
            // 构建网络通信组件
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs,
                    time,
                    true);

            // 构建Sender Runnable任务
            // max.in.flight.requests.per.connection:控制一个连接中最大未响应的请求数量,与消息发送的有序性有关
            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            // 启动Sender线程
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            //...
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            close(0, TimeUnit.MILLISECONDS, true);
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }
}

上述构造过程中,有几个核心点要注意下:

  • 默认情况下,每个请求的最大大小为1MB,发送缓冲区的大小为32MB,请求重试间隔为100ms,缓冲区填满之后的阻塞时间为60s,请求超时时间为30s;
  • RecordAccumulator,负责消息的复杂缓冲机制,消息在发送前会被打包成batch,batch的默认大小为16KB。Batch中的消息,最大逗留时间为linger.ms,比如说5ms,如果5ms还没凑出来一个batch,就必须立即把这个消息发送出去;
  • NetworkClient,负责底层的网络通信,一个网络连接最多空闲9分钟,默认情况下每个连接最多允许5个Request没收到响应,重试连接的时间间隔为50ms,Socket发送缓冲区大小为128KB,Socket接收缓冲区大小为32KB;
  • Sender线程,负责从缓冲区里获取消息发送到Broker上去,控制了单个消息最大大小为1MB,acks默认为1,表示要Leader Partition写入成功就认为成功,默认重试次数为0,请求超时时间为30s。

三、总结

本章,我对KafkaProdcer的初始化整体流程进行了分析,下一章开始,我将分析KafkaProducer内部的各个核心组件。

正文到此结束

感谢赞赏~

本文目录