原创

透彻理解Java网络编程(十一)——Netty原理:EventLoopGroup和EventLoop

Netty服务端通过ServerBootstrap启动的时候,会创建两个EventLoopGroup,它们本质是两个 Reactor 线程池:一个用于与客户端建立TCP连接,另一个用于处理IO相关的读写操作。下图是ServerBootstrap启动时,EventLoopGroup的核心工作流程图:



EventLoopGroup属于Netty最核心的接口之一,Netty默认提供了 NioEventLoopGroup、OioEventLoopGroup 等多种实现。而EventLoop则是EventLoopGroup内部的事件处理器,每个EventLoop内部都有一个线程、一个Java NIO Selector和一个taskQueue任务队列,负责处理客户端请求和内部任务等。

本章,我就对EventLoopGroup和EventLoop的核心原理进行分析讲解。这里补充一句,如果觉得干巴巴看源码难度比较大,完全可以通过调试的方式跟踪,比如本章,我们可以自己创建EventLoopGroup,注册Channel:

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        // 1.创建一个ServerSocketChannel
        ChannelFactory channelFactory = new ReflectiveChannelFactory(NioServerSocketChannel.class);
        Channel channel = channelFactory.newChannel();
        // 2.初始化Pipeline
        channel.pipeline().addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                System.out.println("Chanel:" +ch);
            }
        });
        // 3.创建EventLoopGroup
        EventLoopGroup mainGroup = new NioEventLoopGroup(2);
        // 4.注册Channel
        ChannelFuture future = mainGroup.register(channel);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("future:" + future);
            }
        });
        Thread.sleep(60 * 3600 * 1000);
    }
}

一、继承体系

在正式分析EventLoopGroup和EventLoop之前,我们需要先从全局了解下Netty的任务调度框架体系:



从上图可以看到,Netty的任务调度框架还是遵循了 java.util.concurrent 中的 Executor 调度体系:

  • io.netty.util.concurrent是Netty对J.U.C并发包功能的扩充,相当于Netty基于J.U.C实现了一套自己的任务调度基础体系;
  • io.netty.channel是Netty基于自己的任务调度框架,实现的一套Channel调度组件。

由于我们一般只使用Netty进行NIO网络编程,所以重点关注包io.netty.channel中的EventLoopGroup和EventLoop即可,它们是负责对Channel事件进行调度的处理器。

二、EventLoopGroup

前面说了,EventLoopGroup本质是一个线程池。既然是线程池,就可以调整内部的线程数量,默认情况下,EventLoopGroup中的线程数是CPU核数 * 2。

EventLoopGroup 内部管理着很多EventLoop对象,这些对象可以看成是EventLoopGroup内的线程,EventLoop是在EventLoopGroup对象构造时创建的。每一个EventLoop负责处理一系列的Channel,我们可以通过下图理解EventLoopGroup和EventLoop之间的关系:




Netty的事件处理机制采用的是无锁串行化的设计思路

- BossEventLoopGroupWorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel, 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是线程独立的,不同的 NioEventLoop 线程之间不会发生任何交集;
- NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播,ChannelPipeline 也是线程安全的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler,整个过程是串行化执行,不会发生线程上下文切换的问题。

> 无锁串行化的设计不仅使系统吞吐量达到最大化,而且降低了用户开发业务逻辑的难度,不需要花太多精力关心线程安全问题。虽然单线程执行避免了线程切换,但是它的缺陷就是不能执行时间过长的 I/O 操作,一旦某个 I/O 事件发生阻塞,那么后续的所有 I/O 事件都无法执行,甚至造成事件积压。在使用 Netty 进行程序开发时,我们一定要对 ChannelHandler 的实现逻辑有充分的风险意识。

本节,我主要分析EventLoopGroup的实现类NioEventLoopGroup的源码:



2.1 创建NioEventLoopGroup

我们先来看NioEventLoopGroup的构造,NioEventLoopGroup对象的创建流程可以用下面这张时序图表示:



NioEventLoopGroup本身提供了很多构造器,我们重点关注下面这个即可:

