原创

Java多线程进阶(三五)—— J.U.C之collections框架:SynchronousQueue

一、SynchronousQueue简介

SynchronousQueue是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于队列实现:



没有看错,SynchronousQueue的底层实现包含两种数据结构——队列。这是一种非常特殊的阻塞队列,它的特点简要概括如下:

  1. 入队线程和出队线程必须一一匹配,否则任意先到达的线程会阻塞。比如ThreadA进行入队操作,在有其它线程执行出队操作之前,ThreadA会一直等待,反之亦然;
  2. SynchronousQueue内部不保存任何元素,也就是说它的容量为0,数据直接在配对的生产者和消费者线程之间传递,不会将数据缓冲到队列中。
  3. SynchronousQueue支持公平/非公平策略。其中非公平模式,基于内部数据结构——“栈”来实现,公平模式,基于内部数据结构——“队列”来实现;
  4. SynchronousQueue基于一种名为“Dual stack and Dual queue”的无锁算法实现。

注意:上述的特点1,和我们之前介绍的Exchanger其实非常相似,可以类比Exchanger的功能来理解。

二、SynchronousQueue原理

2.1 构造

之前提到,SynchronousQueue根据公平/非公平访问策略的不同,内部使用了两种不同的数据结构:栈和队列。我们先来看下对象的构造,SynchronousQueue只有2种构造器:

/**
 * 默认构造器.
 * 默认使用非公平策略.
 */
public SynchronousQueue() {
    this(false);
}
/**
 * 指定策略的构造器.
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

可以看到,对于公平策略,内部构造了一个TransferQueue对象,而非公平策略则是构造了TransferStack对象。这两个类都继承了内部类Transferer,SynchronousQueue中的所有方法,其实都是委托调用了TransferQueue/TransferStack的方法:

public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * tranferer对象, 构造时根据策略类型确定.
     */
    private transient volatile Transferer<E> transferer;

    /**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e 非null表示 生产者 -> 消费者;
         *          null表示, 消费者 -> 生产者.
         * @return 非null表示传递的数据; null表示传递失败(超时或中断).
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }

    /**
     * Dual stack(双栈结构).
     * 非公平策略时使用.
     */
    static final class TransferStack<E> extends Transferer<E> {
        // ...
    }

    /**
     * Dual Queue(双端队列).
     * 公平策略时使用.
     */
    static final class TransferQueue<E> extends Transferer<E> {
        // ...
    }

    // ...
}

2.2 栈结构

非公平策略由TransferStack类实现,既然TransferStack是栈,那就有结点。TransferStack内部定义了名为SNode的结点:

static final class SNode {
    volatile SNode next;
    volatile SNode match;       // 与当前结点配对的结点
    volatile Thread waiter;     // 当前结点对应的线程
    Object item;                // 实际数据或null
    int mode;                   // 结点类型

    SNode(Object item) {
        this.item = item;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // ...

}

上述SNode结点的定义中有个mode字段,表示结点的类型。TransferStack一共定义了三种结点类型,任何线程对TransferStack的操作都会创建下述三种类型的某种结点:

  • REQUEST:表示未配对的消费者(当线程进行出队操作时,会创建一个mode值为REQUEST的SNode结点 )
  • DATA:表示未配对的生产者(当线程进行入队操作时,会创建一个mode值为DATA的SNode结点 )
  • FULFILLING:表示配对成功的消费者/生产者
static final class TransferStack<E> extends Transferer<E> {

    /**
     * 未配对的消费者
     */
    static final int REQUEST = 0;
    /**
     * 未配对的生产者
     */
    static final int DATA = 1;
    /**
     * 配对成功的消费者/生产者
     */
    static final int FULFILLING = 2;

