java.util.concurrent.locks.ReentrantReadWriteLock 源码

相关类图:

    ReentrantReadWriteLock 实现了ReadWriteLock 接口。其自身有五个内部类,五个内部类之间也是相互关联的。内部类的关系以下图所示。html

    

    如上图所示,Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类;ReadLock实现了Lock接口、WriteLock也实现了Lock接口。java

 

java.util.concurrent.locks.ReentrantReadWriteLock 源码:

package java.util.concurrent.locks;

import java.util.concurrent.TimeUnit;
import java.util.Collection;

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;// 版本序列号
    // 读锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 写锁
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 同步队列
    final Sync sync;//Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持

   
    public ReentrantReadWriteLock() {//无参构造函数,默认调用非公平策略构造函数
        this(false);
    }

    //设置公平策略或者非公平策略,并建立读锁与写锁对象实例
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    //实现了ReadWriteLock的writeLock方法,返回一个写入锁对象
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

    ////实现了ReadWriteLock的readLock方法,返回一个读取锁对象
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    //Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;// 版本序列号

        static final int SHARED_SHIFT   = 16;// 高16位为读锁,低16位为写锁
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 读锁单位
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 读锁最大数量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 写锁最大数量

        private transient ThreadLocalHoldCounter readHolds;//本地线程计数器
        private transient HoldCounter cachedHoldCounter; // 缓存的计数器

        private transient Thread firstReader = null;// 第一个读线程
        private transient int firstReaderHoldCount;// 第一个读线程的计数

        //占有读锁的线程数量
        static int sharedCount(int c){
           //直接将state右移16位,就能够获得读锁的线程数量,由于state的高16位表示读锁,对应的第十六位表示写锁数量
           return c >>> SHARED_SHIFT;
        }

        //占有写锁的线程数量
        static int exclusiveCount(int c) {
           //直接将状态state和(2^16 - 1)作与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示
           return c & EXCLUSIVE_MASK;
        }

        //HoldCounter主要与读锁配套使用
        static final class HoldCounter {
            int count = 0;// 某个读线程重入的次数
            
            // 获取当前线程的TID属性的值,用来惟一标识一个线程
            final long tid = getThreadId(Thread.currentThread());
        }

        /*ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类能够将线程与对象相关联。在没有进行set的状况下,get到的均是initialValue方法里面生成的那个HolderCounter对象*/
        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {// 本地线程计数器
            //重写初始化方法,在没有进行set的状况下,获取的都是该HoldCounter值
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        // 构造函数
        Sync() {
            readHolds = new ThreadLocalHoldCounter();// 本地线程计数器
            setState(getState()); // 设置AQS的状态
        }

        //建立IllegalMonitorStateException异常对象实例
        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }

        //读线程是否应该被阻塞
        abstract boolean readerShouldBlock();

        //写线程是否应该被阻塞
        abstract boolean writerShouldBlock();

        //用于释放写锁资源
        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())//当前线程不是写锁持有者,则抛出异常
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;//计算释放资源后的写锁的数量
            boolean free = exclusiveCount(nextc) == 0;//若为0,获得true

            if (free)//若为0,表示须要释放资源;不然,只需修改状态计数值便可,继续保持资源的占用状态.
                setExclusiveOwnerThread(null);

            setState(nextc);//修改状态的计数值
            return free;//返回释放结果
        }

        //用于获取写锁
        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
            Thread current = Thread.currentThread();
            int c = getState();// 获取状态
            int w = exclusiveCount(c);// 写线程数量

            if (c != 0) {// 状态不为0
                // 写线程数量为0,则为读锁占据;写线程不为0,但当前线程没有占有该独占锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;//获取写锁失败
                if (w + exclusiveCount(acquires) > MAX_COUNT)// 判断获取写锁重入次数是否超过最大值限制
                    throw new Error("Maximum lock count exceeded");
                // 设置AQS状态
                setState(c + acquires);
                return true;//获取成功
            }

            //此处的c等于0.此时没有读锁线程和写锁线程
            //判断写线程是否应该被阻塞:非公平策略下老是不会被阻塞,在公平策略下须要进行判断是否有等待时间更长的读取线程
            if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
                return false;//若写线程须要阻塞,或CAS设置状态失败,则返回获取失败
            
            setExclusiveOwnerThread(current);// 设置独占线程
            return true;//获取成功
        }

        //读锁线程释放锁
        /*首先判断当前线程是否为第一个读线程firstReader,如果,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,如果,则设置第一个读线程firstReader为空,不然,将第一个读线程占有的资源数firstReaderHoldCount减1;若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,若是计数器的计数count小于等于1,则移除当前线程对应的计数器,若是计数器的计数count小于等于0,则抛出异常,以后再减小计数便可。不管何种状况,都会进入无限循环,该循环能够确保成功设置状态state*/
        protected final boolean tryReleaseShared(int unused) {
            //获取当前线程
            Thread current = Thread.currentThread();
            if (firstReader == current) {// 当前线程为第一个读线程
                if (firstReaderHoldCount == 1)// 读线程占用的资源数为1
                    firstReader = null;
                else// 减小占用的资源
                    firstReaderHoldCount--;
            } else {// 当前线程不为第一个读线程
                // 获取缓存的计数器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))// 计数器为空或者计数器的tid不为当前正在运行的线程的tid
                    // 获取当前线程对应的计数器
                    rh = readHolds.get();

                // 获取计数
                int count = rh.count;

                if (count <= 1) { // 计数小于等于1
                    // 移除
                    readHolds.remove();
                    if (count <= 0) // 计数小于等于0,抛出异常
                        throw unmatchedUnlockException();
                }

                // 减小计数
                --rh.count;
            }

            //自旋CAS,减去1<<16
            for (;;) { // 无限循环
                // 获取状态
                int c = getState();
                // 获取状态
                int nextc = c - SHARED_UNIT;

                if (compareAndSetState(c, nextc)) // 比较并进行设置
                    return nextc == 0;
            }
        }

        //读锁线程获取读锁
        /*若写锁不为0而且当前线程不占有写锁,则直接返回-1;若读线程须要被阻塞且读线程数量小于最大值以及状态值未改变且修改状态值成功,则得到读锁成功.此时须要判断当前若为第一个读锁,则须要设置第一个读线程firstReader和firstReaderHoldCount;若当前线程为第一个读线程,则增长firstReaderHoldCount;不然,将设置当前线程对应的HoldCounter对象的值*/
        protected final int tryAcquireShared(int unused) {// 共享模式下获取资源
            // 获取当前线程
            Thread current = Thread.currentThread();
            // 获取状态
            int c = getState();

            //存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
                return -1;

            // 读线程数量
            int r = sharedCount(c);

            // 若读线程须要被阻塞且读线程数量小于最大值以及状态值未改变且修改状态值成功,则得到读锁成功
            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { 
                //firstReader是不会放到readHolds里的, 这样,在读锁只有一个的状况下,就避免了查找readHolds                
                if (r == 0) { // 读锁数量为0,(首次获取读锁)
                    // 设置第一个读线程
                    firstReader = current;
                    // 读线程占用的资源数为1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {// 当前线程为第一个读线程 (firstReader重入)
                    // 占用资源数加1
                    firstReaderHoldCount++;
                } else {// 读锁数量不为0而且第一个线程不为当前线程
                    HoldCounter rh = cachedHoldCounter;//读锁重入计数缓存,基于ThreadLocal实现

                    if (rh == null || rh.tid != getThreadId(current))// 计数器为空或者计数器的tid不为当前正在运行的线程的tid
                        //readHolds是缓存了当前线程的读锁重入次数的ThreadLocal
                        //当前线程天然是最后获取锁的线程,故将当前线程的holdCounter赋给cachedHoldCounter
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0) //计数为0
                        //缓存当前线程的holdCounter
                        //在fullTryAcquireShared()方法中,获取读锁失败的线程会执行:readHolds.remove(),故此时须要从新设置 
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }

           //首次获取读锁失败后,重试获取
            return fullTryAcquireShared(current);
        }

        //处理CAS更新失败和未考虑写锁可重入获取读锁,而获取读锁失败的状况.
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) { // 无限循环
                // 获取状态
                int c = getState();
                
                if (exclusiveCount(c) != 0) {// 写线程数量不为0,且被其余线程持有写入锁
                    if (getExclusiveOwnerThread() != current)
                        return -1;//获取读锁失败,直接返回.
                }else if (readerShouldBlock()) {//写线程数量为0而且读线程应该被阻塞
                    if (firstReader == current) { // 当前线程为第一个读线程
                       
                    } else {//当前线程不为第一个读线程
                        if (rh == null) {
                            // 获取计数器
                            rh = cachedHoldCounter;

                            if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
                                rh = readHolds.get();
                                if (rh.count == 0)//计数为0
                                    readHolds.remove();//移除
                            }
                        }

                        if (rh.count == 0)//计数器为0
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较而且设置成功
                    if (sharedCount(c) == 0) { // 读线程数量为0
                        // 设置第一个读线程
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

        //用于写入锁的tryLock方法
        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);//获得写入锁的数量
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

        //用于读取锁的tryLock方法
        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);//获得读取锁的数量
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

        //当前线程是否持有独占锁
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        //建立一个内部条件,用于写入锁的newCondition方法
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        //返回当前拥有写入锁的线程,若是没有这样的线程,则返回 null
        final Thread getOwner() {
            return ((exclusiveCount(getState()) == 0) ?
                    null :
                    getExclusiveOwnerThread());
        }

        //查询持有读取锁的总数
        final int getReadLockCount() {
            return sharedCount(getState());
        }

        //查询是否某个线程保持了写入锁
        final boolean isWriteLocked() {
            return exclusiveCount(getState()) != 0;
        }

        //查询当前线程在此锁上保持的重入写入锁数量
        final int getWriteHoldCount() {
            return isHeldExclusively() ? exclusiveCount(getState()) : 0;
        }

        //查询当前线程在此锁上保持的重入读取锁数量
        final int getReadHoldCount() {
            if (getReadLockCount() == 0)
                return 0;

            Thread current = Thread.currentThread();
            if (firstReader == current)
                return firstReaderHoldCount;

            HoldCounter rh = cachedHoldCounter;
            if (rh != null && rh.tid == getThreadId(current))
                return rh.count;

            int count = readHolds.get().count;
            if (count == 0) readHolds.remove();
            return count;
        }

        //自定义序列化方法
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            readHolds = new ThreadLocalHoldCounter();
            setState(0); // 重置为未锁定状态
        }

        //获得同步状态值        
        final int getCount() { return getState(); }
    }

    //非公平策略
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;//版本号

        final boolean writerShouldBlock() {//非公平策略,写入锁老是阻塞
            return false;
        }

        final boolean readerShouldBlock() {//读取锁是否堵塞,取决于等待队列是否有获取写入锁的线程等待
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    //公平策略
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;//版本号
        
        final boolean writerShouldBlock() {//等待队列中在当前线程以前有等待线程,则阻塞
            return hasQueuedPredecessors();
        }

        final boolean readerShouldBlock() {//等待队列中在当前线程以前有等待线程,则阻塞
            return hasQueuedPredecessors();
        }
    }

    //读取锁
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;//版本号

        private final Sync sync;//同步队列对象引用

        //构造方法(将同步队列对象引用 指向 ReentrantReadWriteLock 的同步队列)
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //若是写锁未被另外一个线程持有,则获取读取锁并当即返回
        public void lock() {
            sync.acquireShared(1);
        }

        //支持中断,获取读取锁方法        
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        //读锁线程获取读锁
        public boolean tryLock() {
            return sync.tryReadLock();
        }

        //尝试在共享模式下获取读取锁,若是中断,则停止;若是超过给定的时间,则返回false
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        //释放锁
        public void unlock() {
            sync.releaseShared(1);
        }

        //读取锁不支持内部条件队列,若调用readLock().newCondition();会抛出UnsupportedOperationException
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }

        //返回标识此锁及其持有读取锁的重入次数的字符串
        public String toString() {
            int r = sync.getReadLockCount();
            return super.toString() +
                "[Read locks = " + r + "]";
        }
    }

    //写入锁
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;//版本号

        private final Sync sync;//同步队列对象引用

        //构造方法(将同步队列对象引用 指向 ReentrantReadWriteLock 的同步队列)
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        //获取独占锁,并忽略中断
        public void lock() {
            sync.acquire(1);
        }

        //获取独占锁,支持中断响应
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        //获取写锁
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

        //尝试在独占模式下获取写入锁,若是中断,则停止;若是超过给定的时间,则返回false
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }

        //释放锁
        public void unlock() {
            sync.release(1);
        }

        //新增条件队列
        public Condition newCondition() {
            return sync.newCondition();
        }

        //返回标识此锁及其锁定状态的字符串
        public String toString() {
            Thread o = sync.getOwner();
            return super.toString() + ((o == null) ?
                                       "[Unlocked]" :
                                       "[Locked by thread " + o.getName() + "]");
        }

        //返回当前线程是否独占锁
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }

        //返回写入锁的重入次数
        public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
    }

    //查询当前对象是否为公平锁对象实例
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    //返回当前拥有写入锁的线程,若是没有这样的线程,则返回 null
    protected Thread getOwner() {
        return sync.getOwner();
    }

    //查询为此锁保持的读取锁数量
    public int getReadLockCount() {
        return sync.getReadLockCount();
    }

    //查询是否某个线程保持了写入锁
    public boolean isWriteLocked() {
        return sync.isWriteLocked();
    }

    //查询当前线程是否保持了写入锁
    public boolean isWriteLockedByCurrentThread() {
        return sync.isHeldExclusively();
    }

    //查询当前线程在此锁上保持的重入写入锁数量
    public int getWriteHoldCount() {
        return sync.getWriteHoldCount();
    }

    //查询当前线程在此锁上保持的重入读取锁数量
    public int getReadHoldCount() {
        return sync.getReadHoldCount();
    }

    //返回一个collection,它包含可能正在等待获取写入锁的线程
    protected Collection<Thread> getQueuedWriterThreads() {
        return sync.getExclusiveQueuedThreads();
    }

    //返回一个collection,它包含可能正在等待获取读取锁的线程
    protected Collection<Thread> getQueuedReaderThreads() {
        return sync.getSharedQueuedThreads();
    }

    //查询是否全部的线程正在等待获取读取或写入锁
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    //查询是否给定线程正在等待获取读取或写入锁
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

    //返回等待获取读取或写入锁的线程估计数目
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    //返回一个 collection,它包含可能正在等待获取读取或写入锁的线程
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    //查询是否有些线程正在等待与写入锁有关的给定条件
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //返回正等待与写入锁相关的给定条件的线程估计数目
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //返回标识此锁及其锁状态的字符串
    public String toString() {
        int c = sync.getCount();
        int w = Sync.exclusiveCount(c);
        int r = Sync.sharedCount(c);

        return super.toString() +
            "[Write locks = " + w + ", Read locks = " + r + "]";
    }

    static final long getThreadId(Thread thread) {
        return UNSAFE.getLongVolatile(thread, TID_OFFSET);
    }

    // Unsafe类 提供了硬件级别的原子操做
    private static final sun.misc.Unsafe UNSAFE;
    private static final long TID_OFFSET;// 线程ID的偏移地址
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            //获取线程的tid字段的内存地址
            TID_OFFSET = UNSAFE.objectFieldOffset(tk.getDeclaredField("tid"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}

类 ReentrantReadWriteLock

实现的接口:api

    SerializableReadWriteLock缓存

    支持与 ReentrantLock 相似语义的 ReadWriteLock 实现。(在避免"读-写"、"写-写"冲突的同时也容许多个读操做同时进行,从而在某些情形下,提升了程序的性能。数据结构

    此类具备如下属性:并发

  • 获取顺序

        此类不会将读取者优先或写入者优先强加给锁访问的排序。可是,它确实支持可选的公平 策略。app

    非公平模式(默认)函数

        当非公平地(默认)构造时,未指定进入读写锁的顺序。连续竞争的非公平锁可能无限期地推迟一个或多个 reader 或 writer 线程,但吞吐量一般要高于公平锁。

    公平模式高并发

        当公平地构造线程时,线程利用一个近似到达顺序的策略来争夺进入。当释放当前保持的锁时,能够为等待时间最长的单个 writer 线程分配写入锁,若是有一组等待时间大于全部正在等待的 writer 线程 的 reader 线程,将为该组分配读取锁.
        注意,非阻塞 ReentrantReadWriteLock.ReadLock.tryLock() 和 ReentrantReadWriteLock.WriteLock.tryLock() 方法不会遵照此公平设置,而是直接将得到锁,而不考虑等待的线程。
  • 重入

        此锁容许 reader 和 writer 从新获取读取锁或写入锁。须要注意:在写入线程保持的全部写入锁都已经释放后,才容许重入 reader 使用它们。性能

        此外,writer 能够获取读取锁,但反过来则不成立。若是 reader 试图获取写入锁,那么将永远不会得到成功。
  • 锁降级

        重入还容许从写入锁降级为读取锁,其实现方式是:先获取写入锁,而后获取读取锁,最后释放写入锁。可是,从读取锁升级到写入锁是不可能的

  • 锁获取的中断

        读取锁和写入锁都支持锁获取期间的中断。

  • Condition 支持

        写入锁提供了一个 Condition 实现,对于写入锁来讲,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所作的行为相同。固然,此 Condition 只能用于写入锁。

        读取锁不支持 ConditionreadLock().newCondition() 会抛出 UnsupportedOperationException

     此类行为的序列化方式与内置锁的相同:反序列化的锁处于解除锁状态,不管序列化该锁时其状态如何。

    下面的代码展现了如何利用重入来执行升级缓存后的锁降级(为简单起见,省略了异常处理):

class CachedData {
   Object data;
   volatile boolean cacheValid;
   ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // Must release read lock before acquiring write lock
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        // Recheck state because another thread might have acquired
        //   write lock and changed state before we did.
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // Downgrade by acquiring read lock before releasing write lock
        rwl.readLock().lock();
        rwl.writeLock().unlock(); // Unlock write, still hold read
     }

     use(data);
     rwl.readLock().unlock();
   }
 }

    在使用某些种类的 Collection 时,可使用 ReentrantReadWriteLock 来提升并发性。一般,在预期 collection 很大,读取者线程访问它的次数多于写入者线程,这很值得一试。例如,如下是一个使用 TreeMap 的类,预期它很大,而且能被同时访问。

