Java - ConcurrentLinkedDeque 源码阅读笔记

in Tech Java

简介

  ConcurrentLinkedDeque是由链接节点组成的无界双端非阻塞队列,即队列头尾均可插入和移除元素。对数据进行增删改查时,使用CAS确保线程安全。读多写少的场景下,相比于使用悲观锁,性能更高。

构造方法

    public ConcurrentLinkedDeque() {
        // 初始化头尾节点
        head = tail = new Node<E>(null);
    }
    public ConcurrentLinkedDeque(Collection<? extends E> c) {
        // Copy c into a private chain of Nodes
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                newNode.lazySetPrev(t);
                t = newNode;
            }
        }
        initHeadTail(h, t);
    }

入队

add() 方法

    public boolean add(E e) {
        return offerLast(e);
    }

offerLast() 方法

    public boolean offerLast(E e) {
        linkLast(e);
        return true;
    }

linkLast() 方法

    private void linkLast(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        // 跳转标记。也就是下面内层进入continue,会直接跳到外层循环。
        restartFromTail:
        // 将内层for当做一行代码?这种写法还是比较少见啊
        for (;;)
            for (Node<E> t = tail, p = t, q;;) {
                // tail尚未来得及更新
                if ((q = p.next) != null &&
                    (q = (p = q).next) != null)
                    // 若tail被更新了就重新从tail开始往后查找,否则取后继的后继
                    p = (t != (t = tail)) ? t : q;
                else if (p.prev == p) // p节点出队了
                    continue restartFromTail;
                else {
                    // p是最后一个节点
                    newNode.lazySetPrev(p); // p成为新节点的前驱
                    if (p.casNext(null, newNode)) {// 新节点成为p的后继
                        // 每两个节点更新一次tail
                        if (p != t) // hop two nodes at a time
                            // 使用 CAS 确保数据的准确性
                            casTail(t, newNode);  // Failure is OK.
                        return;
                    }
                    // 竞争失败。自旋参与下一次竞争
                }
            }
    }

linkFirst() 方法

    private void linkFirst(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        restartFromHead:
        for (;;)
            for (Node<E> h = head, p = h, q;;) {
                // head尚未来得及更新
                if ((q = p.prev) != null &&
                    (q = (p = q).prev) != null)
                    // 若head被更新了就重新从head开始往前查找,否则取前驱的前驱
                    p = (h != (h = head)) ? h : q;
                else if (p.next == p) // p节点出队了
                    continue restartFromHead;
                else {
                    // p是第一个节点
                    newNode.lazySetNext(p); // p成为新节点的后继
                    if (p.casPrev(null, newNode)) {
                        // 每两个节点更新一次head
                        if (p != h) // hop two nodes at a time
                            casHead(h, newNode);  // Failure is OK.
                        return;
                    }
                    // 竞争失败。自旋参与下一次竞争
                }
            }
    }

出队

remove() 方法

    public E remove() {
     return removeFirst(); 
    }

removeFirst() 方法

    public E removeFirst() {
        return screenNullResult(pollFirst());
    }

pollFirst() 方法

    public E pollFirst() {
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            // p的值不为空,通过cas将里面的值置换为null
            // 置换成功后,再断开链接
            if (item != null && p.casItem(item, null)) {
                unlink(p);
                return item;
            }
        }
        return null;
    }

first() 方法

// 返回的节点可能会或可能不会被逻辑删除。

    Node<E> first() {
        restartFromHead:
        for (;;)
            for (Node<E> h = head, p = h, q;;) {
                // head尚未来得及更新
                if ((q = p.prev) != null &&
                    (q = (p = q).prev) != null)
                    // 若head被更新了就重新从head开始往前查找,否则取前驱的前驱
                    p = (h != (h = head)) ? h : q;
                else if (p == h
                         // 如果head.prev!=null,但head.prev.prev==null,则更新head
                         // 如果p节点恰巧出队了,CAS操作会失败
                         || casHead(h, p))
                    // 找到了第一个节点
                    return p;
                else
                    continue restartFromHead;
            }
    }

succ() 方法

    final Node<E> succ(Node<E> p) {
        // TODO: should we skip deleted nodes here?
        // p节点已经被删除了
        // p节点的后继等于p的话,则重新查找,否则直接返回p节点的后继
        Node<E> q = p.next;
        return (p == q) ? first() : q;
    }