     volatile SNode head;

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // ...
}

2.3 核心操作——put/take

SynchronousQueue的入队操作调用了put方法:

/**
 * 入队指定元素e.
 * 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

SynchronousQueue的出队操作调用了take方法:

/**
 * 出队一个元素.
 * 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
 */
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

可以看到,SynchronousQueue一样不支持null元素,实际的入队/出队操作都是委托给了transfer方法,该方法返回null表示出/入队失败(通常是线程被中断或超时):

/**
 * 入队/出队一个元素.
 */
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // s表示新创建的结点
    // 入参e==null, 说明当前是出队线程(消费者), 否则是入队线程(生产者)
    // 入队线程创建一个DATA结点, 出队线程创建一个REQUEST结点
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {    // 自旋
        SNode h = head;
        if (h == null || h.mode == mode) {          // CASE1: 栈为空 或 栈顶结点类型与当前mode相同
            if (timed && nanos <= 0) {              // case1.1: 限时等待的情况
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
                SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
                if (m == s) {                                   // 阻塞过程中被中断
                    clean(s);
                    return null;
                }

                // 此时m为配对结点
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);

                // 入队线程null, 出队线程返回配对结点的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行

        } else if (!isFulfilling(h.mode)) {          // CASE2: 栈顶结点还未配对成功
            if (h.isCancelled())                     // case2.1: 元素取消情况(因中断或超时)的处理
                casHead(h, h.next);
            else if (casHead(h, s = snode(s, e,
                h, FULFILLING | mode))) {      // case2.2: 将当前结点压入栈中
                for (; ; ) {
                    SNode m = s.next;       // s.next指向原栈顶结点(也就是与当前结点匹配的结点)
                    if (m == null) {        // m==null说明被其它线程抢先匹配了, 则跳出循环, 重新下一次自旋
                        casHead(s, null);
                        s = null;
                        break;
                    }

                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // 进行结点匹配
                        casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
                        return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
                    } else                  // 匹配失败
                        s.casNext(m, mn);   // 移除原待匹配结点
                }
            }
        } else {                            // CASE3: 其它线程正在匹配
            SNode m = h.next;
            if (m == null)                  // 栈顶的next==null, 则直接弹出, 重新进入下一次自旋
                casHead(h, null);
            else {                          // 尝试和其它线程竞争匹配
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);         // 匹配成功
                else
                    h.casNext(m, mn);       // 匹配失败(被其它线程抢先匹配成功了)
            }
        }
    }
}

整个transfer方法考虑了限时等待的情况,且入队/出队其实都是调用了同一个方法,其主干逻辑就是在一个自旋中完成以下三种情况之一的操作,直到成功,或者被中断或超时取消:

  1. 栈为空,或栈顶结点类型与当前入队结点相同。这种情况,调用线程会阻塞;
  2. 栈顶结点还未配对成功,且与当前入队结点可以配对。这种情况,直接进行配对操作;
  3. 栈顶结点正在配对中。这种情况,直接进行下一个结点的配对。

2.4 出/入队示例讲解

为了便于理解,我们来看下面这个调用示例(假设不考虑限时等待的情况),假设一共有三个线程ThreadA、ThreadB、ThreadC:

①初始栈结构

初始栈为空,head为栈顶指针,始终指向栈顶结点:



②ThreadA(生产者)执行入队操作

由于此时栈为空,所以ThreadA会进入CASE1,创建一个类型为DATA的结点:

if (h == null || h.mode == mode) {          // CASE1: 栈为空 或 栈顶结点类型与当前mode相同
    if (timed && nanos <= 0) {              // case1.1: 限时等待的情况
        if (h != null && h.isCancelled())
            casHead(h, h.next);
        else
            return null;
    } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
        SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
        if (m == s) {                                   // 阻塞过程中被中断
            clean(s);
            return null;
        }

        // 此时m为配对结点
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);

        // 入队线程null, 出队线程返回配对结点的值
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }
    // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行
}

CASE1分支中,将结点压入栈后,会调用awaitFulfill方法,该方法会阻塞调用线程:

/**
 * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上.
 *
 * @param s 等待的结点
 * @return 返回配对的结点 或 当前结点(说明线程被中断了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能优化操作(计算自旋次数)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存当前结点的匹配结点.
         * s.match==null说明还没有匹配结点
         * s.match==s说明当前结点s对应的线程被中断了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 还没有匹配结点, 则保存当前线程
            s.waiter = w;           // s.waiter保存当前阻塞线程
        else if (!timed)
            LockSupport.park(this); // 阻塞当前线程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

此时栈结构如下,结点的waiter字段保存着创建该结点的线程ThreadA,ThreadA等待着被配对消费者线程唤醒:



③ThreadB(生产者)执行入队操作

此时栈顶结点的类型和ThreadB创建的结点相同(都是DATA类型的结点),所以依然走CASE1分支,直接将结点压入栈:



④ThreadC(消费者)执行出队操作

此时栈顶结点的类型和ThreadC创建的结点匹配(栈顶DATA类型,ThreadC创建的是REQUEST类型),所以走CASE2分支,该分支会将匹配的两个结点弹出栈:

else if (!isFulfilling(h.mode)) {          // CASE2: 栈顶结点还未配对成功
    if (h.isCancelled())                     // case2.1: 元素取消情况(因中断或超时)的处理
        casHead(h, h.next);
    else if (casHead(h, s = snode(s, e,
        h, FULFILLING | mode))) {      // case2.2: 将当前结点压入栈中
        for (; ; ) {
            SNode m = s.next;       // s.next指向原栈顶结点(也就是与当前结点匹配的结点)
            if (m == null) {        // m==null说明被其它线程抢先匹配了, 则跳出循环, 重新下一次自旋
                casHead(s, null);
                s = null;
                break;
            }

            SNode mn = m.next;
            if (m.tryMatch(s)) {    // 进行结点匹配
                casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
                return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
            } else                  // 匹配失败
                s.casNext(m, mn);   // 移除原待匹配结点
        }
    }
}

上述isFulfilling方法就是判断结点是否匹配:

/**
 * 判断m是否已经配对成功.
 */
static boolean isFulfilling(int m) {
    return (m & FULFILLING) != 0;
}

ThreadC创建结点并压入栈后,栈的结构如下:



此时,ThreadC会调用tryMatch方法进行匹配,该方法的主要作用有两点:

  1. 将待结点的match字段置为与当前配对的结点(如上图中,结点m是待配对结点,最终m.math == s
  2. 唤醒待配对结点中的线程(如上图中,唤醒结点m中ThreadB线程)
/**
 * 尝试将当前结点和s结点配对.
 */
boolean tryMatch(SNode s) {
    if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // 唤醒当前结点对应的线程
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;      // 配对成功返回true
}

匹配完成后,会将匹配的两个结点弹出栈,并返回匹配值:

if (m.tryMatch(s)) {    // 进行结点匹配
    casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
    return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
}

最终,ThreadC拿到了等待配对结点中的数据并返回,此时栈的结构如下:



注意: CASE2分支中ThreadC创建的结点的mode值并不是REQUEST,其mode值为FULFILLING | modeFULFILLING | mode的主要作用就是给栈顶结点置一个标识(二进制为11或10),表示当前有线程正在对栈顶匹配,这时如果有其它线程进入自旋(并发情况),则CASE2一定失败,因为isFulfilling的结果必然为true,所以会进入CASE3分支——跳过栈顶结点进行匹配。

casHead(h, s = snode(s, e, h, FULFILLING | mode))

⑤ThreadB(生产者)唤醒后继续执行

ThreadB被唤醒后,会从原阻塞处继续执行,并进入下一次自旋,在下一次自旋中,由于结点的match字段已经有了匹配结点,所以直接返回配对结点:

/**
 * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上.
 *
 * @param s 等待的结点
 * @return 返回配对的结点 或 当前结点(说明线程被中断了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能优化操作(计算自旋次数)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存当前结点的匹配结点.
         * s.match==null说明还没有匹配结点
         * s.match==s说明当前结点s对应的线程被中断了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 还没有匹配结点, 则保存当前线程
            s.waiter = w;           // s.waiter保存当前阻塞线程
        else if (!timed)
            LockSupport.park(this); // 阻塞当前线程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

最终,在下面分支中返回:

else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
    SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
    if (m == s) {                                   // 阻塞过程中被中断
        clean(s);
        return null;
    }

    // 此时m为配对结点
    if ((h = head) != null && h.next == s)
        casHead(h, s.next);

    // 入队线程null, 出队线程返回配对结点的值
    return (E) ((mode == REQUEST) ? m.item : s.item);
}

注意:对于入队线程(生产者),返回的是它入队时携带的原有元素值。


2.5 队列结构

SynchronousQueue的公平策略由TransferQueue类实现,TransferQueue内部定义了名为QNode的结点,一个head队首指针,一个tail队尾指针:

/**
 * Dual Queue(双端队列).
 * 公平策略时使用.
 */
static final class TransferQueue<E> extends Transferer<E> {

    /**
     * Head of queue
     */
    transient volatile QNode head;
    /**
     * Tail of queue
     */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    /**
     * 队列结点定义.
     */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;
        // ...
    }

    // ...
}

关于TransferQueue的transfer方法就不再赘述了,其思路和TransferStack大致相同,总之就是入队/出队必须一一匹配,否则任意一方就会加入队列并等待匹配线程唤醒。读者可以自行阅读TransferQueued的源码。

三、总结

TransferQueue主要用于线程之间的数据交换,由于采用无锁算法,其性能一般比单纯的其它阻塞队列要高。它的最大特点时不存储实际元素,而是在内部通过栈或队列结构保存阻塞线程。后面我们讲JUC线程池框架的时候,还会再次看到它的身影。

正文到此结束
本文目录