class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    public String[] allKeys() {
        r.lock();
        try { return m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
 }

实现注意事项:

    此锁最多支持 65535 个递归写入锁和 65535 个读取锁。试图超出这些限制将致使锁方法抛出 Error

 

嵌套类摘要

static class ReentrantReadWriteLock.ReadLock 
          readLock() 方法返回的锁。
static class ReentrantReadWriteLock.WriteLock 
          writeLock() 方法返回的锁。

构造方法摘要

ReentrantReadWriteLock() 
          使用默认(非公平)的排序属性建立一个新的 ReentrantReadWriteLock
ReentrantReadWriteLock(boolean fair) 
          使用给定的公平策略建立一个新的 ReentrantReadWriteLock

方法摘要

protected  Thread getOwner() 
          返回当前拥有写入锁的线程,若是没有这样的线程,则返回 null
protected  Collection<Thread> getQueuedReaderThreads() 
          返回一个 collection,它包含可能正在等待获取读取锁的线程。
protected  Collection<Thread> getQueuedThreads() 
          返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。
protected  Collection<Thread> getQueuedWriterThreads() 
          返回一个 collection,它包含可能正在等待获取写入锁的线程。
 int getQueueLength() 
          返回等待获取读取或写入锁的线程估计数目。
 int getReadHoldCount() 
          查询当前线程在此锁上保持的重入读取锁数量。
 int getReadLockCount() 
          查询为此锁保持的读取锁数量。
protected  Collection<Thread> getWaitingThreads(Condition condition) 
          返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。
 int getWaitQueueLength(Condition condition) 
          返回正等待与写入锁相关的给定条件的线程估计数目。
 int getWriteHoldCount() 
          查询当前线程在此锁上保持的重入写入锁数量。
 boolean hasQueuedThread(Thread thread) 
          查询是否给定线程正在等待获取读取或写入锁。
 boolean hasQueuedThreads() 
          查询是否全部的线程正在等待获取读取或写入锁。
 boolean hasWaiters(Condition condition) 
          查询是否有些线程正在等待与写入锁有关的给定条件。
 boolean isFair() 
          若是此锁将公平性设置为 ture,则返回 true
 boolean isWriteLocked() 
          查询是否某个线程保持了写入锁。
 boolean isWriteLockedByCurrentThread() 
          查询当前线程是否保持了写入锁。
 ReentrantReadWriteLock.ReadLock readLock() 
          返回用于读取操做的锁。
 String toString() 
          返回标识此锁及其锁状态的字符串。
 ReentrantReadWriteLock.WriteLock writeLock() 
          返回用于写入操做的锁。

  

ReentrantReadWriteLock

public ReentrantReadWriteLock()

