原创

Java多线程进阶(四五)—— J.U.C之executors框架:CompletionService实现

一、简介

CompletionService是对于Task进行异步处理之后获取结果的Service,在开始本章节之前需要有FutureFutureTaskExecutor(参见ThreadPoolExecutor)的相关知识。JDK中提供了ExecutorCompletionService作为CompletionService的默认实现,本章会以ExecutorCompletionService为例讲解CompletionService

二、代码解析

首先了解ExecutorCompletionService的定义:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    /**
     * 存储完成任务的队列
     */
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * 定义一个FutureTask的子类,将task包装起来,
     * 同时当task完成之后会进入到completionQueue中
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    /**
     * 根据已提供的基本任务执行器创建ExecutorCompletionService,
     * 并创建一个LinkedBlockingQueue作为存储执行完毕task的队列
     *
     * @param executor 需要使用的执行器
     * @throws如果excutor是null则抛出异常
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * 根据已提供的基本任务执行器executor和completionQueue队列创建ExecutorCompletionService,
     *
     * @param executor 需要使用的执行器
     * @param 为此服务提供的一个队列,此队列是无界队列
     * @throws 如果 executor 或 completionQueue 为空则抛出异常
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

}

从定义中我们看到ExecutorCompletionService的核心是ExecutorBlockingQueue<Future<V>>,分别为执行任务的执行器和存储执行完毕的FutureTask的队列。

BlockingQueueAbstractExecutorService的子类ThradPoolExecutor在前述章节已经阐述过,这里不再具体描述。

接下来我们查看在任务提交之后的流程:

    /**
    * 将提交的task封装为RunnableFuture
    */
        private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            // 如果实例化ExecutorCompletionService的时候传递的
            // 执行器不是AbstractExecutorService的子类,则直接将task包装为FutureTask
            return new FutureTask<V>(task);
        else
            // 否则,直接执行执行器的newTaskFor方法,将task封装一次
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        // 先将task封装为RunnableFuture
        RunnableFuture<V> f = newTaskFor(task);
        // 再将RunnableFuture以内部类QueueingFuture包装
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

重点分析submit方法,在将task封装之后,代码又将封装之后的RunnableFuture以内部类QueueingFuture包装一次之后提交给执行器执行。之后执行器在execute时实际执行的就是QueueingFuture对象,经过层层封装之后执行的还是FutureTask.run方法:

    Callable<V> c = callable;
    if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
            result = c.call();
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)
            set(result);
    }

最后不论是setException还是set方法都会触发finishCompletion方法,而finishCompletion最终会调用抽象方法done,也就是调用了内部类QueueingFuturedone方法:

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

ThreadPoolExecutorFuture已经在之前的章节描述,所以这里一笔略过。

执行submit最终会将执行完毕的Task入队,而后从队列中获取对应的FutureTask即可。

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

以上三个方法均为BlockingQueue所支持的方法,具体含义可以参见前面BlockingQueue相关章节。

从整个源码的角度来看,CompletionService是较为简单的,是对已有工具的封装,只要认识了之前的工具类,CompletionService就很好理解。

三、应用举例

考虑执行下面一段代码:

    @Test
    public void test() throws Exception{
        CompletionService<String> service = new ExecutorCompletionService<>(Executors.newFixedThreadPool(5));
        for (int i = 0; i < 5; ++i) {
            service.submit(new Task<>(String.valueOf(i)));
        }

        for (int i = 0; i < 5; ++i) {
            System.out.println(service.take().get());
        }
    }

    public static class Task<T extends String> implements Callable<T> {
        private String threadNum;

        public Task(String threadNum) {
            this.threadNum = threadNum;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T call() throws Exception {
            long current = System.currentTimeMillis();
            if ((Integer.parseInt(threadNum) % 2) == 0) {
                TimeUnit.SECONDS.sleep(Math.abs(5 - Integer.parseInt(threadNum)));
            }
            return (T) String.format("completion %s. cost time is %d s.",
                    threadNum, (System.currentTimeMillis() - current) / 1000);
        }
    }

其输出为:

completion 1. cost time is 0 s.
completion 3. cost time is 0 s.
completion 4. cost time is 1 s.
completion 2. cost time is 3 s.
completion 0. cost time is 5 s.

这表明CompletionService获取的结果未必是其提交的先后顺序,而是其执行完毕的先后顺序。

四、总结

ExecutorCompletionServiceExecutorBlockingQueue的功能融合;其获取顺序和提交顺序未必一致;它适合批量执行已知数量的互相独立但是同构(结构一致)的任务。

正文到此结束
该篇文章的评论功能已被站长关闭
本文目录