JUC锁框架——ReadWriteLock

ReadWriteLock简单介绍

ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁能够在没有写锁的时候被多个线程同时持有,写锁是独占的。相对于互斥锁而言,ReadWriteLoc容许更高的并发量。java

全部读写锁的实现必须确保写操做对读操做的内存影响。换句话说,一个得到了读锁的线程必须能看到前一个释放的写锁所更新的内容。缓存

ReadWriteLock接口

public interface ReadWriteLock {
    Lock readLock();//获取读锁
    Lock writeLock();//获取写锁
}

ReentrantReadWriteLock实现类

ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。并发

  1. 锁的获取获取模式
    • 非公平模式(默认):读锁和写锁的获取的顺序是不肯定的。非公平锁主张竞争获取,比公平锁有更高的吞吐量。
    • 公平模式:线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写/读锁线程就会被分配写/读锁;当有写线程持有写锁或者有等待的写线程时,一个尝试获取公平的读锁(非重入)的线程就会阻塞。这个线程直到等待时间最长的写锁得到锁后并释放掉锁后才能获取到读锁。
  2. 可重入:容许读锁可写锁可重入。写锁能够得到读锁,读锁不能得到写锁。
  3. 锁降级:容许写锁下降为读锁。
  4. 中断锁的获取:在读锁和写锁的获取过程当中支持中断。
  5. 支持Condition:写锁提供Condition实现。
  6. 监控:提供肯定锁是否被持有等辅助方法

锁下降的简单就示例

class ReadWriteLockTest {
    String data;//缓存中的对象
    volatile boolean cacheValid;//缓存是否还有效
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() throws InterruptedException {
        rwl.readLock().lock();
        if (!cacheValid) {//缓存失效须要再次读取缓存
            //在读取缓存前,必须释放读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                //再次检查是否须要再次读取缓存,若是须要则
                if (!cacheValid) {
                    data = Thread.currentThread().getName()+":缓存数据测试,实际开发多是一个对象!";
                    cacheValid = true;
                }
                //在释放以前,经过获取读锁降级写锁
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); //释放写锁,持有读锁
            }
        }
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+"【"+data+"】");
        } finally {
            rwl.readLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
        for(int i=0;i<10;i++){
           Thread thread = new Thread(()->{
               try {
                   readWriteLockTest.processCachedData();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });
           thread.start();
        }
    }
}

源码分析

构造方法

public ReentrantReadWriteLock() {
    this(false);//默认为非公平模式
}

public ReentrantReadWriteLock(boolean fair) {
    //决定了Sync是FairSync仍是NonfairSync。Sync继承了AbstractQueuedSynchronizer,而Sync是一个抽象类,NonfairSync和FairSync继承了Sync,并重写了其中的抽象方法。
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

获取锁

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

Sync分析

FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。app

对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。工具

NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 写线程无需阻塞
        }
        final boolean readerShouldBlock() {
            //apparentlyFirstQueuedIsExclusive在当前线程是写锁占用的线程时,返回true;不然返回false。也就说明,若是当前有一个写线程正在写,那么该读线程应该阻塞。
            return apparentlyFirstQueuedIsExclusive();
        }
    }

ReentrantReadWriteLock中的state

继承AQS的类都须要使用state变量表明某种资源,ReentrantReadWriteLock中的state表明了读锁的数量和写锁的持有与否,整个结构以下: 能够看到state的高16位表明读锁的个数;低16位表明写锁的状态。源码分析

获取锁

读锁的获取

public void lock() {
    sync.acquireShared(1);
}

读锁使用的是AQS的共享模式,AQS的acquireShared方法以下:测试

if (tryAcquireShared(arg) < 0)
   doAcquireShared(arg);

当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到等待队列中。ui

Sync实现了tryAcquireShared方法,以下:this

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //若是当前有写线程而且本线程不是写线程,不符合重入,失败.
            //在获取读锁时,若是有写线程,则获取失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //获得读锁的个数
            int r = sharedCount(c);
            //若是读不该该阻塞而且读锁的个数小于最大值65535,而且能够成功更新状态值,成功
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {//若是当前读锁为0
                    firstReader = current;//第一个读线程就是当前线程
                    firstReaderHoldCount = 1;//第一个线程持有读锁的个数
                }
                //若是当前线程重入了,记录firstReaderHoldCount
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                }
                //当前读线程和第一个读线程不一样,记录每个线程读的次数
                else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            //不然,循环尝试
            return fullTryAcquireShared(current);
        }

从上面的代码以及注释能够看到,分为三步:线程

  1. 若是当前有写线程而且本线程不是写线程,那么失败,返回-1
  2. 不然,说明当前没有写线程或者本线程就是写线程(可重入),接下来判断是否应该读线程阻塞而且读锁的个数是否小于最小值,而且CAS成功使读锁+1,成功,返回1。其他的操做主要是用于计数的
  3. 若是2中失败了,失败的缘由有三,第一是应该读线程应该阻塞;第二是由于读锁达到了上线;第三是由于CAS失败,有其余线程在并发更新state,那么会调动fullTryAcquireShared方法。

fullTryAcquiredShared方法

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                //一旦有别的线程得到了写锁,而且得到写锁的线程不是本线程,返回-1,失败
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                }
                //若是读线程须要阻塞
                else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    }
                    //说明有别的读线程占有了锁
                    else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //若是读锁达到了最大值,抛出异常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //若是成功更改状态,成功返回
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

写锁的获取

public void lock() {
    sync.acquire(1);
}

AQS的acquire方法以下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

从上面能够看到,写锁使用的是AQS的独占模式。首先尝试获取锁,若是获取失败,那么将会把该线程加入到等待队列中。

Sync实现了tryAcquire方法用于尝试获取一把锁,以下:

protected final boolean tryAcquire(int acquires) {
             //获得调用lock方法的当前线程
            Thread current = Thread.currentThread();
            int c = getState();
            //获得写锁的个数
            int w = exclusiveCount(c);
            //若是当前有写锁或者读锁.(对于读锁而言,若是当前写线程能够进行写操做,那么读线程读到的数据可能有误)
            if (c != 0) {
                // 若是写锁为0或者当前线程不是独占线程(不符合重入),返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //若是写锁的个数超过了最大值(65535),抛出异常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 写锁重入,返回true
                setState(c + acquires);
                return true;
            }
            //若是当前没有写锁或者读锁,若是写线程应该阻塞或者CAS失败,返回false
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //不然将当前线程置为得到写锁的线程,返回true
            setExclusiveOwnerThread(current);
            return true;
        }

释放锁

读锁的释放

ReadLock的unlock方法以下:

public void unlock() {
     sync.releaseShared(1);
 }

调用了Sync的releaseShared方法,该方法在AQS中提供,以下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

调用tryReleaseShared方法尝试释放锁,若是释放成功,调用doReleaseShared尝试唤醒下一个节点。

AQS的子类须要实现tryReleaseShared方法,Sync中的实现以下:

protected final boolean tryReleaseShared(int unused) {
    //获得调用unlock的线程
    Thread current = Thread.currentThread();
    //若是是第一个得到读锁的线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    }
    //不然,是HoldCounter中计数-1
    else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //死循环
    for (;;) {
        int c = getState();
        //释放一把读锁
        int nextc = c - SHARED_UNIT;
        //若是CAS更新状态成功,返回读锁是否等于0;失败的话,则重试
        if (compareAndSetState(c, nextc))
            //释放读锁对读线程没有影响,可是可能会使等待的写线程解除挂起开始运行。因此,一旦没有锁了,就返回true,不然false;返回true后,那么则须要释放等待队列中的线程,这时读线程和写线程都有可能再得到锁。
            return nextc == 0;
    }
}

写锁的释放

WriteLock的unlock方法以下:

public void unlock() {
    sync.release(1);
}

Sync的release方法使用的AQS中的,以下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)//若是等待队列中有线程再等待
            unparkSuccessor(h);//将下一个线程解除挂起。
        return true;
    }
    return false;
}

Sync须要实现tryRelease方法,以下:

protected final boolean tryRelease(int releases) {
    //若是没有线程持有写锁,可是仍要释放,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    //若是没有写锁了,那么将AQS的线程置为null
    if (free)
        setExclusiveOwnerThread(null);
    //更新状态
    setState(nextc);
    return free;//此处返回当且仅当free为0时返回,若是当前是写锁被占有了,只有当写锁的数据降为0时才认为释放成功;不然失败。由于只要有写锁,那么除了占有写锁的那个线程,其余线程即不能够得到读锁,也不能得到写锁
}

getOwner()

getOwner方法用于返回当前得到写锁的线程,若是没有线程占有写锁,那么返回null。实现以下:

protected Thread getOwner() {
    return sync.getOwner();
}

能够看到直接调用了Sync的getOwner方法,下面是Sync的getOwner方法:

final Thread getOwner() {
  // Must read state before owner to ensure memory consistency
  //若是独占锁的个数为0,说明没有线程占有写锁,那么返回null;不然返回占有写锁的线程。
  return ((exclusiveCount(getState()) == 0) ?null :getExclusiveOwnerThread());
}

getReadLockCount()

getReadLockCount()方法用于返回读锁的个数,实现以下:

public int getReadLockCount() {
    return sync.getReadLockCount();
}

Sync的实现以下:

final int getReadLockCount() {
    return sharedCount(getState());
}
//要想获得读锁的个数,就是看AQS的state的高16位。这和前面讲过的同样,高16位表示读锁的个数,低16位表示写锁的个数。
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

getReadHoldCount()

getReadHoldCount()方法用于返回当前线程所持有的读锁的个数,若是当前线程没有持有读锁,则返回0。直接看Sync的实现便可:

final int getReadHoldCount() {
   //若是没有读锁,天然每一个线程都是返回0
   if (getReadLockCount() == 0)
       return 0;

   //获得当前线程
   Thread current = Thread.currentThread();
   //若是当前线程是第一个读线程,返回firstReaderHoldCount参数
   if (firstReader == current)
       return firstReaderHoldCount;
   //若是当前线程不是第一个读线程,获得HoldCounter,返回其中的count
   HoldCounter rh = cachedHoldCounter;
   //若是缓存的HoldCounter不为null而且是当前线程的HoldCounter,直接返回count
   if (rh != null && rh.tid == getThreadId(current))
       return rh.count;

   //若是缓存的HoldCounter不是当前线程的HoldCounter,那么从ThreadLocal中获得本线程的HoldCounter,返回计数
    int count = readHolds.get().count;
    //若是本线程持有的读锁为0,从ThreadLocal中移除
    if (count == 0) readHolds.remove();
    return count;
}

从上面的代码中,能够看到两个熟悉的变量,firstReader和HoldCounter类型。这两个变量在读锁的获取中接触过,前面没有细说,这里细说一下。HoldCounter类的实现以下:

static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

readHolds是ThreadLocalHoldCounter类,定义以下:

static final class ThreadLocalHoldCounter
      extends ThreadLocal<HoldCounter> {
      public HoldCounter initialValue() {
          return new HoldCounter();
      }
  }

能够看到,readHolds存储了每个线程的HoldCounter,而HoldCounter中的count变量就是用来记录线程得到的写锁的个数。因此能够得出结论:Sync维持总的读锁的个数,在state的高16位;因为读线程能够同时存在,因此每一个线程还保存了得到的读锁的个数,这个是经过HoldCounter来保存的。 除此以外,对于第一个读线程有特殊的处理,Sync中有以下两个变量:

private transient Thread firstReader = null;//第一个获得读锁的线程
private transient int firstReaderHoldCount;//第一个线程得到的写锁

其他获取到读锁的线程的信息保存在HoldCounter中。

看完了HoldCounter和firstReader,再来看一下getReadLockCount的实现,主要有三步:

  1. 当前没有读锁,那么天然每个线程得到的读锁都是0;
  2. 若是当前线程是第一个获取到读锁的线程,那么返回firstReadHoldCount;
  3. 若是当前线程不是第一个获取到读锁的线程,获得该线程的HoldCounter,而后返回其count字段。若是count字段为0,说明该线程没有占有读锁,那么从readHolds中移除。获取HoldCounter分为两步,第一步是与cachedHoldCounter比较,若是不是,则从readHolds中获取。

getWriteLockCount()

getWriteLockCount()方法返回写锁的个数,Sync的实现以下:

final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

能够看到若是没有线程持有写锁,那么返回0;不然返回AQS的state的低16位。

总结

当分析ReentranctReadWriteLock时,或者说分析内部使用AQS实现的工具类时,须要明白的就是AQS的state表明的是什么。ReentrantLockReadWriteLock中的state同时表示写锁和读锁的个数。为了实现这种功能,state的高16位表示读锁的个数,低16位表示写锁的个数。AQS有两种模式:共享模式和独占模式,读写锁的实现中,读锁使用共享模式;写锁使用独占模式;另一点须要记住的即便,当有读锁时,写锁就不能得到;而当有写锁时,除了得到写锁的这个线程能够得到读锁外,其余线程不能得到读锁。

相关文章
相关标签/搜索