    使用默认(非公平)的排序属性建立一个新的 ReentrantReadWriteLock

 

ReentrantReadWriteLock

public ReentrantReadWriteLock(boolean fair)

    使用给定的公平策略建立一个新的 ReentrantReadWriteLock

    参数:

    fair - 若是此锁应该使用公平排序策略,则该参数的值为 true

 

writeLock

public ReentrantReadWriteLock.WriteLock writeLock()

    返回用于写入操做的锁。

    实现了接口 ReadWriteLock 中的 writeLock方法

 

readLock

public ReentrantReadWriteLock.ReadLock readLock()

    返回用于读取操做的锁。

    实现了接口 ReadWriteLock 中的 readLock方法

 

isFair

public final boolean isFair()

    若是此锁将公平性设置为 ture,则返回 true

 

getOwner

protected Thread getOwner()

    返回当前拥有写入锁的线程,若是没有这样的线程,则返回 null。当经过不是全部者的线程调用此方法时,返回值反映当前锁状态的最接近近似值。例如,即便存在试图得到锁的线程,可是在它尚未得到前,全部者可能暂时为 null。设计此方法是为了便于构造提供更多扩展的锁监视设施的子类。

    返回:

    全部者;若是没有全部者,则返回 null

 

getReadLockCount

public int getReadLockCount()