// NioEventLoopGroup.java

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler,
                         final EventLoopTaskQueueFactory taskQueueFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
          rejectedExecutionHandler, taskQueueFactory);
}

构造时可以指定:

  • nThreads:内部的NioEventLoop数量,如果不指定则为CPU核数的2倍;
  • Executor:J.U.C中的任务执行器,每一个EventLoop对象内部都会包含一个Executor,默认为Netty自定义的ThreadPerTaskExecutor,即为每个任务创建一个线程处理;
  • EventExecutorChooserFactory:用于创建Chooser对象的工厂类,Chooser可以看成一个负载均衡器,用于从NioEventLoopGroup中选择一个EventLoop,默认采用round-robin算法;
  • SelectorProvider:Java NIO提供的工具类,SelectorProvider使用了 JDK 的 SPI 机制来创建Selector、ServerSocketChannel、SocketChannel 等对象;
  • SelectStrategyFactory:用来创建SelectStrategy的工厂,SelectStrategy是Netty用来控制EventLoop轮询方式的策略,默认为DefaultSelectStrategy;
  • RejectedExecutionHandler:线程池的任务拒绝策略,默认是抛出RejectedExecutionException异常;
  • EventLoopTaskQueueFactory:任务队列工厂类,每一个EventLoop对象内部都会包含一个任务队列,这个工厂类就是用来创建队列的,默认会创建一个Netty自定义线程安全的MpscUnboundedArrayQueue无锁队列。

最终调用了父类MultiThreadEventExecutorGroup的构造方法,它内部维护了一个children数组,保存EventLoop对象:

// MultiThreadEventExecutorGroup.java

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");
    if (executor == null) {
        // 创建一个任务执行器
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 创建内部的EventLoop数组,nThreads默认为CPU核数*2
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建EventLoop对象
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                //...
            }
        }
    }
    // Chooser本质可以看成一个负载均衡器,用于选择一个内部的EventLoop,默认采用round-robin算法
    chooser = chooserFactory.newChooser(children);
   //...
}

2.2 创建EventLoop

MultiThreadEventExecutorGroup内部通过newChild方法创建NioEventLoop:

// NioEventLoopGroup.java

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

NioEventLoop对象的构造,首先调用了父类SingleThreadEventLoop的构造方法,核心是初始化一个任务队列保存到内部:

// SingleThreadEventLoop.java

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, Queue<Runnable> taskQueue, 
                                Queue<Runnable> tailTaskQueue,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    // addTaskWakesUp默认为false,用于标记添加任务是否会唤醒线程
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    // 忽略这个tailTasks
    tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}

SingleThreadEventLoop又调用了父类SingleThreadEventExecutor的构造方法,本质就是设置一些字段值:

// SingleThreadEventLoop.java

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 初始化NioEventLoop内部的任务队列
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

2.3 注册Channel

前一章我已经讲解了ServerBootStrap的启动流程,ServerBootStrap注册ServerSocketChannel就是通过NioEventLoopGroup.register()方法完成的(定义在父类MultithreadEventLoopGroup中):

// MultithreadEventLoopGroup.java

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

public EventLoop next() {
    return (EventLoop) super.next();
}

选择EventLoop

EventLoopGroup管理着内部的EventLoop数组,所以上述的next()方法就是选择一个EventLoop:

// MultithreadEventExecutorGroup.java

private final EventExecutorChooserFactory.EventExecutorChooser chooser;

public EventExecutor next() {
    return chooser.next();
}

具体的选择策略通过EventExecutorChooser完成,默认为DefaultEventExecutorChooserFactory,采用了Round-Robin轮询策略:

// DefaultEventExecutorChooserFactory.java

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // Round-Robin轮询算法
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

异步注册

接着,调用父类SingleThreadEventLoop的register方法注册Channel。这里将NioServerSocketChannel和SingleThreadEventLoop封装成了一个DefaultChannelPromise对象,DefaultChannelPromise可以看成是一种特殊的ChannelFuture,可以手动设置异步Future的执行结果状态:

// SingleThreadEventLoop.java

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

Channel内部有一个UnSafe类,封装了对底层Java NIO SocketChannel的许多方法的调用:

