package com.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Created by cxx on 2018/1/16. * * 独占锁示例 */ public class Mutex implements Lock { //静态内部类,自定义同步器 private static class Sync extends AbstractQueuedSynchronizer{ //是否处于占用状态 protected boolean isHeldExclusively(){ return getState() == 1; } //当状态为0的时候,获取锁 public boolean tryAcquire(int acquires){ if (compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //释放锁,将状态设置为0 public boolean tryRelease(int releases){ if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } //返回一个condition,每一个condition包含了一个condition队列 Condition newCondition(){ return new ConditionObject(); } } /*** * 将操做代理到Sync上便可 */ private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } public boolean isLocked(){ return sync.isHeldExclusively(); } public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
如代码所示,独占锁实现了在同一时刻只能用一个线程获取到锁,而其余获取锁的线程只能处于同步等待队列中等待,只有获取锁的线程释放了锁,后继的线程才可以获取锁。java
#锁、同步器、使用者node
如上的mutex锁同样,具体的实现代理到sync,mutex只须要向用户定义交互方式便可。安全
独占锁的获取与释放(包括了对同步队列的操做)ide
acquire(int arg):独占式获取同步状态,若是当前线程获取同步状态成功,则由该方法返回,不然,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;工具
release(int arg):独占式释放同步状态,该方法会在释放同步状态以后,将同步队列中第一个节点包含的线程唤醒;ui
共享锁的获取与释放(包括了对同步队列的操做)this
独占锁的获取与释放的具体实现(没有对同步队列的操做,功能单一)线程
共享锁的获取与释放(没有对同步队列的操做)设计
操做队列同步器的状态代理
在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而下降上下文切换的开销,提升了吞吐量。同时在设计AQS时充分考虑了可伸缩行,所以J.U.C中全部基于AQS构建的同步器都可以得到这个优点。
AQS的主要使用方式是继承,子类经过继承同步器并实现它的抽象方法来管理同步状态。
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操做,固然AQS能够确保对state的操做是安全的。
AQS经过内置的FIFO同步队列来完成资源获取线程的排队工做,若是当前线程获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构形成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程
当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义以下:
static final class Node { /** 共享 */ static final Node SHARED = new Node(); /** 独占 */ static final Node EXCLUSIVE = null; /** * 由于超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其余状态; */ static final int CANCELLED = 1; /** * 后继节点的线程处于等待状态,而当前节点的线程若是释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */ static final int SIGNAL = -1; /** * 节点在等待队列中,节点线程等待在Condition上,当其余线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步状态获取将会无条件地传播下去 */ static final int PROPAGATE = -3; /** 等待状态 */ volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 获取同步状态的线程 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
第一,头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后须要检查本身的前驱节点是不是头结点。
第二,维护同步队列的FIFO原则。
在线程获取同步状态时若是获取失败,则加入CLH同步队列,经过经过自旋的方式不断获取同步状态,可是在自旋的过程当中则须要判断当前线程是否须要阻塞,其主要方法在acquireQueued():
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
经过这段代码咱们能够看到,在获取同步状态失败后,线程并非立马进行阻塞,须要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码以下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点 int ws = pred.waitStatus; //状态为signal,表示当前线程处于等待状态,直接放回true if (ws == Node.SIGNAL) return true; //前驱节点状态 > 0 ,则为Cancelled,代表该节点已经超时或者被中断了,须要从同步队列中取消 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //前驱节点状态为Condition、propagate else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
这段代码主要检查当前线程是否须要被阻塞,具体规则以下:
若是当前线程的前驱节点状态为SINNAL,则代表当前线程须要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
若是当前线程的前驱节点状态为CANCELLED(ws > 0),则代表该线程的前驱节点已经等待超时或者被中断了,则须要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
若是前驱节点非SINNAL,非CANCELLED,则经过CAS的方式将其前驱节点设置为SINNAL,返回false
若是 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。
当线程释放同步状态后,则须要唤醒该线程的后继节点:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
调用unparkSuccessor(Node node)唤醒后继节点:
private void unparkSuccessor(Node node) { //当前节点状态 int ws = node.waitStatus; //当前状态 < 0 则设置为 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //当前节点的后继节点 Node s = node.next; //后继节点为null或者其状态 > 0 (超时或者被中断了) if (s == null || s.waitStatus > 0) { s = null; //从tail节点来找可用节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); }
可能会存在当前线程的后继节点为null,超时、被中断的状况,若是遇到这种状况了,则须要跳过该节点,可是为什么是从tail尾节点开始,而不是从node.next开始呢?缘由在于node.next仍然可能会存在null或者取消了,因此采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。