Java - LinkedBlockingDeque 源码阅读笔记

in TCEH Java

简介

  LinkedBlockingDeque是由链表结构组成的有界双端阻塞队列,即队列头尾均可插入和移除元素。对数据进行增删改查时,使用一把ReentrantLock锁确保线程安全。使用Condition使线程在适当的时候休眠并释放锁,直至收到同Condition的信号才重新获取锁并继续执行。可作为生产-消费模型使用。

构造方法

默认容量为整型最大值,也可以指定其容量。


    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }

入队

add() 方法

    public boolean add(E e) {
        // 插到队尾
        addLast(e);
        return true;
    }

addLast() 方法

    public void addLast(E e) {
        // 队列满了直接报异常
        if (!offerLast(e))
            throw new IllegalStateException("Deque full");
    }

offerLast() 方法

    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkLast(node);
        } finally {
            // 最后必须解锁
            lock.unlock();
        }
    }

linkLast() 方法

    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        // 队列已满
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;   
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        // 发送 notEmpty 信号,以唤醒休眠中同Condition的线程
        notEmpty.signal();
        return true;
    }

put() 方法

   public void put(E e) throws InterruptedException {
        putLast(e);
    }
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 队列已满,则等待notFull信号
            while (!linkLast(node))
                // await 方法会使线程休眠并释放锁
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

出队

take() 方法

    public E take() throws InterruptedException {
        return takeFirst();
    }

takeFirst() 方法

    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            // 队列为空,则等待notEmpty信号
            while ( (x = unlinkFirst()) == null)
                // await 方法会使线程休眠并释放锁
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }

unlinkFirst() 方法

    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        // 发送notFull信号,以唤醒休眠中同Condition的线程
        notFull.signal();
        return item;
    }

remove() 方法

    public boolean remove(Object o) {
        return removeFirstOccurrence(o);
    }

removeFirstOccurrence() 方法

    public boolean removeFirstOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 循环匹配
            for (Node<E> p = first; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

unlink() 方法

    void unlink(Node<E> x) {
        // assert lock.isHeldByCurrentThread();
        Node<E> p = x.prev;
        Node<E> n = x.next;
        // 没有上一个节点说明是头结点
        if (p == null) {
            unlinkFirst();
        // 没有下一个节点说明是尾结点
        } else if (n == null) {
            unlinkLast();
        } else {
            // 将上一项节点与下一项链接,并将删除节点赋值为null,JVM会自动回收
            p.next = n;
            n.prev = p;
            x.item = null;
            // Don't mess with x's links.  They may still be in use by
            // an iterator.
            --count;
            notFull.signal();
        }
    }