// AbstractChannel.AbstractUnsafe.java

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    // 如果已经注册过
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // ...

    AbstractChannel.this.eventLoop = eventLoop;
    // 如果是当前线程是EventLoop内部的工作线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } 
    else {
        try {
            // 提交一个异步任务
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           //...
        }
    }
}

从上面代码可以看到,Channel的注册是EventLoop通过内部的Executor执行器提交一个异步任务完成的,之所以这样做是因为了使得EventLoop通过一个内部的工作线程就可以实现线程安全的任务执行。

本节我们先重点关注Channel的注册,后面我会分析EventLoop的异步任务执行逻辑:

// AbstractChannel.java

private void register0(ChannelPromise promise) {
    try {
        // 由于是异步执行,所以检查一下任务状态,避免Channel已经关闭
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 执行注册
        doRegister();
        neverRegistered = false;
        registered = true;

        // 触发handlerAdded事件,在Pipeline中传播
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // 触发channelRegistered事件,在Pipeline中传播
        pipeline.fireChannelRegistered();

        // 如果有新连接建立,则触发channelActive事件
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        //...
    }
}

对于NioServerSocketChannel来说,doRegister方法在AbstractNioChannel类中实现,它的内部实际就是通过Java NIO的java.nio.channels.SelectableChannel完成在java.nio.channels.Selector上注册的:

// AbstractNioChannel.java

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 这里调用了
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            //...
        }
    }
}

Pipieline事件传播

回到AbstractChannel.register0()方法中,Channel注册完成后,会生成一系列事件,并在该Channel的pipeline中传播,主要有三类事件:

  • handlerAdded事件:通过pipeline.invokeHandlerAddedIfNeeded()触发;
  • channelRegistered事件:通过pipeline.fireChannelRegistered()触发;
  • channelActive事件:通过pipeline.fireChannelActive()触发。
// AbstractChannel.java

private void register0(ChannelPromise promise) {
    try {
        //...
        // 执行注册
        doRegister();
        neverRegistered = false;
        registered = true;

        // 首次注册成功时,触发handlerAdded事件,在Pipeline中传播
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // 触发channelRegistered事件,在Pipeline中传播
        pipeline.fireChannelRegistered();

        // 如果有新连接建立,则触发channelActive事件
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        //...
    }
}

我们先来看pipeline.invokeHandlerAddedIfNeeded(),我之前分析过ServerBootStrap的启动流程,那时我们创建了一个ChannelInitializer对象,并添加到Channel的pipeline中。

事实上,Channel注册完成后,会调用Pipeline的invokeHandlerAddedIfNeeded方法,而该方法ChannelHandler的handlerAdded方法,由于初始时我们只往Pipeline中添加了一个ChannelInitializer,所以会触发 ChannelInitializer 对象的 handlerAdded 方法:

// ChannelInitializer.java

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // 初始化Channel
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

ChannelInitializer的handlerAdded方法内部,调用了initChannel这个模板方法,它的内部又调用了我们自己覆写的initChannel,其实就是将我们自定义的ChannelHandler添加到Pipeline中:

