阿里巴巴2021版JDK源码笔记(2月第三版).pdfjava
连接:https://pan.baidu.com/s/1XhVcfbGTpU83snOZVu8AXg
提取码:l3gynode
当一个线程调用 object.lock()拿到锁,进入互斥区后,再次调用object.lock(), 仍然能够拿到该锁(不然会死锁)数组
lock.java安全
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别 对应公平锁和非公平锁多线程
Sync的父类AbstractQueuedSynchronizer常常被称做队列同步器 (AQS),这个类很是关键ide
AbstractOwnableSynchronizer具备阻塞线程的做用,为了实现一把具备阻塞和唤醒功能的锁,须要一下核心要素:函数
针对1,2ui
针对3this
在Unsafe类中,提供了阻塞或唤醒线程的一对操做原语,也就是park/unpark线程
LockSupport对其作了简单的封装
public class LockSupport { public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } }
在当前线程中调用park(),该线程就会被阻塞;在另一个线 程中,调用unpark(Thread t),传入一个被阻塞的线程,就能够唤醒阻塞在park()地方的线程
尤为是 unpark(Thread t),它实现了一个线程对另一个线程 的“精准唤醒”
针对4
在AQS中利用双向链表和CAS实现了一个阻塞队列。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; } private transient volatile Node head; private transient volatile Node tail; }
FairSync 公平锁
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//等于0,资源空闲,能够拿到锁 if (!hasQueuedPredecessors() && //判断是否存在等待队列或者当前线程是不是队头 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//被锁了,可是当前线程就是已经获取锁了(重入锁),state+1 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
NonfairSync 非公平锁
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁和非公平锁的区别:
公平锁就多了这块代码 !hasQueuedPredecessors()
,看源码
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
这里其实就是判断当前线程是否能够被公平的执行(队列为空,或者当前在队头的时候表示到当前线程处理了)
AQS类中,有尝试拿锁的方法
public final void acquire(int arg) { if (!tryAcquire(arg) && //这里尝试去拿锁,没有拿到锁才执行下一个条件 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //将当前线程入队进入等待 //addWaiter就是将线程加入到队列中, //acquireQueued该线程被阻塞。在该函数返回 的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点) selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//这里会阻塞住,直到拿到锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
unlock不区分公平仍是非公平
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; //只有锁的拥有者才能够释放锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //这里须要考虑重入锁 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
release()里面作了两件事:tryRelease(..)函数释放锁;unparkSuccessor(..)函数唤醒队列中的后继者。
当parkAndCheckInterrupt()返回true的时候,说明有其余线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个函数返回。
tryLock()实现基于调用非公平锁的tryAcquire(..),对state进行CAS操做,若是操做成功就拿到锁;若是操做不成功则直接返回false,也不阻塞
和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程 和读线程之间能够不用互斥了。在正式介绍原理以前,先看一下相关类的继承体系。
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
当使用 ReadWriteLock 的时候,并非直接使用,而是得到其内部的读锁和写锁,而后分别调用lock/unlock。
public static void main(String[] args) { ReadWriteLock rwlock = new ReentrantReadWriteLock(); Lock rlock = rwlock.readLock(); rlock.lock(); rlock.unlock(); Lock wlock = rwlock.writeLock(); wlock.lock(); wlock.unlock(); }
从表面来看,ReadLock和WriteLock是两把锁,实际上它只是同一 把锁的两个视图而已
两个视图: 能够理解为是一把锁,线程分红两类:读线程和写线程。读线程和读线程之间不互斥(能够同时拿到这把锁),读线程和写线程互斥,写线程和写线程也互斥。
readerLock和writerLock实际共 用同一个sync对象
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
同互斥锁同样,读写锁也是用state变量来表示锁状态的。只是state变量在这里的含义和互斥锁彻底不一样
是把 state 变量拆成两半,低16位,用来记录写锁,高16位,用来“读”锁。但同一 时间既然只能有一个线程写,为何还须要16位呢?这是由于一个写 线程可能屡次重入
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
为何要把一个int类型变量拆成两半, 而不是用两个int型变量分别表示读锁和写锁的状态呢?这是由于没法 用一次CAS 同时操做两个int变量,因此用了一个int型的高16位和低16位分别表示读锁和写锁的状态。
当state=0时,说明既没有线程持有读锁,也没有线程持有写锁; 当state!=0时,要么有线程持有读锁,要么有线程持有写锁,二者不 能同时成立,由于读和写互斥。
ReentrantReadWriteLock的两个内部类ReadLock和WriteLock中,是如何使用state变量的
acquire/release、acquireShared/releaseShared 是AQS里面的 两对模板方法。互斥锁和读写锁的写锁都是基于acquire/release模板 方法来实现的。读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现的
将读/写、公平/非公平进行排列组合,就有4种组合
对于公平,比较容易理解,不管是读锁,仍是写锁,只要队列中 有其余线程在排队(排队等读锁,或者排队等写锁),就不能直接去抢锁,要排在队列尾部。
对于非公平,读锁和写锁的实现策略略有差别。先说写锁,写线 程能抢锁,前提是state=0,只有在没有其余线程持有读锁或写锁的情 况下,它才有机会去抢锁。或者state!=0,但那个持有写锁的线程是 它本身,再次重入。写线程是非公平的,就是无论三七二十一就去抢,即一直返回false。
由于读线程和读线程是不互斥的,假设当前线程被读线程持有,而后其余读线程还非公平地一直去抢,可能致使写线程永远拿不到锁,所 以对于读线程的非公平,要作一些“约束”
当发现队列的第1个元素 是写线程的时候,读线程也要阻塞一下,不能“肆无忌惮”地直接去抢
写锁是排他锁,实现策略相似于互斥锁,重写了tryAcquire/tryRelease方法。
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc);//写锁是排他的 return free; }
读锁是共享锁,重写了 tryAcquireShared/tryReleaseShared 方法,其实现策略和排他锁有很大的差别。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && //写锁被某线程持有,且不是本身,读锁确定拿不到,直接返回 getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() &&//公平和非公平的差别 r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//高位读锁+1 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
由于读锁是共享锁,多个线程会同时持有读锁,因此对读锁的释 放不能直接减1,而是须要经过一个for循环+CAS操做不断重试。这是tryReleaseShared和tryRelease的根本差别所在。
Condition自己也是一个接口,其功能和wait/notify相似
public interface Condition { void await() throws InterruptedException; void signal(); void signalAll(); }
在讲多线程基础的时候,强调wait()/notify()必须和synchronized一块儿使用,Condition也是如此,必须和Lock一块儿使用。所以,在Lock的接口中,有一个与Condition相关的接口:
public interface Lock { Condition newCondition(); }
为一个用数组实现的阻塞 队列,执行put(..)操做的时候,队列满了,生成者线程被阻塞;执行take()操做的时候,队列为空,消费者线程被阻塞。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //核心就是一把锁,两个条件 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } }
使用很简洁,避免了 wait/notify 的生成者通知生 成者、消费者通知消费者的问题。
由于Condition必须和Lock一块儿使用,因此Condition的实现也是Lock的一部分
public final void await() throws InterruptedException { if (Thread.interrupted())//正要执行await操做,收到了中断信号,抛出异常 throw new InterruptedException(); Node node = addConditionWaiter();//加入condition等待队列 long savedState = fullyRelease(node);//阻塞在condition以前必须释放锁,不然会释放锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this);//本身阻塞本身 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//从新拿锁 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
与await()不一样,awaitUninterruptibly()不会响应中断,其 函数的定义中不会有中断异常抛出,下面分析其实现和await()的区别
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); long savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted())//从park中醒来,收到中断,不退出,继续执行循环 interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
能够看出,总体代码和 await()相似,区别在于收到异常后,不会抛出异常,而是继续执行while循环。
public final void signal() { if (!isHeldExclusively())//只有持有锁的队列才能够调用signal throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) {//唤醒队列的第一个线程 do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node);//先把Node放入互斥锁的同步队列中,再调用下面的unpark方法 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
JDK8引入
StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改 了,再升级为“悲观读”,至关于下降了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。
public class Point { private double x, y; private final StampedLock s1 = new StampedLock(); void move(double deltaX, double deltaY) { //多个线程调用,修改x,y的值 long stamp = s1.writeLock(); try { x = deltaX; y = deltaY; } finally { s1.unlock(stamp); } } double distanceFromOrigin() { long stamp = s1.tryOptimisticRead();//使用乐观锁 double currentX = x, currentY = y; if (!s1.validate(stamp)) { /** * 上面这三行关键代码对顺序很是敏感,不能有重排序。 因 * 为 state 变量已是volatile,因此能够禁止重排序,但stamp并 不是volatile的。 * 为此,在validate(stamp)函数里面插入内存屏 障。 */ stamp = s1.readLock();//升级悲观锁 try { currentX = x; currentY = y; } finally { s1.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } }
StampedLock是一个读写锁,所以也会像读写锁那样,把一 个state变量分红两半,分别表示读锁和写锁的状态。同时,它还须要 一个数据的version。但正如前面所说,一次CAS没有办法操做两个变 量,因此这个state变量自己同时也表示了数据的version。下面先分析state变量。
同ReadWriteLock同样,StampedLock也要进行悲观的读锁和写锁 操做。不过,它不是基于AQS实现的,而是内部从新实现了一个阻塞队列
public class StampedLock implements java.io.Serializable { static final class WNode { volatile WNode prev; volatile WNode next; volatile WNode cowait; // list of linked readers volatile Thread thread; // non-null while possibly parked volatile int status; // 0, WAITING, or CANCELLED final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } } }
这个阻塞队列和 AQS 里面的很像。刚开始的时候,whead=wtail=NULL,而后初始化,建一个空节点,whead和wtail都指向这个空节 点,以后往里面加入一个个读线程或写线程节点。但基于这个阻塞队 列实现的锁的调度策略和AQS很不同,也就是“自旋”。在AQS里 面,当一个线程CAS state失败以后,会当即加入阻塞队列,而且进入 阻塞状态。但在StampedLock中,CAS state失败以后,会不断自旋, 自旋足够多的次数以后,若是还拿不到锁,才进入阻塞状态。为此, 根据CPU的核数,定义了自旋次数的常量值。若是是单核的CPU,确定不能自旋,在多核状况下,才采用自旋策略。