    查询为此锁保持的读取锁数量。此方法设计用于监视系统状态,而不是同步控制。

    返回:

        所保持的读取锁数量。

 

isWriteLocked

public boolean isWriteLocked()

    查询是否某个线程保持了写入锁。此方法设计用于监视系统状态,而不是同步控制。

    返回:

        若是某个线程保持写入锁,则返回 true;不然返回 false

 

isWriteLockedByCurrentThread

public boolean isWriteLockedByCurrentThread()

    查询当前线程是否保持了写入锁。

    返回:

        若是当前线程保持写入锁,则返回 true;不然返回 false

 

getWriteHoldCount

public int getWriteHoldCount()

    查询当前线程在此锁上保持的重入写入锁数量。对于与解除锁操做不匹配的每一个锁操做,writer 线程都会为其保持一个锁。

    返回:

        当前线程保持的写入锁数量,若是当前线程从未保持过写入锁,则返回 0

 

getReadHoldCount

public int getReadHoldCount()

    查询当前线程在此锁上保持的重入读取锁数量。对于与解除锁操做不匹配的每一个锁操做,reader 线程都会为其保持一个锁。

    返回:

        当前线程保持的读取锁数量;若是当前线程从未保持过读取锁,则返回 0

 

getQueuedWriterThreads

protected Collection<Thread> getQueuedWriterThreads()

    返回一个 collection,它包含可能正在等待获取写入锁的线程。由于在构成此结果的同时,实际的线程 set 可能不断发生变化,因此返回的 collection 仅是尽力而为得到的估计值。所返回 collection 的元素没有特定的顺序。设计此方法是为了便于构造提供更多扩展的锁监视器设施的子类。

    返回:

    线程的 collection

 

getQueuedReaderThreads

protected Collection<Thread> getQueuedReaderThreads()

    返回一个 collection,它包含可能正在等待获取读取锁的线程。由于在构成此结果的同时,实际的线程 set 可能不断发生变化,因此返回的 collection 仅是尽力而为得到的估计值。所返回 collection 的元素没有特定的顺序。设计此方法是为了便于构造提供更多扩展的锁监视器设施的子类。

    返回:

        线程的 collection

 

hasQueuedThreads

public final boolean hasQueuedThreads()

    查询是否全部的线程正在等待获取读取或写入锁。注意,由于随时可能发生取消操做,因此返回 true 并不保证任何其余线程将获取锁。此方法主要用于监视系统状态。

    返回:

        若是有其余线程正等待获取锁,则返回 true

 

hasQueuedThread

public final boolean hasQueuedThread(Thread thread)

    查询是否给定线程正在等待获取读取或写入锁。注意,由于随时可能发生取消操做,因此返回 true 并不保证此线程将获取锁。此方法主要用于监视系统状态。

    参数:

    thread - 线程

    返回:

        若是将给定的线程加入等待此锁的队列,则返回 true

    抛出:

    NullPointerException - 若是线程为 null

 

getQueueLength

public final int getQueueLength()

    返回等待获取读取或写入锁的线程估计数目。由于在此方法遍历内部数据结构时,能够动态地更改线程数,因此该值只能是一个估计值。此方法设计用于监视系统状态,而不是同步控制。

    返回:

        正在等待此锁的线程估计数目

 

getQueuedThreads

protected Collection<Thread> getQueuedThreads()

    返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。由于在构造此结果的同时,实际的线程 set 可能不断发生变化,因此返回的 collection 仅是尽力而为得到的估计值。所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,提供更多的监视设施。

 

hasWaiters

public boolean hasWaiters(Condition condition)

    查询是否有些线程正在等待与写入锁有关的给定条件。注意,由于随时可能发生超时和中断,因此返回 true 并不保证未来某个 signal 将唤醒任何线程。此方法主要用于监视系统状态。

    参数:

    condition - 条件

    返回:

        若是有等待的线程,则返回 true

    抛出:

    IllegalMonitorStateException - 若是没有保持此锁

    IllegalArgumentException - 若是给定的条件与此锁无关

    NullPointerException - 若是条件为 null

 

getWaitQueueLength

public int getWaitQueueLength(Condition condition)

    返回正等待与写入锁相关的给定条件的线程估计数目。注意,由于随时可能发生超时和中断,因此只能将估计值做为实际等待线程数的上限。此方法设计用于监视系统状态,而不是同步控制。

    参数:

    condition - 条件

    返回:

        等待线程的估计数

    抛出:

    IllegalMonitorStateException - 若是没有保持此锁

    IllegalArgumentException - 若是给定的条件与此锁无关

    NullPointerException - 若是条件为 null

 

getWaitingThreads

protected Collection<Thread> getWaitingThreads(Condition condition)

    返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。由于在构造此结果的同时,实际的线程 set 可能不断发生变化,因此返回的 collection 仅是尽力而为得到的估计值。所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,提供更多的条件监视设施。

    参数:

    condition - 条件

    返回:

        线程的 collection

    抛出:

    IllegalMonitorStateException - 若是没有保持此锁

    IllegalArgumentException - 若是给定 condition 与此锁无关

    NullPointerException - 若是条件为 null

 

toString

public String toString()

    返回标识此锁及其锁状态的字符串。该状态括在括号中,它包括字符串 "Write locks =",后跟重入保持写入锁的数目,而后是字符串 "Read locks =",后跟所保持读取锁的数目。

    覆盖:

        类 Object 中的 toString

    返回:

        标识此锁及其锁状态的字符串

 

实现原理

  ReentrantReadWriteLock 基于AQS实现的,它的自定义同步器Sync(继承AQS)须要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。

    若是在一个整型变量上维护多种状态,就必定须要“按位切割” SHARED_SHIFT 这个变量,读写锁将变量切分红了两个部分,高16位表示读,低16位表示写。

      ReentrantReadWriteLock含有两把锁readerLock和writerLock,其中ReadLock和WriteLock都是内部类。

        写锁是一个可重入的独占锁,使用AQS提供的独占式获取同步状态的策略。

        获取写锁的步骤以下:

            1)判断同步状态state是否为0。若是state!=0,说明已经有其余线程获取了读锁或写锁,执行2);不然执行5)。

            2)判断同步状态state的低16位(w)是否为0。若是w=0,说明其余线程获取了读锁,返回false;若是w!=0,说明其余线程获取了写锁,执行步骤3)。

            3)判断获取了写锁是不是当前线程,若不是返回false,不然执行4);

            4)判断当前线程获取写锁的重入次数是否超过最大次数,若超过,抛异常,反之更新同步状态+1,返回true。

            5)此时读锁或写锁都没有被获取,判断是否须要阻塞(公平和非公平方式实现不一样):若是不须要阻塞,则CAS更新同步状态,若CAS成功则返回true,不然返回false;若是须要阻塞,则返回false。

        读锁是一个可重入的共享锁,采用AQS提供的共享式获取同步状态的策略。

        获取读锁的大体步骤以下:

            1)经过同步状态低16位判断,若是存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败;不然执行步骤2)。

            2)经过readerShouldBlock判断当前线程是否应该被阻塞,若是不该该阻塞则尝试CAS同步状态;不然执行3)。

            3)第一次获取读锁失败,经过fullTryAcquireShared再次尝试获取读锁。

 

用法示例Demo:

package com.thread;

import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadThread extends Thread {
    private ReentrantReadWriteLock rrwLock;

    public ReadThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");
            System.out.println(Thread.currentThread().getName() +" 持有的读取锁重入次数:"+rrwLock.getReadHoldCount());
            System.out.println("持有读取锁的线程总数:"+rrwLock.getReadLockCount());
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");
        }
    }
}

class WriteThread extends Thread {
    private ReentrantReadWriteLock rrwLock;

    public WriteThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.writeLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");
            //写锁尝试获取读锁
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() +"写锁 获取读锁成功");
            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() +"写锁 释放持有的读锁完成");
            //降级
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() +"降级 获取读锁成功");
        } finally {
            rrwLock.writeLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");

            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() +"降级 释放读锁完成");
        }
    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        ReentrantReadWriteLock rrwLock = new ReentrantReadWriteLock();
        ReadThread rt1 = new ReadThread("rt1", rrwLock);
        ReadThread rt2 = new ReadThread("rt2", rrwLock);
        WriteThread wt1 = new WriteThread("wt1", rrwLock);
        rt1.start();
        rt2.start();
        wt1.start();
    }
}

    运行结果:

rt1 trying to lock
rt2 trying to lock

rt1 lock successfully
rt1 持有的读取锁重入次数:1

wt1 trying to lock
持有读取锁的线程总数:2
rt2 lock successfully
rt2 持有的读取锁重入次数:1
持有读取锁的线程总数:2

rt1 unlock successfully
rt2 unlock successfully

wt1 lock successfully wt1写锁 获取读锁成功 wt1写锁 释放持有的读锁完成 wt1降级 获取读锁成功 wt1 unlock successfully wt1降级 释放读锁完成

相关文章
相关标签/搜索