原创

透彻理解Java网络编程(七)——Reactor模式

上一章,我对Java NIO进行了介绍。我们使用的很多开源中间件底层均使用了Java NIO,比如Kafka、Zookeeper就基于Java NIO构建了自己的网络通信组件。

在Java网络编程中,如果使用Java NIO,通常是和Reactor模式结合在一起,构建通信模块。本章,我就对Reactor模式进行介绍,并给出使用Java NIO实现Reactor模式的代码示例。

一、Reactor模式

Reactor模式本质是一种事件驱动模型,Doug Lea 曾在《Scalable IO in Java》中对Reactor模式进行定义,Reactor模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程:负责响应IO事件,并且分发到Handlers处理器;
  2. Handlers处理器:非阻塞的执行业务处理逻辑。

Reactor模式有多个并发输入源,一个Service Handler,多个Request Handlers。这个Service Handler会同步的将请求(Event)多路复用地分发给相应的Request Handler。



1.1 核心组件

从结构上看,Reactor模式有点类似生产者/消费者模式,但是Reactor模式没有queue来做缓冲,每当一个Event输入到Service Handler后,Service Handler会主动根据Event类型分发给对应的Request Handler来处理,并且Reactor模式底层需要依赖操作系统的多路复用函数。



我们通过上述时序图,来看下Reactor模式的具体执行流程:

  1. 初始化一个Initiation Dispatcher,相当于一个容器和Reactor模式的调用入口;
  2. 创建一系列Event Handler,每个Event Handler包含对应的Handle引用,并将Event Handler注册到Initiation Dispatcher中;
  3. 调用Initiation Dispatcher的handle_events方法,来启动事件循环;
  4. Initiation Dispatcher内部使用Synchronous Event Demultiplexer的select方法等待这些handle上事件的发生;
  5. 当某个Handle的Event发生后,select()方法返回,Initiation Dispatcher根据返回的Handle找到注册的EventHandler,并回调该Event Handler的handle_events()方法来进行事件处理。

1.2 模式演化

上述描述的是通用意义上的Reactor模式核心组件以及执行流程,具体落地时根据实现情况有所不同。Doug Lea比较好的描述了Reactor模式的几个不同变种及其演化过程。Doug Lea认为,基本上所有的I/O处理程序都可以抽象成以下处理过程:

  1. Read request;
  2. Decode request
  3. Process service
  4. Encode reply
  5. Send reply

针对处理流程的模式不同,Reactor模式也有很多变种,我在下一节详细讲解:

  • Thread-Per-Connection模式;
  • 单线程Reactor模式;
  • 多线程Reactor模式;
  • 主从Reactor模式。

二、Thread-Per-Connection模式

Thread-Per-Connection模式,就是对于每一个网络连接都分配一个线程进行处理:



2.1 示例

示例代码如下:

class ThreadPerConnection implements Runnable {
    public void run() {
        try {
            // 服务器监听socket
            ServerSocket serverSocket = new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted()) {
                Socket socket = serverSocket.accept();
                // 创建新线程,专门负责一个连接的处理
                new Thread(new Handler(socket)).start();
            }
        } catch (IOException ex) { /* 处理异常 */ }
    }

    // 处理器对象
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) {
            socket = s;
        }
        public void run() {
            while (true) {
                try {
                    byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
                    // 读取数据
                    socket.getInputStream().read(input);
                    // 处理业务逻辑,获取处理结果
                    byte[] output = null;
                    // 写入结果
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /*处理异常*/ }
            }
        }
    }
}

2.2 优缺点

这种方式的优点就是简单,缺点也很明显:

  1. 创建/销毁线程开销太大,且机器本身的线程资源有限;
  2. 即使使用线程池,当线程从输入流读取数据时,如果没有足够数据,线程依然会进入阻塞状态,此时线程啥也不能干,造成资源浪费;
  3. 无法应对瞬间的峰值流量,可能瞬间将线程池占满,系统中的线程也是比较昂贵的系统资源,线程数太多,系统无法承受。

三、单Reactor单线程模式

在Reactor模式中,有两类重要的角色——Reactor反应器和Handler处理器:

  1. Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中Selector监控的通道IO事件;
  2. Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道读取、处理业务逻辑、将结果写入通道等操作。

单Reactor单线程模式,是指Reactor反应器和Handers处理器处于一个线程中执行。这种模式下,Reactor线程是个多面手,负责多路分离套接字,accept新连接,并分派请求到处理器链中:



3.1 示例

我这里也给出单Reactor单线程的示例代码,一共三个重要类:

  • Reactor:Reactor类,负责建立新连接,分发处理;
  • AcceptorHandler:负责对已经建立连接的Channel进行处理;
  • IOHandler:完成业务逻辑。
public class Reactor implements Runnable {
    private Selector selector;
    private ServerSocketChannel serverSocket;

    Reactor() throws IOException {
        //...获取选择器、开启ServerSocket服务监听通道
        //...绑定AcceptorHandler新连接处理器到selectKey
    }

    public static void main(String[] args) throws IOException {
        new Thread(new Reactor()).start();
    }

    // 轮询和分发事件
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    void dispatch(SelectionKey sk) {
        Runnable handler = (Runnable) sk.attachment();
        if (handler != null) {
            handler.run();
        }
    }

    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new IOHandler(selector, channel);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
class IOHandler implements Runnable {
    private static final int RECIEVING = 0, SENDING = 1;
    private final SocketChannel channel;
    private final SelectionKey sk;
    private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    private int state = RECIEVING;

    IOHandler(Selector selector, SocketChannel c) throws IOException {
        this.channel = c;
        c.configureBlocking(false);
        this.sk = this.channel.register(selector, 0);
        this.sk.attach(this);
        this.sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    public void run() {
        try {
            if (state == SENDING) {
                channel.write(byteBuffer);
                byteBuffer.clear();
                sk.interestOps(SelectionKey.OP_READ);
                state = RECIEVING;
            } else if (state == RECIEVING) {
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                byteBuffer.flip();
                sk.interestOps(SelectionKey.OP_WRITE);
                state = SENDING;
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

在上面的代码中,设计了一个AcceptorHandler处理器,并attach到ServerSocketChannel中。AcceptorHandler处理器的两大职责:一是接受新连接,二是在为新连接创建一个输入输出的IOHandler。IOHandler,负责Socket的数据输入、业务处理、结果输出。

上述示例的单Reactor单线程模式Reactor反应器和所有的Handler处理器,都执行在同一条线程中。

3.2 优缺点

单Reactor单线程模式,适用于处理器链中业务处理组件能快速完成的场景。这种模型不能充分利用多核资源,所以实际使用的不多,此外它还有以下缺点:

  1. Reactor反应器和Handler处理器,都执行在同一条线程上。一个NIO线程同时处理成百上千的链路,性能上无法支撑。即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
  2. 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往进行重发,这更加重了NIO线程的负载,最终导致大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈。

四、单Reactor多线程模式

单Reactor多线程模式与单Reactor单线程模式最大区别就是有一组NIO线程处理I/O操作,它的特点如下:

  • 有一个专门的NIO线程——acceptor线程,用于监听服务端,接收客户端的TCP连接请求;
  • 网络读写等I/O操作由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
  • 1个acceptor线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。


4.1 示例

我们来看一个单Reactor多线程模式的示例,它是我在《透彻理解分布式存储》专栏中,模仿Kafka Broker网络通信模块——NIO Server的一个实现:



上述这张NIO Server的架构图,核心组件定义如下:

  • DataNodeNIOServer:相当于Kafka中的Acceptor线程,负责监听客户端的连接事件,并把建立完成连接的SocketChannel交给各个Processor线程;
  • NioProcessor:相当于Kafka中的Processor线程,负责监听SocketChannel的OP_READ/OP_WRITE事件,解析客户端请求交给业务线程处理,并从响应队列中获取业务线程处理完的结果,响应返回客户端;
  • IOThread:业务线程,负责处理Processor线程解析完的请求,执行业务逻辑,然后将响应扔到Processor线程对应的响应队列中;
  • NetworkRequestQueue:全局请求队列,NioProcessor线程解析完请求后,会将请求封装成NetworkRequest对象,扔到该队列中,IOThread线程会从该队列中获取请求并处理;
  • NetworkResponseQueues:响应队列,内部为每个Processor线程分配了一个队列,IOThread线程会将处理结果扔到该队列中;
  • NetworkRequest:请求对象的抽象,负责从SocketChannel中读取完整请求的字节流;
  • NetworkResponse:响应对象的抽象,负责向SocketChannel写入完整响应的字节流。

DataNodeNIOServer

我们先来看DataNodeNIOServer,它负责监听客户端的连接请求,并将建立好的连接交给Processor线程处理。可以看到,DataNodeNIOServer在构造过程中会创建一系列的Processor线程和IO线程,并给每个Processor线程分配一个响应队列:

/**
 * DataNode NIO Server
 *
 * @author Ressmix
 */
public class DataNodeNIOServer extends Thread {
    public static final Integer PROCESSOR_NUM = 10;
    public static final Integer IO_THREAD_NUM = 10;

    private Selector selector;
    private List<NioProcessor> processors = new ArrayList<>();
    private NameNodeRpcClient rpcClient;

    public DataNodeNIOServer(NameNodeRpcClient rpcClient) {
        this.rpcClient = rpcClient;
        init();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 阻塞等待
                selector.select();
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keysIterator.next();
                    keysIterator.remove();
                    // 建立连接
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel channel = serverSocketChannel.accept();
                        if (channel != null) {
                            // 将建立连接的SocketChannel交给Processor处理
                            channel.configureBlocking(false);
                            Integer processorIndex = new Random().nextInt(PROCESSOR_NUM);
                            NioProcessor processor = processors.get(processorIndex);
                            processor.addChannel(channel);
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    /*---------------------------------PRIVATE METHOD--------------------------------*/

    private void init() {
        ServerSocketChannel serverChannel = null;
        try {
            // 监听OP_ACCEPT事件
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);

            // 创建响应队列
            NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
            // 创建Processor线程,每个线程关联一个响应队列
            for (int i = 0; i < PROCESSOR_NUM; i++) {
                NioProcessor processor = new NioProcessor(i);
                processors.add(processor);
                processor.start();
                // 每个Processor线程分配一个响应队列
                responseQueues.assignQueue(i);
            }

            // 创建IO线程
            for (int i = 0; i < IO_THREAD_NUM; i++) {
                new IOThread(rpcClient).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

NioProcessor

NioProcessor负责监听已经建立连接的SocketChannel的OP_READOP_WRITE事件,它的整个处理流程遵循一定的模式:

  1. 从内存队列中移除一个已经建立连接的SocketChannel,将它注册到自己的Selector上,并监听OP_READ事件;
  2. 从自己的响应队列中移除一个响应,并在该响应相关的SocketChannel上监听OP_WRITE事件;
  3. 不断轮询Selector监听的发生事件的SocketChannel:
    • 如果是OP_READ事件,则创建一个NetworkRequest对象并将完整请求缓存其中,然后取消对该SocketChannel的OP_READ事件的关注,并交给IO线程处理;
    • 如果是OP_WRITE事件,则向该SocketChannel写入完整响应,并让其取消对OP_WRITE事件的关注。

经过上面这样的处理模式,一定能保证对于同一个客户端的请求,肯定可以处理完一个完整请求/响应后,再进行下一个请求的处理,这是一种”无锁串行化”的设计思想,在NIO编程中很常见

/**
 * Processor线程
 *
 * @author Ressmix
 */
public class NioProcessor extends Thread {
    // Processor唯一标识
    private volatile Integer processorId;

    // 等待注册连接的队列
    private ConcurrentLinkedQueue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<>();

    // 多路复用监听时的最大阻塞时间
    private static final Long POLL_BLOCK_MAX_TIME = 1000L;

    // 每个Processor私有的Selector多路复用器
    private Selector selector;

    // 缓存未读完的请求,Key为客户端IP
    private Map<String, NetworkRequest> cachedRequests = new HashMap<>();

    // 缓存未发送完的响应,Key为客户端IP
    private Map<String, NetworkResponse> cachedResponses = new HashMap<>();

    // 当前Processor维护的所有SelectionKey,Key为客户端IP
    private Map<String, SelectionKey> cachedKeys = new HashMap<>();

    public NioProcessor(Integer processorId) {
        super();
        this.processorId = processorId;
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Integer getProcessorId() {
        return this.processorId;
    }

    public void addChannel(SocketChannel channel) {
        channelQueue.offer(channel);
        // 唤醒Selector
        // 因为Processor自身线程可能在阻塞等待,所以当有新连接添加队列时,需要由server线程唤起它
        selector.wakeup();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1.不断对已经建立连接的SocketChannel监听OP_READ事件
                registerQueuedClients();
                // 2.不断对需要返回响应的SocketChannel监听OP_WRITE事件
                cacheQueuedResponse();
                // 3.处理OP_READ事件和OP_WRITE事件
                poll();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /*----------------------- PRIVATE METHOD -----------------------------*/
    private void registerQueuedClients() {
        SocketChannel channel = null;
        // 不断出队元素
        while ((channel = channelQueue.poll()) != null) {
            try {
                // 将已经建立连接的Channel注册到Selector上,并监听它的OP_READ事件
                channel.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }
    }

    private void cacheQueuedResponse() {
        NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
        NetworkResponse response = null;
        // 遍历当前Processor自己的响应队列中的响应
        while ((response = responseQueues.poll(processorId)) != null) {
            String client = response.getClient();
            cachedResponses.put(client, response);
            // 关注OP_WRITE事件
            cachedKeys.get(client).interestOps(SelectionKey.OP_WRITE);
        }
    }

    private void poll() {
        try {
            // 这里Processor线程可能会阻塞等待
            int keys = selector.select(POLL_BLOCK_MAX_TIME);
            if (keys > 0) {
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    try {
                        SelectionKey key = keyIterator.next();
                        keyIterator.remove();

                        SocketChannel channel = (SocketChannel) key.channel();
                        // 客户端IP地址
                        String client = channel.getRemoteAddress().toString();

                        // 1.发生读事件
                        if (key.isReadable()) {
                            NetworkRequest request = null;
                            if (cachedRequests.get(client) != null) {
                                // 缓存中有,说明上一次未读完,出现了拆包
                                request = cachedRequests.get(client);
                            } else {
                                request = new NetworkRequest();
                            }
                            // 执行读取操作
                            request.setChannel(channel);
                            request.setKey(key);
                            request.read();
                            // 1.1读取完成
                            if (request.hasCompletedRead()) {
                                // 将完整的请求分发到一个全局请求队列中,由IO线程处理
                                request.setClient(client);
                                NetworkRequestQueue.getInstance().offer(request);
                                cachedKeys.put(client, key);
                                // 删除缓存
                                cachedRequests.remove(client);
                                // 取消对OP_READ的关注
                                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                            }
                            // 1.2 没有读取完成,缓存等待下次继续读取
                            else {
                                cachedRequests.put(client, request);
                            }
                        }
                        // 2.发生写事件
                        else if (key.isWritable()) {
                            NetworkResponse response = cachedResponses.get(client);
                            // 发送响应
                            channel.write(response.getBuffer());
                            cachedResponses.remove(client);
                            cachedKeys.remove(client);
                            // 取消对OP_WRITE事件的关注
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

上述代码中的cachedRequestscachedResponses主要是用来处理拆包问题,针对没有读取完的请求,按照客户端IP进行缓存,这样就保证了一定能够读完一个完整请求。

NetworkRequestQueue

NetworkRequestQueue是一个全局请求队列,Processor线程解析完SocketChannel后,会将包含完整请求的NetworkRequest对象扔到该队列中。IO线程会从该队列中获取请求进行处理:

/**
 * 全局请求队列
 *
 * @author Ressmix
 */
public class NetworkRequestQueue {
    private final ConcurrentLinkedQueue<NetworkRequest> requestQueue = new ConcurrentLinkedQueue<>();

    private NetworkRequestQueue() {
    }

    private static class InstanceHolder {
        private static final NetworkRequestQueue instance = new NetworkRequestQueue();
    }

    public static NetworkRequestQueue getInstance() {
        return InstanceHolder.instance;
    }

    public void offer(NetworkRequest request) {
        requestQueue.offer(request);
    }

    public NetworkRequest poll() {
        return requestQueue.poll();
    }
}

IOThread

IOThread负责处理业务逻辑,它会从全局请求队列NetworkRequestQueue中不断获取请求,然后进行处理,最后将处理结果封装成NetworkResponse对象,存放到Processor线程的响应队列中:

/**
 * 业务线程
 */
public class IOThread extends Thread {
    // 文件上传
    public static final Integer REQUEST_SEND_FILE = 1;
    // 文件下载
    public static final Integer REQUEST_READ_FILE = 2;

    // 全局请求队列
    private NetworkRequestQueue requestQueue = NetworkRequestQueue.getInstance();

    private final NameNodeRpcClient rpcClient;

    public IOThread(NameNodeRpcClient rpcClient) {
        super();
        this.rpcClient = rpcClient;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1.不断从全局请求队列中获取NetworkRequest
                NetworkRequest request = requestQueue.poll();
                if (request == null) {
                    Thread.sleep(100);
                    continue;
                }

                Integer requestType = request.getRequestType();
                // 如果是文件上传请求
                if (requestType.equals(REQUEST_SEND_FILE)) {
                    writeFileToLocalDisk(request);
                }
                // 如果是文件下载请求
                else if (requestType.equals(REQUEST_READ_FILE)) {
                    readFileFromLocalDisk(request);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 写本地磁盘
     */
    private void writeFileToLocalDisk(NetworkRequest request) {
        // 构建针对本地文件的输出流
        FileOutputStream localFileOut = null;
        FileChannel localFileChannel = null;

        try {
            // 1.写磁盘
            localFileOut = new FileOutputStream(request.getFilename());
            localFileChannel = localFileOut.getChannel();
            localFileChannel.position(localFileChannel.size());
            System.out.println("对本地磁盘文件定位到position=" + localFileChannel.size());

            int written = localFileChannel.write(request.getFileContent());
            System.out.println("本次文件上传完毕,将" + written + " bytes的数据写入本地磁盘文件.......");

            // 2.增量上报
            rpcClient.deltaReportDataNodeInfo(request.getFilename(), request.getFilesize());
            System.out.println("增量上报收到的文件副本给NameNode节点......");

            // 3.封装响应
            NetworkResponse response = new NetworkResponse();
            response.setClient(request.getClient());
            response.setBuffer(ByteBuffer.wrap("SUCCESS".getBytes()));
            NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                localFileChannel.close();
                localFileOut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 从本地磁盘读文件
     */
    private void readFileFromLocalDisk(NetworkRequest request) {
        FileInputStream localFileIn = null;
        FileChannel localFileChannel = null;

        try {
            // 从磁盘读取文件
            File file = new File(request.getFilename());
            Long fileLength = file.length();
            localFileIn = new FileInputStream(request.getFilename());
            localFileChannel = localFileIn.getChannel();

            // 响应buffer:8字节响应头(存文件大小)+文件内容
            ByteBuffer buffer = ByteBuffer.allocate(8 + Integer.valueOf(String.valueOf(fileLength)));
            buffer.putLong(fileLength);
            int hasReadImageLength = localFileChannel.read(buffer);
            System.out.println("从本次磁盘文件中读取了" + hasReadImageLength + " bytes的数据");

            buffer.rewind();

            // 封装响应,扔到处理该请求的Processor的响应队列中
            NetworkResponse response = new NetworkResponse();
            response.setClient(request.getClient());
            response.setBuffer(buffer);
            NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (localFileChannel != null) {
                    localFileChannel.close();
                }
                if (localFileIn != null) {
                    localFileIn.close();
                }
            } catch (Exception ex2) {
                ex2.printStackTrace();
            }
        }
    }
}

NetworkResponseQueues

NetworkResponseQueues内部封装了每个Processor独占的响应队列:

/**
 * 响应队列
 */
public class NetworkResponseQueues {

    private NetworkResponseQueues() {
    }

    // KEY为Processor标识,每个Processor线程对应一个响应队列
    private Map<Integer, ConcurrentLinkedQueue<NetworkResponse>> responseQueues = new HashMap<>();

    public void assignQueue(Integer processorId) {
        ConcurrentLinkedQueue<NetworkResponse> queue = new ConcurrentLinkedQueue<>();
        responseQueues.put(processorId, queue);
    }

    private static class InstanceHolder {
        private static final NetworkResponseQueues instance = new NetworkResponseQueues();
    }

    public static NetworkResponseQueues getInstance() {
        return InstanceHolder.instance;
    }

    // 添加一个响应
    public void offer(Integer processorId, NetworkResponse response) {
        responseQueues.get(processorId).offer(response);
    }

    // 获取一个响应
    public NetworkResponse poll(Integer processorId) {
        return responseQueues.get(processorId).poll();
    }
}

NetworkRequest

NetworkRequest内部包含了一个完整请求的数据,并提供read接口从SocketChannel中读取字节:

public class NetworkRequest {
    // 文件上传
    public static final Integer REQUEST_SEND_FILE = 1;
    // 文件下载
    public static final Integer REQUEST_READ_FILE = 2;

    // Processor标识
    private Integer processorId;

    // 该请求是哪个客户端发送过来的
    private String client;

    // 本次网络请求对应的SelectionKey
    private SelectionKey key;

    // 本次网络请求对应的Channel
    private SocketChannel channel;

    // 缓存的数据,处理拆包
    private InflightRequest cachedRequest = new InflightRequest();
    private ByteBuffer cachedRequestTypeBuffer;
    private ByteBuffer cachedFilenameLengthBuffer;
    private ByteBuffer cachedFilenameBuffer;
    private ByteBuffer cachedFileLengthBuffer;
    private ByteBuffer cachedFileBuffer;

    /**
     * 读取字节流
     */
    public void read() {
        try {
            Integer requestType = null;
            if (cachedRequest.requestType != null) {
                requestType = cachedRequest.requestType;
            } else {
                requestType = getRequestType(channel);
            }
            if (requestType == null) {
                return;
            }
            System.out.println("从请求中解析出来请求类型:" + requestType);

            if (REQUEST_SEND_FILE.equals(requestType)) {
                // 处理上传文件请求
                handleSendFileRequest(channel, key);
            } else if (REQUEST_READ_FILE.equals(requestType)) {
                // 处理下载文件请求
                handleReadFileRequest(channel, key);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取本次请求的类型
     */
    public Integer getRequestType(SocketChannel channel) throws Exception {
        Integer requestType = null;

        if (cachedRequest.requestType != null) {
            return cachedRequest.requestType;
        }

        ByteBuffer requestTypeBuffer = null;
        if (cachedRequestTypeBuffer != null) {
            requestTypeBuffer = cachedRequestTypeBuffer;
        } else {
            requestTypeBuffer = ByteBuffer.allocate(4);
        }

        channel.read(requestTypeBuffer);
        if (!requestTypeBuffer.hasRemaining()) {
            // 已经读取出来了4个字节,可以提取出来requestType了
            // 将position变为0,limit还是维持着4
            requestTypeBuffer.rewind();
            requestType = requestTypeBuffer.getInt();
            cachedRequest.requestType = requestType;
        } else {
            cachedRequestTypeBuffer = requestTypeBuffer;
        }
        return requestType;
    }

    /**
     * 发送文件
     */
    private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
        // 从请求中解析文件名
        String filename = getFilename(channel);
        System.out.println("从网络请求中解析出来文件名:" + filename);
        if (filename == null) {
            return;
        }
        // 从请求中解析文件大小
        Long fileLength = getFileLength(channel);
        System.out.println("从网络请求中解析出来文件大小:" + fileLength);
        if (fileLength == null) {
            return;
        }

        // 循环不断的从channel里读取数据,并写入磁盘文件
        ByteBuffer fileBuffer = null;
        if (cachedFileBuffer != null) {
            fileBuffer = cachedFileBuffer;
        } else {
            fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
        }

        channel.read(fileBuffer);
        if (!fileBuffer.hasRemaining()) {
            fileBuffer.rewind();
            cachedRequest.fileContent = fileBuffer;
            cachedRequest.hasCompletedRead = true;
            System.out.println("本次文件上传请求读取完毕.......");
        } else {
            cachedFileBuffer = fileBuffer;
            System.out.println("本次文件上传出现拆包问题,缓存起来,下次继续读取.......");
        }
    }

    /**
     * 获取文件名
     */
    private String getFilename(SocketChannel channel) throws Exception {
        String filename = null;
        if (cachedRequest.filename != null) {
            return cachedRequest.filename;
        } else {
            Integer filenameLength = null;
            // 读取文件名的大小
            if(cachedRequest.filenameLength == null) {
                ByteBuffer filenameLengthBuffer = null;
                if(cachedFilenameLengthBuffer != null) {
                    filenameLengthBuffer = cachedFilenameLengthBuffer;
                } else {
                    filenameLengthBuffer = ByteBuffer.allocate(4);
                }

                channel.read(filenameLengthBuffer);

                if(!filenameLengthBuffer.hasRemaining()) {
                    filenameLengthBuffer.rewind();
                    filenameLength = filenameLengthBuffer.getInt();
                    cachedRequest.filenameLength = filenameLength;
                } else {
                    cachedFilenameLengthBuffer = filenameLengthBuffer;
                    return null;
                }
            }

            // 读取文件名
            ByteBuffer filenameBuffer = null;
            if(cachedFilenameBuffer != null) {
                filenameBuffer = cachedFilenameBuffer;
            } else {
                filenameBuffer = ByteBuffer.allocate(filenameLength);
            }

            channel.read(filenameBuffer);
            if(!filenameBuffer.hasRemaining()) {
                filenameBuffer.rewind();
                filename = new String(filenameBuffer.array());
            } else {
                cachedFilenameBuffer = filenameBuffer;
            }
            cachedRequest.filename = filename;
        }
        return filename;
    }

    /**
     * 获取文件大小
     */
    private Long getFileLength(SocketChannel channel) {
        Long fileLength = null;

        if(cachedRequest.filesize != null) {
            return cachedRequest.filesize;
        } else {
            ByteBuffer filesizeBuffer = null;
            if(cachedFileLengthBuffer != null) {
                filesizeBuffer = cachedFileLengthBuffer;
            } else {
                filesizeBuffer = ByteBuffer.allocate(8);
            }


            if(!filesizeBuffer.hasRemaining()) {
                filesizeBuffer.rewind();
                fileLength = filesizeBuffer.getLong();
                cachedRequest.filesize = fileLength;
            } else {
                cachedFileLengthBuffer = filesizeBuffer;
            }
        }
        return fileLength;
    }


    /**
     * 读取文件
     */
    private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
        // 从请求中解析文件名
        String filename = getFilename(channel);
        System.out.println("从网络请求中解析出来文件名:" + filename);
        if(filename == null) {
            return;
        }
        cachedRequest.hasCompletedRead = true;
    }

    /**
     * 本次请求是否已经读取完成
     */
    public boolean hasCompletedRead() {
        Long hasReaded = cachedRequest.hasReadedSize;
        Long total = cachedRequest.filesize;
        if (hasReaded == null) {
            return false;
        }
        if (total == null) {
            return false;
        }
        return hasReaded.equals(total);
    }


    /**
     * 缓存数据
     */
    class InflightRequest {
        // 请求类型
        Integer requestType;
        // 文件名,以前缀分隔符开始,比如/dir/enclosure/qq.jpg
        String filename;
        // 文件名大小
        Integer filenameLength;
        // 文件总大小
        Long filesize;
        // 文件内容
        ByteBuffer fileContent;
        // 已读取的大小
        Long hasReadedSize;
        // 是否读取完整
        Boolean hasCompletedRead = false;

        public InflightRequest(String filename, Long imageSize, Long hasReadedSize, Integer requestType) {
            this.filename = filename;
            this.filesize = imageSize;
            this.hasReadedSize = hasReadedSize;
            this.requestType = requestType;
        }

        public InflightRequest() {
        }
    }
    //...
}

上述的整个处理流程是很清晰,就是按照我们自定义好的报文格式解析请求,核心点就是对拆包的处理逻辑:

  1. 判断缓存的数据是否存在,存在则直接返回;
  2. 判断缓存数据的ByteBuffer是否已经读完,读完则缓存数据,否则把ByteBuffer缓存起来。

NetworkResponse

NetworkResponse内部包含了一个完整响应的数据,由IO线程创建并写入数据:

public class NetworkResponse {
    private String client;
    private ByteBuffer buffer;

    public ByteBuffer getBuffer() {
        return buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public String getClient() {
        return client;
    }

    public void setClient(String client) {
        this.client = client;
    }
}

4.2 优缺点

单Reactor多线程模式,是实际使用最多的Reactor模式,在绝大多数场景都可以满足性能需求。Kafka和Zookeeper底层都是采用了多线程Reactor模式。

但是在极特殊应用场景中,由一个Acceptor线程负责监听和处理所有的客户端连接,可能会存在性能问题。例如,百万客户端并发连接,或者服务端需要对客户端的握手信息进行安全认证,而认证本身非常损耗性能。

在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了主从Reactor多线程模型

五、主从Reactor多线程模式

主从Reactor多线程模式比起单Reactor多线程模式,是将Reactor分成两部分:

  • mainReactor负责监听ServerSocket,accept新连接,并将建立的Socket分派给subReactor;
  • subReactor负责多路分离已连接的Socket,读写网络数据,并交给worker线程池处理。

通常,mainReactor和subReactor这两个线程池中的线程数与CPU核数有关,具体还是要根据机器的压测情况确定:



特点:

  • 服务端不再只用一个Acceptor线程接收客户端连接,而是一个独立的MainReactor线程池;
  • mainReactor线程池中的每个线程都可以接收客户端的TCP连接请求,并将建立完连接后的SocketChannel交给subReactor线程池处理;
  • subReactor线程池中的线程会将接收到的SocketChannel注册到自己的Selector上,并负责监听和处理该SocketChannel的读/写事件;
  • mainReactor线程池只用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的线程上,由它们负责后续的I/O操作。

Netty框架底层就是采用主从Reactor多线程模式,后续我在Netty源码分析章节会详细介绍。

六、总结

本章,我对Reactor模式进行了讲解,并使用Java NIO给出了单Reactor多线程模式的代码示例。事实上,Kafka和Zookeeper也是基于Java NIO和Reactor模式构建了自己的底层通信组件,感兴趣的读者可以阅读它们的源码,或者参考我其它专栏中对这两个开源中间件的剖析。

正文到此结束

感谢赞赏~

本文目录