Java - ReentrantLock 源码阅读笔记

in Tech Java

简介

  ReentrantLock(可重入锁)与synchronized具备类似的同步功能。两者均是可重入互斥锁(独占锁),但前者具有更多功能,也就意味着更大的灵活性。ReentrantLock可以指定公平性(也就是按请求锁的顺序获取锁),但synchronized只能为非公平。ReentrantLock需要手动加锁解锁,synchronized则无需关心。ReentrantLock可以响应中断,但synchronized却不可以。

下面的内容需要结合《Java - AbstractQueuedSynchronizer 源码阅读笔记》来理解

示例

@Log4j2
public class ReentrantLockTest {
    private int balance = 0;
    private final ReentrantLock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
    private CountDownLatch latch;

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        //reentrantLockTest.lock();
        reentrantLockTest.condition();
    }

    public void lock() throws InterruptedException {
        int threadNum = 100;
        int cash = 1;
        latch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            new Thread(() -> increase(cash)).start();
        }
        latch.await();
        log.info("余额:{}", balance);
    }

    private void increase(int amount) {
        try {
            lock.lock();
            Thread.sleep(10);
            balance += amount;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    /**
     * 利用Condition实现生产消费阻塞模型。与LinkedBlockingQueue类似
     */
    public void condition() throws InterruptedException {
        int threadNum = 100;
        latch = new CountDownLatch(threadNum * 2);
        for (int i = 0; i < threadNum; i++) {
            new Thread(() -> produce()).start();
        }
        for (int i = 0; i < threadNum; i++) {
            new Thread(() -> consume()).start();
        }
        latch.await();
        log.info("余额:{}", balance);
    }

    private void produce() {
        try {
            lock.lock();
            while (balance == 10) {
                notFull.await();
            }
            balance += 1;
            notEmpty.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    private void consume() {
        try {
            lock.lock();
            while (balance == 0) {
                notEmpty.await();
            }
            balance -= 1;
            notFull.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }
}

ReentrantLock 类

  与CountDownLatch一样使用AbstractQueuedSynchronizer来实现。

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** 提供所有实现的同步器 */
    private final Sync sync;
    ...
}

Sync 类

定义抽象类Sync,子类实现区分其公平性

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();
        // 非公平性加锁
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();// state 用来记录加锁次数,默认值为0
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {// 修改成功后,相当于当前线程占据了锁
                    setExclusiveOwnerThread(current);// 记录锁线程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {// 锁线程恰巧是当前线程
                int nextc = c + acquires;// 增加加锁次数
                if (nextc < 0) // 加锁次数超了
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);// 记录加锁次数
                return true;
            }
            return false;
        }
        // 尝试释放锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;// 减少加锁次数
            if (Thread.currentThread() != getExclusiveOwnerThread())// 锁线程与当前线程不一致
                throw new IllegalMonitorStateException();// 则抛异常
            boolean free = false;
            if (c == 0) {// 释放锁完成
                free = true;
                setExclusiveOwnerThread(null);// 锁线程置空
            }
            setState(c);// 记录剩余加锁次数
            return free;
        }

        // 是否是当前线程持有锁
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // 实例化条件等待对象
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // 获取加锁次数
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        // 是否处于锁定状态
        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * 反序列化构建实例
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // 重置为解锁状态
        }
    }

NonfairSync 类

非公平锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // 加锁
        final void lock() {
            if (compareAndSetState(0, 1))// 快速尝试加锁
                setExclusiveOwnerThread(Thread.currentThread());// 记录锁线程
            else
                acquire(1);// AbstractQueuedSynchronizer 的方法,后续会说到
        }
        // 尝试以独占模式获取锁
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

FairSync 类

公平锁

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        // 加锁
        final void lock() {
            acquire(1);
        }

        // 尝试以独占模式获取锁
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {// 未加锁
                // 因为是公平锁,因此等待时间最长的线程最先获得锁
                // hasQueuedPredecessors判断有没有比当前线程等待时间更长的线程
                // 如果没有的话进行加锁操作
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    // AbstractQueuedSynchronizer#hasQueuedPredecessors
    // 1、h == t 说明队列为空
    // 2、h != t && h.next == null 说明其他线程抢先获取锁
    // 3、h != t && h.next != null && h.next.thread != Thread.currentThread() 说明当前线程并非等待时间最长的线程
    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

构造方法

    public ReentrantLock() {
        sync = new NonfairSync();// 默认非公平锁,效率更高
    }
    public ReentrantLock(boolean fair) {// 可以指定公平性
        sync = fair ? new FairSync() : new NonfairSync();
    }

加锁操作

lock() 方法

    public void lock() {
        sync.lock();// 直接调用同步器
    }

acquire() 方法

    public final void acquire(int arg) {
        // tryAcquire方法由子类重写。如果加锁失败,则加入队列等待
        // addWaiter方法:节点尾入队,前面的文章已经探讨过了,这里不再赘述
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();// 中断线程
    }

acquireQueued() 方法

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();// 获取前驱节点
                // 前驱节点为头结点且获取锁成功
                // 则将新节点设置为头结点,并清空节点信息
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

                //  《Java - AbstractQueuedSynchronizer 源码阅读笔记》一文中已经探讨过了,这里不再赘述
                // shouldParkAfterFailedAcquire()方法用于判断当前线程是否需要被阻塞
                // parkAndCheckInterrupt()方法用于阻塞线程并且检测线程是否被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)// 异常导致加锁失败
                cancelAcquire(node);// 取消获取锁请求
        }
    }

cancelAcquire() 方法

    private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;// 清空线程
        Node pred = node.prev;
        // 前驱节点为取消状态,则往前查找非取消状态的节点,当做有效前驱节点
        // 同时兼顾链接节点(下面的第二、三种情况)
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;// 修改状态为CANCELLED

        // 1、如果node为尾节点,则将尾结点更新为有效前驱节点
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);// 断开有效前驱节点的后继节点
        } else {
            int ws;
            // 2、有效前驱节点不等于头结点也不是尾结点
            // 则将有效前继节点的等待状态修改为SIGNAL
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    // node有效前驱节点指向后继节点
                    // 而后继指向前驱则是留给其他线程来操作
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 3、有效前驱节点等于头结点,则唤醒后继节点。由后继线程执行出队操作
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

解锁操作

unlock() 方法

    public void unlock() {
        sync.release(1);// 直接调用同步器
    }

release() 方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {// tryRelease方法由子类重写。解锁成功,唤醒下一个线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

条件等待

newCondition() 方法

    public Condition newCondition() {
        return sync.newCondition();
    }

newCondition() 方法

AbstractQueuedSynchronizer#newCondition

        // 抽象同步器中定义了Condition接口的实现类。后续调用await()、signal()等方法,会回调到其相应的实现方法。
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

ConditionObject 类

ConditionObject对象维护了一个单向链表来存储被阻塞的线程对象

 public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** 条件队列中的第一个节点. */
        private transient Node firstWaiter;
        /** 条件队列中的最后一个节点. */
        private transient Node lastWaiter;
        ...
 }

await() 方法

AbstractQueuedSynchronizer#ConditionObject#await

        public final void await() throws InterruptedException {
            if (Thread.interrupted())// 中断检测
                throw new InterruptedException();
            // addConditionWaiter做两个事情:
            // 1、如果尾节点被取消了,则从头开始遍历链表,将所有被取消的节点,逐一踢出条件队列
            // 2、将当前线程对象封装成Node对象,并插入条件队列的队尾
            Node node = addConditionWaiter();
            // 完全释放当前线程锁,附带唤醒同步队列中第一个有效节点的线程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {// 当前线程的节点不在同步队列中
                LockSupport.park(this);// 阻塞当前线程
                // 被唤醒之后,如果线程未被中断,则返回0,一切正常。
                // 如果线程被中断:则尝试将节点状态由CONDITION改为0,
                // 修改成功则将节点插入同步队列,返回-1,意味着后续需要抛中断异常;
                // 修改失败则自旋等待,直至节点在同步队列中,返回1,意味着收到了信号。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 重新获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();// 踢出条件队列中被取消的节点
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);// 根据模式抛相应的异常或什么都不做
        }

fullyRelease() 方法

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();// 获取加锁次数
            if (release(savedState)) {// 释放锁成功
                failed = false;
                return savedState;
            } else { // 释放锁失败,抛异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed) 
                node.waitStatus = Node.CANCELLED; // 取消当前线程
        }
    }

signal() 方法

唤醒条件队列中的一个节点线程

        public final void signal() {
            // 当前线程是否获得锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal() 方法

从条件队列队首遍历,查找到第一个状态为CONDITION的节点,并将该节点插入同步队列队尾。
如果该节点的前驱节点是取消状态或修改状态为SIGNAL失败的话,则唤醒节点线程。否则,排队等待唤醒

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;// 只有一个节点
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal() 方法

    final boolean transferForSignal(Node node) {
        // 等待状态修改失败的话,说明已经被取消了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);// 当前节点入同步队列队尾
        int ws = p.waitStatus;
        // 前驱节点为取消状态或修改状态失败,则唤醒节点线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

总结