ReentrantLock实现了公平锁与非公平锁,公平锁提供顺序获取锁的方式,而非公平锁提供抢占式获取锁的方式。
公平锁: 线程A占用锁,B等待,而后依次获取锁,其中B会被挂起或者是自旋,而后当线程A释放锁后,线程B再被唤醒,以此类推,按照申请锁的前后顺序来。
非公平锁: 线程A占用锁,B等待,于此同时C请求锁,因为B线程被唤醒须要时间,因此C有可能在B被唤醒钱就释放锁,以此类推,按照锁空闲时申请锁的线程为优先。java
世界应该是公平公正的不是吗?好了,别白日作梦了,因为线程的唤醒是一个比较耗时的操做(切换线程上下文,调用OS等)若是线程持有锁的时间很长,那么公平锁就比较有优点node
NonfairSync中lock的实现以下:ui
final void lock() { // CAS操做设置AbstractQueuedSynchronizer中的volatile int state // 若是设置成功将现有的线程setExclusiveOwnerThread if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // CAS失败了就调用acquire()方法 acquire(1); }
acquire方法由AbstractQueuedSynchronizer提供this
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire在AbstractQueuedSynchronizer中是一个protected方法而且没有给出实现,可见是但愿由它的子类去扩展线程
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
回去再看NonfairSync中tryAcquire的实现code
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取同步state int c = getState(); // 这个判断颇有意思,因为调用这个方式是第一次尝试CAS失败才会进入该方法 // 这里从新再判断一次同步state,能够避免以前的线程已经释放lock,而继续将 // 该线程放入等待队列的状况,和lock()的第一段代码含义相同设置同步state if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 接下来判断是不是同一个线程,这个判断是由于ReentrantLock是可重入的lock else if (current == getExclusiveOwnerThread()) { // 将state++,这里的lock的获取就是经过同步state的值来控制的 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
这段代码大概的意思就是若是尚未线程占用锁就设置state为1,若是是已经占用该锁的线程再次访问就累计state的值,返回true,若是已经被占用返回false队列
回过头来继续看acquire,!tryAcquire(arg)意味着获取锁失败,而后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
进入addWaiter方法ip
/** * 1. 初始化: 若是tail和head没有被初始化,那么建立一个node而且指向它 * +------+ * | Node | <---- tail, head * |(Head)| * +------+ * * 2. 添加新的节点进入队列 * +------+ prev +------+ * head ----> | Node | <---- | Node | <---- tail * |(Head)| ----> |Thread| * +------+ next +------+ */ private Node addWaiter(Node mode) { // 建立一个node使用EXCLUSIVE模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 若是队列不是null(已经有线程再等待锁)那么将该新增的node加入队列 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若是上述代码没有成功,这里是使用自旋的方式继续加入等待队列 enq(node); // 入队成功后返回新增node节点 return node; } // 将新的node节点入队并返回以前的tail节点 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 典型的入队操做将tail指向新增的node node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
而后再看acquireQueued,此时的node参数是以前咱们分析的新增入队列的node节点get
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 返回node的前一个节点 final Node p = node.predecessor(); /** * 若是该新增node的prev-node是head-node.以下图这种状态 * 也就是说在等待队列中只有一个node,Head-node不包含在 * 内,而且调用tryAcquire方法成功(即成功的设置了同步state) * * * +------+ prev +------+ * head ----> | Node | <---- | Node | <---- tail * |(Head)| ----> |Thread| * +------+ next +------+ * * 那么将head指向改node,原来的head的next节点为null * * +-------------+ * head ----> | Node | <---- tail * |Thread = null| * +-------------+ * */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
接下来是shouldParkAfterFailedAcquire,顾名思义该方法返回是否在获取lock失败后堵塞线程,该方法会忽略并移出队列中node.waitStatus = CANCELLED的node同步
/** * 该方的参数是 pred是Thread-A所在的node,node是Thread-B所在的node * 这个必须得理解 * * +------+ prev +--------------+ prev +--------+ * head ----> | Node | <---- | Node | <---- | Node |<---- tail * |(Head)| ----> |Thread-A | ----> |Thread-B| * +------+ next |waitStatus = 0| | | * +--------------+ +--------+ * * +------+ prev +---------------+ prev +--------+ * head ----> | Node | <---- | Node | <---- | Node |<---- tail * |(Head)| ----> |Thread-A | ----> |Thread-B| * +------+ next |waitStatus = -1| | | * +---------------+ +--------+ * * static final int CANCELLED = 1; * static final int SIGNAL = -1; * static final int CONDITION = -2; * static final int PROPAGATE = -3; * * 同时这个方法又分为2步(彷佛整个AQS中都充斥着延迟初始化的概念) * 1. 初始化: 设置形参pred的waitStatus属性为Node.SIGNAL * 2. 因为调用shouldParkAfterFailedAcquire()方法的acquireQueued()方法 * 还在自旋中,因此该方法会被调用第2次,此次才真正返回true,若是waitStatus * 被设置成CANCELLED,那么会忽略等待队列中的这些node * */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 这里就是初始化的代码,设置形参pred的waitStatus属性为Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()方法,使用LockSupport堵塞当前node对应的thread,并返回中断标识,当这个方法被调用时才真正的意味着lock.lock()方法完成了它的使命
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }