Java - ReentrantReadWriteLock 源码阅读笔记

in Tech Java

前言

  有了前面Java - CountDownLatch 源码阅读笔记Java - AbstractQueuedSynchronizer 源码阅读笔记Java - ReentrantLock 源码阅读笔记等源码分析之后,本篇就显得相当简单了。

简介

  读锁基于AQS的共享模式来实现,写锁则是基于AQS的独占模式来实现。从而实现读-读不互斥,写-写互斥,读-写互斥的效果。

示例

@Log4j2
public class ReentrantReadWriteLockTest {
    private ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock = rrw.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock = rrw.writeLock();

    public static void main(String[] args) {
        ReentrantReadWriteLockTest test = new ReentrantReadWriteLockTest();
        // test.readLock();
        // test.writeLock();
        test.readWriteLock();
    }

    private void readLock() {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    readLock.lock();
                    log.info("获取锁成功!");
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    readLock.unlock();
                    log.info("释放锁成功!");
                }
            }).start();
        }
    }

    private void writeLock() {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    writeLock.lock();
                    log.info("获取锁成功!");
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    writeLock.unlock();
                    log.info("释放锁成功!");
                }
            }).start();
        }
    }

    private void readWriteLock() {
        new Thread(() -> {
            try {
                readLock.lock();
                log.info("获取读锁成功");
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
                log.info("释放读锁成功");
            }
        }).start();

        new Thread(() -> {
            try {
                writeLock.lock();
                log.info("获取写锁成功");
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
                log.info("释放写锁成功");
            }
        }).start();
    }
}

ReentrantReadWriteLock 类

实现了 ReadWriteLock 接口,该接口定义了 readLock() 和 writeLock() 方法。

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** 内部提供 Lock 接口的读锁实现类 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 内部提供 Lock 接口的写锁实现类 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 基础同步器的抽象类 */
    final Sync sync;
    ...
    }

Sync 类

Sync 是继承 AbstractQueuedSynchronizer 类的抽象类。代码比较长,这里就不全贴出来啦。主要看区分公平锁和非公平锁的两个核心方法。

     // 由子类实现。返回 true ,则阻塞当前线程获取读锁
     abstract boolean readerShouldBlock();
     // 由子类实现。返回 true ,则阻塞当前线程获取写锁
     abstract boolean writerShouldBlock();

FairSync 类

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            // 直接调用AQS中的方法,前面的文章中介绍过hasQueuedPredecessors()方法
            // 判断队列中有没有比当前线程等待时间更长的线程。如果有则当前线程需要被阻塞
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

NonfairSync 类

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 写锁总是可以尝试获取锁。以此来体现非公平
        }
        final boolean readerShouldBlock() {

            // 如果队列中存在等待获取独占锁的线程,则当前线程需要被阻塞
            return apparentlyFirstQueuedIsExclusive();
        }
    }

构造方法

    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * 可指定公平性,默认非公平
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();// 实例化同步器
        readerLock = new ReadLock(this);// 实例化读锁
        writerLock = new WriteLock(this);// 实例化写锁
    }

读锁

加锁操作

lock() 方法

        public void lock() {
            sync.acquireShared(1);// AQS中的方法
        }

acquireShared() 方法

    public final void acquireShared(int arg) {
        // 尝试获取共享锁
        if (tryAcquireShared(arg) < 0)
            // 获取失败的处理
            doAcquireShared(arg);
    }

tryAcquireShared() 方法

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // 判断写锁是否被其他线程持有
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);// 获取持有读锁的加锁次数
            if (!readerShouldBlock() &&// 判断当前线程是否需要被阻塞
                r < MAX_COUNT && 
                compareAndSetState(c, c + SHARED_UNIT)) {// 修改持有读锁的加锁次数
                if (r == 0) {// 第一个请求读锁的线程
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;// 第一个线程增加加锁次数
                } else {
                    HoldCounter rh = cachedHoldCounter;// 上一线程的缓存信息
                    if (rh == null || rh.tid != getThreadId(current))// 缓存线程与当前线程不一样
                        cachedHoldCounter = rh = readHolds.get();// 获取当前线程的加锁次数
                    else if (rh.count == 0)// 缓存线程与当前线程一样
                        readHolds.set(rh);
                    rh.count++;// 当前线程加锁次数自增
                }
                return 1;// 获取锁成功
            }
            return fullTryAcquireShared(current);// 快速版获取读锁失败。则使用完整版尝试获取
        }

fullTryAcquireShared() 方法

        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {// 自旋
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)// 判断写锁是否被其他线程持有
                        return -1;// 获取读锁失败
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {// 需要阻塞当前线程获取读锁
                    // Make sure we're not acquiring read lock reentrantly
                    // 确保没有以可重入的方式获取读锁
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();// 移除当前线程可能在保留区的内容
                            }
                        }
                        if (rh.count == 0)
                            return -1;// 获取读锁失败
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {// 计数
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

doAcquireShared() 方法

    private void doAcquireShared(int arg) {
        // 将当前线程入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();// 获取前驱节点
                if (p == head) {// 前驱节点为头结点
                    int r = tryAcquireShared(arg);// 尝试获取锁
                    if (r >= 0) {// 获取锁成功
                        setHeadAndPropagate(node, r);// 设置头节点和共享模式传播
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // shouldParkAfterFailedAcquire方法用于判断当前线程是否需要被阻塞
                // parkAndCheckInterrupt方法用于阻塞线程并且检测线程是否被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

解锁操作

unlock() 方法

        public void unlock() {
            sync.releaseShared(1);
        }

releaseShared() 方法

    public final boolean releaseShared(int arg) {
        // 尝试解锁
        if (tryReleaseShared(arg)) {
            // 全部解锁成功的处理
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared() 方法

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)// 仅加锁一次,直接置空
                    firstReader = null;
                else
                    firstReaderHoldCount--;// 加锁次数自减
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();// 读锁计数为1,直接移除
                    if (count <= 0)
                        throw unmatchedUnlockException();// 解锁超次数
                }
                --rh.count;// 当前线程计数自减
            }
            for (;;) {// 自旋
                int c = getState();
                int nextc = c - SHARED_UNIT;// 读锁线程计数减一
                if (compareAndSetState(c, nextc))
                    // 释放读锁
                    return nextc == 0;
            }
        }

doReleaseShared() 方法

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 队列中还有节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 头结点状态修改失败,则进入下一次循环
                    unparkSuccessor(h);      // 修改成功后,唤醒后继节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // 若头结点已变更,则进入下一次循环
                break;
        }
    }

写锁

加锁操作

lock() 方法

        public void lock() {
            sync.acquire(1);
        }

acquire() 方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 获取锁失败且加入同步队列成功,则自行中断
            selfInterrupt();
    }

tryAcquire() 方法

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) { // 已存在锁
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // w == 0,已存在读锁 
                // current != getExclusiveOwnerThread(),已存在写锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 重入获取,锁次数加1
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))// 锁次数加1
                return false;
            setExclusiveOwnerThread(current);// 设置独占锁线程为当前线程
            return true;
        }

解锁操作

unlock() 方法

        public void unlock() {
            sync.release(1);
        }

release() 方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);// 解锁成功后,唤醒后继节点
            return true;
        }
        return false;
    }

tryRelease() 方法

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())// 当前线程不持有锁,则报异常
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;// 锁次数减1
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);// 全部解锁完成,设置独占锁线程为空
            setState(nextc);// 更新锁次数
            return free;
        }

总结