// ChannelInitializer.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { 
        try {
            // 这个initChannel方法是一个抽象方法,一般我们自己的实现是将自定义的handlers添加到pipeline中
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            // 然后将ChannelInitializer自身移除 
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

通过上面的代码可以看出,ChannelInitializer最终会将自己从Pipeline移除,避免重复执行。下图示意了这一过程,LoggingHandler和EchoClientHandler是我们自定义的业务Handler:




我们再来看Pipeline.fireChannelRegistered()方法,这个方法就是往 Pipeline 中扔一个 channelRegistered 事件,register 属于Inbound(入站)事件,Pipeline 接下来要做的就是执行流水线中所有 Inbound 类型的 handlers 中的 channelRegistered() 方法:

// DefaultChannelPipeline.java

public final ChannelPipeline fireChannelRegistered() {
    // Register事件属于入站事件,所以从head头开始触发
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
// AbstractChannelHandlerContext.java

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    // 执行Handler节点的invokeChannelRegistered方法
    // 如果当前线程不是EventLoop内的工作线程,依然以提交异步任务方式执行
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {    // 确保ChannelHandler#handlerAdded方法已经执行过
        try {
            // 调用当前Handler的channelRegistered方法
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRegistered();
    }
}
// DefaultChannelPipeline.java

public void channelRegistered(ChannelHandlerContext ctx) {
    // 判断是否需要执行当前节点的handlerAdded方法
    invokeHandlerAddedIfNeeded();
    // 向后传播注册inbound事件
    ctx.fireChannelRegistered();
}

关键看当前Handler节点的fireChannelRegistered方法,它内部的findContextInbound()方法会沿着 pipeline 找到下一个 Inbound 类型的 handler:

// AbstractChannelHandlerContext.java

public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    // 找到下一个Inbound Handler
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

通过上面的源码,我们也就理解了pipeline.fireChannelRegistered()方法和context.fireChannelRegistered()方法的区别:

  • pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline的head 中,pipeline 中的 handlers 依次处理该事件;
  • context.fireChannelRegistered() 是当前 handler 处理完该事件后,向后传播给下一个 handler,依次执行。

最后,我们来看channelActive事件,它是通过pipeline.fireChannelActive()触发的,原理和channelRegistered 事件是一样的:

// DefaultChannelPipeline.java

public final ChannelPipeline fireChannelActive() {
    // 从pipeline的头部Handler开始向后传播事件
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}
// AbstractChannelHandlerContext.java

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelActive();
    }
}

三、EventLoop

EventLoopGroup可以看成是一个管理EventLoop的容器,真正执行任务的是EventLoop,它是 Netty Reactor 线程模型的核心处理引擎。Netty中的所有I/O操作都是异步执行的,比如上一节中讲解的Channel注册,其实只是向EventLoop中提交了一个任务,由EventLoop负责异步执行,那么它是如何高效地实现事件循环和任务处理机制的呢?

本节,我就来分析NioEventLoop的底层原理,NioEventLoop的类继承体系如下图:




NioEventLoop 需要负责 IO 事件和非 IO 事件,NioEventLoop 内部有一个线程一直在执行 Selector 的 select 方法或者正在处理 SelectedKeys,如果有外部线程提交一个任务给NioEventLoop ,任务就会被放到 它内部的taskQueue 中,该队列是线程安全的,默认容量为 16:



我们之前分析EventLoopGroup源码时已经看过EventLoop的构造了,那里只是创建了EventLoop对象,并没有启动它的内部线程。事实上,EventLoop创建内部工作线程的时机是在第一个任务提交时,一般就是SocketChannel的register操作。

3.1 工作线程创建

我们从Channel的注册入手,看下EventLoop是如何进行任务调度的:

// AbstractChannel.java

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 重点看这里
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

在EventLoop中,有一个重要的execute(Runnable command)方法(事实上继承自父类SingleThreadEventExecutor)

// SingleThreadEventExecutor.java

public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

private void execute(Runnable task, boolean immediate) {
    // 1.判断当前执行线程是否为EventLoop内部的工作线程
    boolean inEventLoop = inEventLoop();
    // 2.添加任务到内部队列
    addTask(task);
    // 3.如果不是EventLoop内部线程提交的task,则判断内部线程是否已经启动,没有则启动内部线程
    if (!inEventLoop) {
        // 启动内部线程
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
            }
            if (reject) {
                // 执行拒绝策略
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

继续看startThread()方法:

// SingleThreadEventExecutor.java

private void startThread() {
    if (state == ST_NOT_STARTED) {
        // 这里用了CAS操作,以非阻塞的线程安全方式更新
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 执行启动内部线程
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

doStartThread方法看起来很长,其实就是向NioEventLoop内部的ThreadPerTaskExecutor提交一个任务,ThreadPerTaskExecutor会创建一个新线程来执行这个任务,这个线程就是NioEventLoop内部的工作线程:

// SingleThreadEventExecutor.java

private volatile Thread thread;

private void doStartThread() {
    assert thread == null;
    // 对于NioEventLoop,这个executor就是ThreadPerTaskExecutor
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 将这个线程设置为NioEventLoop的内部工作线程
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 执行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中实现了
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
               //...
            }
        }
    });
}

