以前ReentrantReadWriteLock讲了读写锁的场景,这边来说他的源码,以非公平锁为例,其实和公平锁主要代码是一致的。segmentfault
static final int SHARED_SHIFT = 16;//高16位是共享,用于读,低16位是独占,用于写,用一个字段保证原子性 static final int SHARED_UNIT = (1 << SHARED_SHIFT);//左移16位,也就是高位的最后一个是0000 0000 0000 0001 0000 0000 0000 0000 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//最大读的数量,正常不会这么多 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//左移16位,再减1,也就是0000 0000 0000 0000 1111 1111 1111 1111 static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//无符号右移16位 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//返回的不为1,说明有写锁,由于低16位都是1,1与1为1,若是有些,确定有个为1 static final class HoldCounter {//每一个线程持有的锁的数量 int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {//本地线程 public HoldCounter initialValue() { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds;//本地线程,记录持有的锁的数量信息 private transient HoldCounter cachedHoldCounter;//缓存HoldCounter 的数据 private transient Thread firstReader = null;//第一个获取读锁的线程 private transient int firstReaderHoldCount;//第一个获取读锁的线程持有的数量
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//小于0没获取到锁 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread();//获取本地线程 int c = getState();//获取state的值 if (exclusiveCount(c) != 0 &&//不为0说明有写锁,缘由上面分析了 getExclusiveOwnerThread() != current)//不是当前线程,说明不是重入 return -1; int r = sharedCount(c);//获取读锁的个数 if (!readerShouldBlock() &&//读锁无堵塞 r < MAX_COUNT &&//读锁没到最大值 compareAndSetState(c, c + SHARED_UNIT)) {//cas操做,高位加1成功说明获取到了读锁 if (r == 0) {//等于0说明第一个获取读锁 firstReader = current;//当前线程就是第一个 firstReaderHoldCount = 1;//数量为1 } else if (firstReader == current) {//若是不是第一个,可是是当前线程 firstReaderHoldCount++;//数量加1 } else {//既不是第一个,也不是当前线程 HoldCounter rh = cachedHoldCounter;//获取缓存HoldCounter if (rh == null || rh.tid != getThreadId(current))//若是不为空,或者经过线程id对比不是当前线程 cachedHoldCounter = rh = readHolds.get();//缓存设置为当前线程 else if (rh.count == 0)//缓存的是当前线程,并且锁的数量为0,加入到本地缓存,若是数量不为0,说明已经在本地缓存了 readHolds.set(rh); rh.count++;//锁的数量加1 } return 1; } return fullTryAcquireShared(current); } //若是阻塞或者cas失败的状况,再重试获取锁 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) {// int c = getState(); if (exclusiveCount(c) != 0) {//上面分析了,若是是写锁,而且不是当前线程,放弃 if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) {//阻塞的状况 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) {//当前线程是第一个不处理 // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove();//若是是0,移除本地缓存 } } if (rh.count == 0) return -1;// } } if (sharedCount(c) == MAX_COUNT)//读锁数量太大,抛异常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) {//没有读锁 firstReader = current;//当前线程就是第一个 firstReaderHoldCount = 1;//数量为1 } else if (firstReader == current) {//若是不是第一个,可是是当前线程 firstReaderHoldCount++;//数量加1 } else {//既不是第一个,也不是当前线程 if (rh == null) rh = cachedHoldCounter;//获取缓存HoldCounter if (rh == null || rh.tid != getThreadId(current))//若是不为空,或者经过线程id对比不是当前线程 rh = readHolds.get();//缓存设置为当前线程 else if (rh.count == 0)//缓存的是当前线程,并且锁的数量为0,加入到本地缓存,若是数量不为0,说明已经在本地缓存了 readHolds.set(rh); rh.count++;//锁的数量加1 cachedHoldCounter = rh; // cache for release } return 1; } } }
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {//若是第一个是当前线程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1)//若是数量为1,就是直接设为空 firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) {//数量小于等于1,移除 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0;//写锁和读锁为0,无锁 } }
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//失败了就进入阻塞队列 selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread();//获取当前线程 int c = getState();//获取state int w = exclusiveCount(c);//不为0说明有写锁 if (c != 0) {//有读或者写锁 // 无写锁或者读锁被占 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() ||//无阻塞 !compareAndSetState(c, c + acquires))//设置成功 return false; setExclusiveOwnerThread(current); return true; }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒 return true; } return false; } protected final boolean tryRelease(int releases) { if (!isHeldExclusively())//不是独占锁,抛异常 throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0;//写锁都释放了 if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }