Java - AbstractQueuedSynchronizer 源码阅读笔记

in TCEH Java

简介

  提供了一个基于双端链表的FIFO队列,可以用于构建锁或者其他相关同步器的基础框架。如ReentrantLock、CountDownLatch、Semaphore等等。子类通过继承同步器并根据需要实现它的getState()、setState()、compareAndSetState()方法来管理其状态。支持独占模式和共享模式。当以独占模式获取时,其他线程尝试获取不会成功,而共享模式获取时,多个线程的获取可能(但不一定)成功。接下来续之前的《Java - CountDownLatch 源码阅读笔记》来探究同步器是如何工作的。

AbstractQueuedSynchronizer 类

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    static final class Node {
        /** 共享模式标记 */
        static final Node SHARED = new Node();

        /** 独占模式标记 */
        static final Node EXCLUSIVE = null;

        /** 状态:当前线程被取消 */
        static final int CANCELLED =  1;

        /** 状态:需要后继节点处于等待状态 */
        static final int SIGNAL    = -1;

        /** 状态:当前节点在等待condition,也就是在condition队列中 */
        static final int CONDITION = -2;

        /** 状态:当前场景下后续的acquireShared能够得以执行 */
        static final int PROPAGATE = -3;
        // 节点状态
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;
        ...
    }
    /**
     * 等待队列的头部,延迟初始化。 除了初始化,只能通过 setHead 方法修改
     */
    private transient volatile Node head;

    /**
     * 等待队列的尾部,延迟初始化。 仅通过 enq 添加新的等待节点
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;
    ...

acquireSharedInterruptibly() 方法

续《Java - CountDownLatch 源码阅读笔记》中调用await()方法,继而调到本方法。
顾名思义,该方法在共享模式下获取,若线程中断则抛异常

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())// 线程如果已经被中断了,抛出中断异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)// 获取锁失败,则入队等待
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared() 方法

由子类重写,返回值:负值为失败;共享模式下获取成功但后续不能成功为零;共享模式下获取成功,后续也能成功为正值;
在CountDownLatch中的操作为count等于0,则返回1;否则返回-1。也就是意味着CountDownLatch只能使用一次。

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

doAcquireSharedInterruptibly() 方法

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 将当前线程入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();// 获取新节点的前驱
                if (p == head) {
                    // 若前驱节点是头结点,则以共享的方式获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                    // 成功获取锁,设置头节点和共享模式传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

                // shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞
                // parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())

                    throw new InterruptedException();
            }
        } finally {
            if (failed)
            // 自旋异常退出,取消正在进行锁争抢
                cancelAcquire(node);
        }
    }

addWaiter() 方法

    private Node addWaiter(Node mode) {
        // 将当前线程和共享节点封装成新节点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 将新节点链接到队尾
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速入队失败,则加上自旋,确保入队成功
        enq(node);
        return node;
    }

enq() 方法

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 进行初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 将新节点链接到队尾并更新tail指向新节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

setHeadAndPropagate() 方法

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 记录旧的头信息
        setHead(node);
        /*
         *
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

shouldParkAfterFailedAcquire() 方法

用于判断当前线程是否需要被阻塞

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 如果前驱节点释放了锁或者被取消,会通知后继节点运行
            // 直接返回true,也就是当前线程需要被阻塞
            return true;
        if (ws > 0) {// 取消状态
            // 将前驱节点从队列中移除,也就是将非取消状态的前驱与当前节点链接
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 将前驱节点状态修改为SIGNAL。二次循环,会致使当前线程需要被阻塞状态
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt() 方法

用于阻塞线程并且检测线程是否被中断

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);// 阻塞线程
        return Thread.interrupted();// 检测当前线程是否被中断
    }

releaseShared() 方法

续《Java - CountDownLatch 源码阅读笔记》中调用countDown()方法,继而调到本方法。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared() 方法

由子类来实现。在CountDownLatch中的操作为每调用一次count减1,直到count为0时,才返回true

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

doReleaseShared() 方法

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 队列中还有节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;        // 头结点状态修改失败,则进入下一次循环
                    unparkSuccessor(h); // 修改成功后,唤醒后继节点
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // 若头结点已变更,则进入下一次循环
                break;
        }
    }

unparkSuccessor() 方法

    private void unparkSuccessor(Node node) {
        // 获取结点状态
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 后继节点为null,或者被取消了
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 实际就是找当前节点未被取消的后继节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);// 唤醒后继节点
    }