工作线程会执行到上述代码的SingleThreadEventExecutor.this.run()这一行,这是一个抽象方法,需要子类也就是NioEventLoop自己实现,NioEventLoop会在此处实现自己的核心任务调用逻辑。

3.2 任务调度流程

我们继续看NioEventLoop的run方法,这是NioEventLoop的核心逻辑,NioEventLoop 每次循环的处理流程都包含IO事件轮询 select事件处理 processSelectedKeys任务处理 runAllTasks 几个步骤,其中有几个关键点我重点说一下:

  1. selectStrategy用于控制工作线程的select策略,在存在异步任务的场景,NioEventLoop 会优先保证 CPU 能够及时处理异步任务;
  2. ioRatio参数用于控制 I/O 事件处理和内部任务处理的时间比例,默认值为50。如果 ioRatio = 100,表示每次处理完 I/O 事件后,会执行所有的 task。如果 ioRatio < 100,也会优先处理完 I/O 事件,再处理异步任务队列。所以无论如何,processSelectedKeys() 都是先执行的。
// NioEventLoop.java

protected void run() {
    int selectCnt = 0;
    // 一直循环执行
    for (;;) {
        try {
            int strategy;
            try {
                // select处理策略,用于控制select循环行为,包含CONTINUE、SELECT、BUSY_WAIT三种策略
                // Netty不支持BUSY_WAIT,所以 BUSY_WAIT 与 SELECT 的执行逻辑是一样的
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; 
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                // 轮询I/O事件
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                    default:
                }
            } catch (IOException e) {
                //...
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            // 根据ioRatio,选择执行IO操作还是内部队列中的任务
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // 解决JDK的epoll空轮询问题
                selectCnt = 0;
            }
        }
        //...
    }
}

总之,NioEventLoop的run方法是一个无限循环,不间断执行以下三件事情:

  • select:轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件;
  • processSelectedKeys:根据SelectedKeys,处理已经准备就绪的 I/O 事件;
  • runAllTasks:执行内部队列中的任务。


同时,为了提升效率,NioEventLoop会就是根据一定的策略和ioRatio参数配置,选择究竟是执行内部队列中的任务,还是执行IO操作。

epoll空轮询

注意,NioEventLoop的run方法中,有这么一段代码:

// NioEventLoop.java

protected void run() {
    int selectCnt = 0;
    // 一直循环执行
    for (;;) {
        //...
        if (ranTasks || strategy > 0) {
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("..."");
            }
            selectCnt = 0;
        } else if (unexpectedSelectorWakeup(selectCnt)) { // 解决JDK的epoll空轮询问题
            selectCnt = 0;
        }
        //...
    }
}

private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        return true;
    }
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        return true;
    }
    return false;
}

上述的unexpectedSelectorWakeup方法用于解决JDK NIO中 Epoll 实现的空轮询问题。所谓JDK Epoll 空轮询,是指NIO线程在没有感知到select事件时,应该处于阻塞状态,但是JDK的epoll实现会出现即使 Selector 轮询的事件列表为空,NIO线程一样可以被唤醒,导致 CPU 100% 占用。

Netty 作为一个高性能、高可靠的网络框架,需要保证 I/O 线程的安全性,Netty很巧妙的规避了这个问题, 它提供了一种检测机制判断I/O线程是否可能陷入空轮询,具体的实现方式如下:

  1. 首先,EventLoop在每次执行 select 操作前,都会记录当前时间 currentTimeNanos。通过时间比对,如果Netty发现NIO线程的阻塞时间并未达到预期,则认为可能触发了空轮询的 Bug;
  2. 同时,Netty 引入了计数变量 selectCnt。在正常情况下,selectCnt 会重置,否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 阈值时,会触发重建 Selector 对象;
  3. 然后,Netty会将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上,重建完成之后异常的 Selector 就可以废弃了。

