Kafka Source Code Analysis (9): Producer Sender Thread and Selector Polling

In the previous chapter, we saw the Sender thread wrap each request as a ClientRequest, cache it in InFlightRequests and in the KafkaChannel, and finally register interest in the OP_WRITE event on that KafkaChannel.

In this chapter, we look at the last step of the Sender thread's loop:

// Sender.java

this.client.poll(pollTimeout, now);

In earlier chapters I already covered the internals of NetworkClient.poll(), focusing mainly on connection establishment (OP_CONNECT). This chapter focuses on how the Selector handles the read and write events (OP_READ/OP_WRITE) on a Channel.

1. Listening for Events

The Kafka client wraps Java NIO under the hood, but the fundamentals are unchanged. Java NIO's Selector lets a single thread manage many sockets: multiple Channels (a Channel represents one socket; in Java NIO this is a SocketChannel) can be registered with one Selector, each with the set of events it is interested in.

When Selector.select() runs, any Channel on which one of our interested events has occurred is recorded (as a selected key), and we can then handle the event accordingly.

Each SelectionKey is associated with one KafkaChannel. The Kafka Selector listens for the following events:

  • OP_CONNECT: connect-ready; the connection handshake can be completed;
  • OP_WRITE: write-ready; the client can write data to the send buffer;
  • OP_READ: read-ready; the client can read data from the receive buffer.

// NetworkClient.java

public List<ClientResponse> poll(long timeout, long now) {
    //...
    try {
        // poll the Channels via org.apache.kafka.common.network.Selector's poll method
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    //...
}
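Before diving into Kafka's Selector, the register/select/handle loop described above can be sketched in raw Java NIO, independent of Kafka. The class name and loopback setup below are mine, for illustration only: a non-blocking client registers OP_CONNECT, waits for the selector to report readiness, finishes the connect, and then swaps its interest to OP_READ, which is the same transition Kafka's transport layer performs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NioConnectDemo {

    // Connect to a local listener through a Selector and return the final interest ops.
    static int connectAndPromote() throws IOException {
        // A loopback listener; the kernel completes the TCP handshake for us
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        Selector selector = Selector.open();
        SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);
        // connect() may finish immediately on loopback (Kafka's "immediately connected" case)
        boolean connected = client.connect(server.getLocalAddress());
        SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);

        while (!connected) {
            selector.select(1000);                      // wait for a ready event
            if (selector.selectedKeys().remove(key) && key.isConnectable())
                connected = client.finishConnect();     // complete the handshake
        }
        // The same transition as finishConnect(): drop OP_CONNECT, add OP_READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

        int ops = key.interestOps();
        client.close();
        server.close();
        selector.close();
        return ops;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(connectAndPromote() == SelectionKey.OP_READ);
    }
}
```

After this, the channel's interest set contains only OP_READ, ready for the response traffic that the rest of this chapter walks through.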

2. Selector Polling

Let's look at Selector's poll method:

// Selector.java

public void poll(long timeout) throws IOException {
    //...
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    //...
}

The core is pollSelectionKeys: the Selector iterates over a batch of SelectionKeys and handles each key whose interested event has fired:

// Selector.java

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    // iterate over the SelectionKeys
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        try {
            if (isImmediatelyConnected || key.isConnectable()) {
                // 1. finish the connection: OP_CONNECT -> OP_READ
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }

            // 2. read data (OP_READ)
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            // 3. write data (OP_WRITE)
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            //...
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        }
    }
}

2.1 OP_CONNECT

Interest in OP_CONNECT is registered when the client first attempts to establish a connection to a Broker:



Ultimately, Selector.connect() registers the listener for OP_CONNECT:

// Selector.java

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    //...

    // register interest in OP_CONNECT on the SocketChannel
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);

    //...
}

With the listener registered, a later iteration of the Sender loop reaches Selector.poll(), which completes the connection via KafkaChannel.finishConnect():

// Selector.java

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    //...

    if (isImmediatelyConnected || key.isConnectable()) {
        if (channel.finishConnect()) {
            this.connected.add(channel.id());
            this.sensors.connectionCreated.record();
        } else
            continue;
    }
}
// KafkaChannel.java

public boolean finishConnect() throws IOException {
    return transportLayer.finishConnect();
}
// SslTransportLayer.java

public boolean finishConnect() throws IOException {
    // call SocketChannel.finishConnect() to complete the connection
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // once connected, drop interest in OP_CONNECT and add interest in OP_READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}

As the code above shows, once the connection is established, interest in OP_CONNECT is removed and interest in OP_READ is added.
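The interest set is just a bit mask, so this transition can be checked in isolation. A small demo (the helper name promoteToRead is mine, for illustration):

```java
import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    // Mirrors the transition in finishConnect(): clear OP_CONNECT, set OP_READ
    static int promoteToRead(int ops) {
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    public static void main(String[] args) {
        int ops = promoteToRead(SelectionKey.OP_CONNECT);          // what connect() registered
        System.out.println((ops & SelectionKey.OP_CONNECT) == 0);  // true: no longer watching connect
        System.out.println((ops & SelectionKey.OP_READ) != 0);     // true: now watching reads
    }
}
```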

The full call chain for establishing a connection is shown in the figure below:



2.2 OP_READ

Interest in OP_READ is registered once the client has finished establishing the connection to the Broker:

key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

With the listener registered, a later iteration of the Sender loop reaches Selector.pollSelectionKeys(); when OP_READ fires, data is read via KafkaChannel.read():

// Selector.java

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    //...
    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
        NetworkReceive networkReceive;
        // read the data
        while ((networkReceive = channel.read()) != null)
            addToStagedReceives(channel, networkReceive);
    }
    //...
}

2.3 OP_WRITE

Interest in OP_WRITE is registered while the client is sending a message:



Ultimately, the KafkaChannel caches the request to be sent and adds interest in the OP_WRITE event on that Channel:

// KafkaChannel.java

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;    // cache the Send inside the KafkaChannel
    // add interest in OP_WRITE
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

With the listener registered, a later iteration of the Sender loop reaches Selector.pollSelectionKeys(); when OP_WRITE fires, data is written via KafkaChannel.write():

// Selector.java

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    //...
    if (channel.ready() && key.isWritable()) {
        // write the data
        Send send = channel.write();
        if (send != null) {
            // add the sent request to the completedSends list
            this.completedSends.add(send);
            this.sensors.recordBytesSent(channel.id(), send.size());
        }
    }
    //...
}

Finally, note that once the message has been fully written, interest in OP_WRITE is cancelled: the Channel keeps only its OP_READ interest, and OP_WRITE is registered again only the next time a send is cached:

// KafkaChannel.java

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        // once fully written, drop interest in OP_WRITE
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}
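Putting setSend() and send() together: each KafkaChannel holds at most one pending Send at a time, and OP_WRITE interest tracks exactly whether such a Send exists. This contract can be sketched with a toy class (ChannelSketch is hypothetical; it tracks interest ops as a plain int instead of a real SelectionKey, and its write always completes in one shot):

```java
import java.nio.channels.SelectionKey;

// Hypothetical sketch of KafkaChannel's one-send-at-a-time contract.
public class ChannelSketch {
    private String pendingSend;                        // stands in for the Send object
    private int interestOps = SelectionKey.OP_READ;    // a connected channel watches reads

    public void setSend(String send) {
        if (pendingSend != null)
            throw new IllegalStateException("prior send operation still in progress");
        pendingSend = send;
        interestOps |= SelectionKey.OP_WRITE;          // start watching writability
    }

    // Simulates a write that completes in one shot
    public String write() {
        String completed = pendingSend;
        if (completed != null) {
            pendingSend = null;
            interestOps &= ~SelectionKey.OP_WRITE;     // stop watching writability
        }
        return completed;
    }

    public int interestOps() { return interestOps; }
}
```

After setSend(), the interest set contains OP_WRITE; after write() completes, only OP_READ remains, matching the toggle in the real setSend()/send() pair above.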

3. Post-Send Processing

When the request has been sent, the flow is not yet over, because the client has not received the Broker's response. The client still does two things:

  1. handle the response;
  2. handle the callback.

3.1 Callback Handling

After Selector.poll() finishes writing, the most recently sent request has been added to the completedSends list:

public class Selector implements Selectable {
    private final List<Send> completedSends;

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
         //...
        if (channel.ready() && key.isWritable()) {
            // write the message (send the request)
            Send send = channel.write();
            if (send != null) {
                // add to the "completed" list
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }
    }
}

Next, NetworkClient processes the requests that have been sent. If a request does not expect a response, the most recently sent request cached in inFlightRequests is removed immediately, and the callback ClientResponse.onComplete() is invoked. The callback is a RequestCompletionHandler, i.e. the handler we supplied when sending the message:

// NetworkClient.java

public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    //...

    // process requests that have been fully sent
    handleCompletedSends(responses, updatedNow);
    //...

    // invoke the callbacks
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // iterate over the completed sends
    for (Send send : this.selector.completedSends()) {
        // the most recent request sent to this Broker
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        // no response expected, which means acks == 0
        if (!request.expectResponse) {
            // remove the most recently sent request from inFlightRequests
            this.inFlightRequests.completeLastSent(send.destination());
            // add to the response list
            responses.add(request.completed(null, now));
        }
    }
}
// ClientResponse.java

public void onComplete() {
    if (callback != null)
        callback.onComplete(this);
}

The default callback is defined in the Sender thread:

// Sender.java

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
       //...

    // create the callback
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // note: acks != 0 is passed as the expectResponse flag
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
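The wiring above reduces to a simple pattern: the completion handler is captured when the request is created and invoked only when the request completes. A stripped-down sketch (the Request class and its fields here are mine, not the real ClientRequest API):

```java
import java.util.function.Consumer;

public class CallbackDemo {
    static final class Request {
        final boolean expectResponse;
        final Consumer<String> callback;

        Request(boolean expectResponse, Consumer<String> callback) {
            this.expectResponse = expectResponse;
            this.callback = callback;
        }

        // Same shape as ClientResponse.onComplete(): null-safe callback dispatch
        void onComplete(String response) {
            if (callback != null)
                callback.accept(response);
        }
    }

    public static void main(String[] args) {
        short acks = 0;
        StringBuilder received = new StringBuilder();
        // acks != 0 plays the role of expectResponse, as in sendProduceRequest()
        Request req = new Request(acks != 0, received::append);
        req.onComplete("produce-response");
        System.out.println(received);            // prints "produce-response"
        System.out.println(req.expectResponse);  // false: acks == 0 expects no response
    }
}
```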

Handling the response:

// Sender.java

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {

    //...
    if (response.hasResponse()) {
        ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            TopicPartition tp = entry.getKey();
            ProduceResponse.PartitionResponse partResp = entry.getValue();
            RecordBatch batch = batches.get(tp);
            // handle the response for each record batch
            completeBatch(batch, partResp, correlationId, now);
        }
        this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
        this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
    } else {
        // this is the acks = 0 case, just complete all requests
        for (RecordBatch batch : batches.values()) {
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
        }
    }
}

I won't go further into how the response is processed; readers can follow the code from here on their own.

4. Summary

In this chapter I covered the last and most important step of the Sender thread's loop: Selector polling. The Kafka client handles requests entirely in NIO style, and the Sender thread's asynchronous request-handling design is well worth learning from.

This completes the walkthrough of the Kafka client's message-sending flow, along with a first look at how the underlying NIO components work. Starting next chapter, I will dig into the Kafka client's NIO communication layer, including how it handles TCP message framing ("sticky" and "split" packets), error handling, and more.
