Java - ArrayBlockingQueue 源码阅读笔记

in Tech Java

简介

  ArrayBlockingQueue是由数组构成的有界阻塞队列。该队列对元素 FIFO(先进先出)进行排序。对数据进行增删改查时,使用一把ReentrantLock锁确保线程安全。使用Condition使线程在适当的时候休眠并释放锁,直至收到同Condition的信号才重新获取锁并继续执行。可作为生产-消费模型使用。

ArrayBlockingQueue 类

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;

    /** 队列元素 */
    final Object[] items;

    /** 查找、删除时,下一个元素的下标 */
    int takeIndex;

    /** 新增时,下一个元素的下标 */
    int putIndex;

    /** 元素个数 */
    int count;

    /** 可重入锁,保护数据的正确性 */
    final ReentrantLock lock;

    /** 适用 takes 操作,取数据的时候,发现队列为空则等待 */
    private final Condition notEmpty;

    /** 适用 puts 操作,添加数据的时候,发现队列已满则等待 */
    private final Condition notFull;

    /**
     * 迭代器会用到
     */
    transient Itrs itrs = null;

构造方法

可指定容量及公平性

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

入队

add() 方法

    public boolean add(E e) {
        return super.add(e);
    }

add() 方法

BlockingQueue#add
BlockingQueue是接口,调用offer会回调到其实现类

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

offer() 方法

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();// 加锁
        try {
            // 容量已满,添加失败
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();// 解锁
        }
    }

enqueue() 方法

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        // putIndex索引抵达尾部,重置下标,重头开始
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();// 唤醒等待消费的线程
    }

put() 方法

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列已满,则等待notFull信号
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

出队

take() 方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列为空,则等待notEmpty信号
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

dequeue() 方法

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        // takeIndex索引抵达尾部,重置下标,重头开始
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();// 唤醒等待生产的线程
        return x;
    }

remove() 方法

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();// 加锁
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                // 循环匹配
                do {
                    // 匹配到了,则进行删除
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    // takeIndex索引抵达尾部,重置下标,重头开始
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);// putIndex索引等于takeIndex索引,等于是遍历完成了所有元素。
            }
            return false;
        } finally {
            lock.unlock();// 解锁
        }
    }

removeAt() 方法

    void removeAt(final int removeIndex) {
        final Object[] items = this.items;
        // 要删除的数据刚好是下一个要拿的数据
        if (removeIndex == takeIndex) {
            // 清空数据
            items[takeIndex] = null;
            // takeIndex索引抵达尾部,重置下标,重头开始
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {

            final int putIndex = this.putIndex;
            // 将putIndex之前的数据全部往前挪一位。以此覆盖删除元素,并确保索引的正确性
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)// removeIndex索引抵达尾部,重置下标,重头开始
                    next = 0;
                if (next != putIndex) {// 转移数据
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();// 唤醒等待生产的线程
    }