总结一下,Netty解决JDK Epoll 空轮询 问题的思路就是:引入计数器变量,统计一定时间窗口内 select 操作的执行次数,识别出可能存在异常的 Selector 对象,然后采用重建 Selector 的方式巧妙地避免了 JDK epoll 空轮询的问题。

3.3 轮询I/O事件

先来看轮询IO事件,比较简单,就是调用底层Java NIO Selector的select方法

// NioEventLoop.java

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        // 调用了底层NIO Selector的select方法
        return selector.select();
    }
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

3.4 处理I/O事件

通过 select 过程,NioEventLoop已经获取到准备就绪的 I/O 事件,接下来需要调用 processSelectedKeys() 方法处理IO事件:

// NioEventLoop.java

private SelectedSelectionKeySet selectedKeys;    // 保存java.nio.channels.SelectionKey的集合

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // Netty优化过的selectedKeys
        processSelectedKeysOptimized();
    } else {
        // 正常处理逻辑
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

处理 I/O 事件时有两种选择,Netty 是否采用优化策略由 DISABLE_KEYSET_OPTIMIZATION参数决定:

  1. 处理 Netty 优化过的 selectedKeys;
  2. 正常的处理逻辑。

根据是否设置了 selectedKeys 来判断采用哪种策略,这两种策略的差异就是使用的 selectedKeys 集合不同。第一种Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型,而正常逻辑使用的是 JDK HashSet 类型。

processSelectedKeysPlain

我们先来看正常的处理逻辑:

// NioEventLoop.java

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // I/O 事件由 Netty 负责处理
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 用户自定义任务
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        // Netty在处理I/O事件时,如果发现超过256个(默认)Channel从Selector对象中移除,就会将needsToSelectAgain置true,重新做一次轮询,从而确保 keySet 的有效性
        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

NioEventLoop会遍历已经就绪的 SelectionKey,SelectionKey 的类型可能是AbstractNioChannelNioTask,这两种类型对应的处理方式是不同的,我们关注AbstractNioChannel类型即可:

// NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {    // 检查 Key 是否合法
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop == this) {    
            unsafe.close(unsafe.voidPromise());    // Key 不合法,直接关闭连接
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // 处理连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            // 通知上层连接已经建立
            unsafe.finishConnect();
        }
        // 处理可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // 处理可读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
  • OP_CONNECT连接事件:EventLoop内部调用了unsafe.finishConnect(),底层调用pipeline().fireChannelActive() 方法,这时会产生一个 Inbound 事件,在Pipeline中传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功;
  • OP_WRITE可写事件:内部会执行ch.unsafe().forceFlush()操作,将数据刷到对端,最终会调用Java NIO中Channel .write()方法执行底层写操作;
  • OP_READ可读事件:Netty 将 OP_READ和 OP_ACCEPT事件进行了统一封装,都通过unsafe.read()进行处理,unsafe.read() 的处理逻辑如下:
    • 从 Channel 中读取数据并存储到分配的 ByteBuf;
    • 调用pipeline.fireChannelRead()方法产生 Inbound 事件,在Pipeline中传播,依次调用 ChannelHandler 的 channelRead() 方法处理数据;
    • 调用pipeline.fireChannelReadComplete()方法完成读操作,同样在Pipeline传播;
    • 执行 removeReadOp()清除 OP_READ 事件。

processSelectedKeysOptimized

介绍完正常的 I/O 事件处理 processSelectedKeysPlain 后,我们再分析 Netty 优化的 processSelectedKeysOptimized 源码就轻松很多:

// NioEventLoop.java

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

可以发现 processSelectedKeysOptimized 与 processSelectedKeysPlain 的主要区别就是 selectedKeys 的遍历方式不同,来看下 SelectedSelectionKeySet 的源码:

// SelectedSelectionKeySet.java

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }
    //...
}

可以看到,SelectedSelectionKeySet内部维护了一个SelectionKey数组,所以processSelectedKeysOptimized可以直接通过遍历数组取出 I/O 事件,相比processSelectedKeysPlain采用的 JDK HashSet 的遍历效率更高。

SelectedSelectionKeySet 内部通过 size 变量记录数组的逻辑长度(每次执行 add 操作,会将元素添加到 SelectionKey[] 尾部,当 size 等于 SelectionKey[] 的真实长度时,自动扩容),相比于HashSet,由于不需要考虑哈希冲突的问题,所以可以实现O(1)时间复杂度的 add 操作。

Netty在创建Selector时,会通过反射的方式,将 Selector 对象内部的 selectedKeys 和 publicSelectedKeys 替换为 SelectedSelectionKeySet,而原先 selectedKeys 和 publicSelectedKeys 这两个字段都是 HashSet 类型。

3.5 内部任务处理

NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则,可以保证任务执行的公平性,Netty没有使用J.U.C中的并发队列,而是自己实现了多生产者单消费者队列 MpscChunkedArrayQueue,这个我会在后续章节分析。

NioEventLoop 处理的任务类型基本可以分为三类:

  • 普通任务:通过NioEventLoop.execute()方法向任务队列 taskQueue 中添加任务,例如NioServerSocketChannel 的 注册;
  • 定时任务:通过 NioEventLoop.schedule()方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务,例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现;
  • 尾部队列:tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等。

EventLoop处理内部任务有两个方法:

  • runAllTasks;
  • runAllTasks(long timeoutNanos)。

第二种方式携带了超时时间,防止NIO工作线程因为处理任务时间过长而导致 I/O 事件阻塞,我们来看下NioEventLoop中的runAllTasks(long timeoutNanos)方法:

// NioEventLoop.java

protected boolean runAllTasks(long timeoutNanos) {
    // 合并定时任务到普通任务队列
    fetchFromScheduledTaskQueue();
    // 从普通任务队列中取出任务并处理
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 计算任务处理的超时时间
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 安全执行任务,执行调用Runnable任务的run()方法同步执行
        safeExecute(task);
        runTasks ++;
        // 每执行 64 个任务检查一下是否超时
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // 取出下一个任务,继续执行
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 收尾工作,执行尾部队列tailTasks的任务
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

EventLoop内部的定时任务队列scheduledTaskQueue本质是一个优先级队列,里面的定时任务只有满足截止时间 deadlineNanos小于当前时间,才可以取出合并到普通任务队列中。由于优先级队列是有序的,所以只要取出的定时任务不满足合并条件,那么队列中剩下的任务都不会满足条件,合并操作完成并退出。

最后,NioEventLoop执行内部队列中的任务的流程是比较简单的,就是不断取出Runnable任务,最终直接调用run方法执行:

// AbstractEventExecutor.java

protected static void safeExecute(Runnable task) {
    try {
        // 安全执行任务,执行调用Runnable任务的run()方法同步执行
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

另外,整个执行过程中还有一个收尾动作afterRunningAllTasks,主要是执行尾部队列tailTasks的任务。尾部队列并不常用,主要用于一些统计场景,比如,我们想对 Netty 的运行状态进行统计分析,如任务循环耗时、占用内存大小等等,则可以向尾部队列添加一个任务执行统计。

四、总结

本章,我对Netty中核心的EventLoopGroup和EventLoop的底层原理进行了分析,核心是EventLoop的设计和实现。EventLoop的设计思想被广泛运用于各类的开源框架中,如 Kafka、Redis、Nginx等等,学习它的源码可以让我们对Java NIO有更深层次的理解。

通过EventLoop的底层源码解读,我们应该意识到在使用Netty进行日常开发时,需要遵循一些原则:

  1. 尽量使用主从Reactor模型,即采用 Boss 和 Worker 两个 EventLoopGroup,分担 Reactor 线程的压力;
  2. Reactor线程模式只适合处理耗时短的任务场景,对于耗时较长的业务 ChannelHandler,最好自己维护一个业务线程池,异步处理处理任务,避免因为ChannelHandler阻塞而造成EventLoop不可用。
  3. 在设计业务架构时,需要明确业务分层和 Netty 分层之间的界限,不要一味地将业务逻辑都添加到 ChannelHandler 中,完全可以基于MQ等中间件进行解耦。
正文到此结束

感谢赞赏~

本文目录