unlink() 方法

    void unlink(Node<E> x) {

        final Node<E> prev = x.prev;
        final Node<E> next = x.next;
        if (prev == null) {
            // 前驱节点为空,说明是第一个节点
            unlinkFirst(x, next);
        } else if (next == null) {
            // 后继节点为空,说明是最后一个节点
            unlinkLast(x, prev);
        } else {
            Node<E> activePred, activeSucc;
            boolean isFirst, isLast;
            int hops = 1;

            // Find active predecessor
            // 查找删除节点的有效前驱
            for (Node<E> p = prev; ; ++hops) {
                if (p.item != null) {
                    activePred = p;
                    isFirst = false;
                    break;
                }
                Node<E> q = p.prev;
                // 找到头了
                if (q == null) { 
                    if (p.next == p) 
                        // 发现自连接,直接返回
                        return;
                    activePred = p;
                    isFirst = true;
                    break;
                }
                else if (p == q) 
                    // 发现自连接,直接返回
                    return;
                else 
                    // 更新循环指针
                    p = q;
            }

            // 查找删除节点的有效后继
            for (Node<E> p = next; ; ++hops) {
                if (p.item != null) {
                    activeSucc = p;
                    isLast = false;
                    break;
                }
                Node<E> q = p.next;
                if (q == null) {
                    if (p.prev == p)
                        return;
                    activeSucc = p;
                    isLast = true;
                    break;
                }
                else if (p == q)
                    return;
                else
                    p = q;
            }

            // 头尾删除累积超过阀值才进行unlink操作
            if (hops < HOPS
                // always squeeze out interior deleted nodes
                && (isFirst | isLast))
                return;

            // 使有效前驱和后继节点互连
            // 就好比链表中存储了0~10,现在要删除5,那么下面的步骤就是链接4和6。相当于剔除了5这个节点
            skipDeletedSuccessors(activePred);
            skipDeletedPredecessors(activeSucc);

            // 如果更新的是开头或者结尾,那么就可以尝试进行gc-unlink
            if ((isFirst | isLast) &&
                // 确保前驱和后继的状态没有被改变
                // Recheck expected state of predecessor and successor
                (activePred.next == activeSucc) &&
                (activeSucc.prev == activePred) &&
                (isFirst ? activePred.prev == null : activePred.item != null) &&
                (isLast  ? activeSucc.next == null : activeSucc.item != null)) {

                // 确保x节点不能从head/tail节点被访问
                updateHead(); // Ensure x is not reachable from head
                updateTail(); // Ensure x is not reachable from tail

                // 最后设置前驱后继终结节点
                x.lazySetPrev(isFirst ? prevTerminator() : x);
                x.lazySetNext(isLast  ? nextTerminator() : x);
            }
        }
    }

unlinkFirst() 方法

    private void unlinkFirst(Node<E> first, Node<E> next) {
        for (Node<E> o = null, p = next, q;;) {
            // p是第一个有效节点或最后一个节点
            if (p.item != null || (q = p.next) == null) {
                // p是o的有效后继,第一次循环o为null
                if (o != null && p.prev != p && first.casNext(next, p)) { 
                    skipDeletedPredecessors(p);
                    if (first.prev == null &&
                        (p.next == null || p.item != null) &&
                        p.prev == first) {

                        updateHead(); // Ensure o is not reachable from head
                        updateTail(); // Ensure o is not reachable from tail

                        // Finally, actually gc-unlink
                        o.lazySetNext(o);
                        o.lazySetPrev(prevTerminator());
                    }
                }
                return;
            }
            else if (p == q)
                return;
            else {
                o = p;
                p = q;
            }
        }
    }

skipDeletedSuccessors() 方法

    private void skipDeletedSuccessors(Node<E> x) {
        whileActive:
        do {
            Node<E> next = x.next;
            Node<E> p = next;
            // 查找有效的后继
            findActive:
            for (;;) {
                // 找到了后继
                if (p.item != null)
                    break findActive;
                Node<E> q = p.next;
                // p是最后一个节点
                if (q == null) { 
                    if (p.prev == p)// 自连接,重新开始
                        continue whileActive;
                    break findActive;
                }
                else if (p == q)// 自连接,重新开始
                    continue whileActive;
                else 
                    p = q;
            }

            // 通过CAS将前驱的next设置为p
            if (next == p || x.casNext(next, p))
                return;

        } while (x.item != null || x.prev == null);
    }

updateHead() 方法

    private final void updateHead() {
        Node<E> h, p, q;
        restartFromHead:
        while ((h = head).item == null && (p = h.prev) != null) {
            for (;;) {
                // head尚未更新
                if ((q = p.prev) == null ||
                    (q = (p = q).prev) == null) {
                    // 更新head
                    if (casHead(h, p))
                        return;
                    else
                        continue restartFromHead;
                }
                else if (h != head)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }