java jdk提供了一系列解决并发冲突的锁和工具,ReentrantLock为可重入独占锁。 要从哪里开始,由于这个其实要讲起了不少。java
public class Locktest { /** * 测试Lock的使用。在方法中使用Lock,能够避免使用Synchronized关键字。 */ public static class LockTest { Lock lock = new ReentrantLock();// 锁 double value = 0d; // 值 int addtimes = 0; /** * 增长value的值,该方法的操做分为2步,并且相互依赖,必须实如今一个事务中 * 因此该方法必须同步,之前的作法是在方法声明中使用Synchronized关键字。 * @throws InterruptedException */ public void addValue(double v) throws InterruptedException { lock.lock();// 取得锁 System.out.println("LockTest to addValue: " + v + " " + System.currentTimeMillis()); this.value += v; Thread.sleep(1000); this.addtimes++; lock.unlock();// 释放锁 } public Double getValue() { return this.value; } } public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { final LockTest lockTest = new LockTest(); Runnable run = new Runnable(){ @Override public void run() { try { lockTest.addValue(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; for(int i=0;i<100;i++){ new Thread(run).start(); }为了保证value 和addtimes 的操做在addValue函数中是原子操做,且最后的值是正确的,加了一把ReentrantLock锁。node
那么接下来咱们来分析下ReentrantLock是如何实现的?数据结构
看源码分析前建议能够先从最后总结开始,从宏观上有一个大体认识。架构
ReetrantLock在jdkjava.util.concurrent.locks包下,实现接口Lock并发
a Lock lock = new ReentrantLock();// 锁 b lock.lock(); //获取锁 2 /** * 业务逻辑,保证只有一个线程同时执行 **/ c lock.unlock() //释放锁 3
如图所示: 由三个内部类Sync、FairSync、NonfairSync,关系以下,都是基于AbstractQueuedSynchronizer实现,后面简称AQS,因此能够知道,jdk锁的实现AQS是关键app
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
能够指定 new ReentrantLock(true); 为公平锁ide
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
public void lock() { sync.lock(); }
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); //直接调用获取锁方法acquire,按照正常的程序拿锁,进入队列 } ... }
非公平锁会先直接去抢占,而后在acquire函数
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
final void lock() { if (compareAndSetState(0, 1)) //先尝试插队直接去拿锁,更改state状态为1,若是成功则把Owner线程设置为当前线程,则表示成功得到锁 setExclusiveOwnerThread(Thread.currentThread()); else //插队失败则按照公平锁方式同样,排队获取 acquire(1); } //尝试获取锁后面再讲 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
为AQS一方法,底层调用CAS,将state公共变量更改成1。工具
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS是JUC重要的同步器,全部的锁基于整个同步器实现协调,这里简单的介绍下,有兴趣后面再重点分析 主要由如下几个重要部分组成源码分析
state是关键,volatile int state;用volatile修饰。当为0时表示锁是空闲,能够获取锁,当大于0时表示得到锁。 独占锁时大于0表示锁的重入次数,共享锁时,state共当前共享线程个数。
node是一个双向链表,有Node、prev、next、head、tail组成,该链表被称之CHL队列(FIFO) 如上图
acquire流程通过如下步骤:
public final void acquire(int arg) { // 1.先尝试 tryAcquire 获取锁,具体实现后面再详细讲解, // 2.再addWaiter 加入队尾等待,acquireQueued放入同步队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
公平锁与最大区别在tryAcquire,如下分析两则tryAcquire源码
公平锁尝试获取锁实现(OwnerThread为以得到锁的线程)
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();//当前前程 int c = getState();//获取当前锁状态 if (c == 0) {//当锁空闲时 判断前置节点为空,则调用cas将state设置成1,当前线程设置成OwnerThread,获取锁成功,true返回 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }//Ownerthread为当前线程时,+1,如下为重入锁的逻辑 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);//设置state值,+1 return true;//返回true获取锁 } return false; }
非公平锁tryAcquire实现
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread();//获取当前线程 int c = getState();//get到state锁状态 if (c == 0) {//锁空闲,能够获取锁 //经过CAS将state状态更改为 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
//建立与当前线程队列的节点和给定锁模式(独占、共享) //新节点node从队尾加入,设置成功则把新节点设置成尾节点tail,并将原tail.next 指向node /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node // 返回新的节点 */ private Node addWaiter(Node mode) { // new 一个新节点,设置当前线程和独占模式exclusive Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;//tail节点赋值给pred,用于后面交换 if (pred != null) {//若是原尾节点存在 node.prev = pred; //将新节点的上一个指针指向原尾节点 if (compareAndSetTail(pred, node)) {//新节点node经过CAS设置成新tail节点 pred.next = node;//原tail节点的下一个指针指向新的尾节点tail return node;//返回新节点,即也是新尾节点 } } enq(node);//假如原尾节点为空或者compareAndSetTail失败再次enq放入尾节点 return node; }
//空队列,首先必须初始化,插入队列尾部,返回当前节点上一个节点 /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
获取锁的关键
// /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //设置标志位,若是为true 则会被中断 try { boolean interrupted = false; for (;;) {//自旋 //当前节点node已经经过addWaiter设置为tail了,定义p为tail上一个节点 final Node p = node.predecessor(); //若是p为head节点,则才有资格尝试调用tryAcquire获取锁 if (p == head && tryAcquire(arg)) { //获取锁成功则当前节点设置成head,setHead中已将node.prev = null;指向前置节点设置成null了,再也不指向原head setHead(node); //将原head节点next指向null,这个时候,原head将是一个孤立的node,有利于gc回收 p.next = null; // help GC failed = false;//获取成功标志 return interrupted; } //一、获取锁失败后,只有被unpark唤醒的waitStatus状态为Node.SIGNAL才能够被阻塞;二、阻塞当前线程,返回中断状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞当前线程,返回中断状态,为true,则返回 interrupted = true;//若是阻塞线程被中断则设置true,下次for循环进来被return interrupted; } } finally { if (failed)//若是失败则取消该节点获取锁 cancelAcquire(node); } }
// CANCELLED = 1 // SIGNAL = -1 // CONDITION = -2 // NORMAL = 0 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前一个节点的状态(注意:不是当前节点) int ws = pred.waitStatus; if (ws < 0) // waitStatus<0,也就是前面的节点尚未得到到锁,那么返回true,表示当前节点(线程)就应该park()了。 return true; if (ws > 0) { // waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操做直到全部前一个节点的waitStatus<=0,进行4 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,须要根据它的等待状态来决定是否该park() compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // ws<0才须要park(),ws>=0都返回false,表示线程不该该park() return false; }
public final boolean release(int arg) { //尝试释放锁,设置AQS state状态,若是为0则返回true,若是解锁成功则唤醒head的下一个节点,让其得到锁 if (tryRelease(arg)) { Node h = head;//head 赋给h,中间变量用于后面交换 //存在头节点,waitStatus 为1 -1 -2 -3,唤醒下一个节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒下一个节点 return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases;//当前状态state,独占表示重入次数-1 //当前线程不是独占OwnerThread,则抛出异常,由于lock和unlock是一对,必须保证释放锁的线程为当前得到锁的线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//若是等于0表示解锁成功,OwnerThread设置null 若是是重入锁要屡次解锁,直到0 free = true; setExclusiveOwnerThread(null); } setState(c);//设置AQS state状态,若是是重入锁要多长解锁 return free; }
若是一个存在,唤醒节点的next
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //head节点等待状态, // 此时node是须要释放锁的头节点 // 清空头节点的waitStatus,也就是不须要锁了,这里修改为功失败无所谓 if (ws < 0)//设置0代表已经得到锁 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //若是不存在下一个节点或者线程已中断或已取消 // 从头节点的下一个节点开始寻找继任节点,当且仅当继任结点的waitStatus<=0才是有效继任节点,不然将这些waitStatus>0(也就是CANCELLED的节点)从AQS队列中剔除 if (s == null || s.waitStatus > 0) { s = null; //从队尾开始往前任找,直到node.next,过滤掉中断的结点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //下一个节点存在则直接唤醒 LockSupport.unpark(s.thread); }
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
LockSupport为阻塞线程提供基础的功能,它由一对park和unpark组成,park会阻塞当前线程(获取许可,线程默认许可被占用了),unpark“唤醒”等待线程(释放许可);至关于信号量,park拿到才能够运行。 简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。
LockSupport.park(); 中止 System.out.println("======");
乐观的独占锁(相似ReentrantLock) SimpleExclusiveLock .java
package com.concurrent; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; /** * 简单乐观独占锁 */ public class SimpleExclusiveLock { /** * 独占锁标记 true 锁不可用 false 锁可用 */ private AtomicBoolean state = new AtomicBoolean(false); List<Thread> queue = new ArrayList<Thread>();//阻塞队列 public boolean lock() { if (!state.get()&&state.compareAndSet(false, true)) {//取锁成功不会阻塞,程序会继续执行 return true; // 利用CAS } else { System.out.println("queue.add and park "+Thread.currentThread()); queue.add(Thread.currentThread());//加入阻塞队列 LockSupport.park();//阻塞线程 System.out.println("park after "+Thread.currentThread()); return false; } } public boolean unLock() { if (state.get()) { System.out.println("queue.remove and unpark "+Thread.currentThread()); queue.remove(Thread.currentThread());//从队列里移除 if (state.compareAndSet(true, false)) {// 利用CAS if(!queue.isEmpty()){ System.out.println("unpark "+queue.get(0).getName()); LockSupport.unpark((Thread) queue.get(0));//唤醒第一个等待线程 System.out.println("unpark after "+queue.get(0).getName()); } return true; } return false; } else { return false; } } }
SimpleExclusiveLockTest .java
使用
package com.concurrent; public class SimpleExclusiveLockTest { public static SimpleExclusiveLock lock = new SimpleExclusiveLock(); // 独占锁 public static volatile int i = 0; // 保证可见性 public class RunnableTask implements Runnable { @Override public void run() { while (true) { try { lock.lock();//加锁 i += 1; System.out.println("thread name:"+ Thread.currentThread().getName() +" i="+ i); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unLock();//释放锁 } } } } public void runTask() { for (int i = 0; i < 100; i++) { new Thread(new RunnableTask(),"thread"+ i).start(); } } public static void main(String[] args) { SimpleExclusiveLockTest test = new SimpleExclusiveLockTest(); test.runTask(); } }
JUC(Java Util Concurrency)仅用简单的park, unpark和CAS指令就实现了各类高级同步数据结构,并且效率很高,使人惊叹。
如下我从宏观角度描述获取锁和解锁流程
锁的状态是由AQS.state控制,加锁和解锁都会感知和变动此变量,当为0时表示锁是空闲,能够获取锁,当大于0时表示得到锁。 独占锁时大于0表示锁的重入次数,共享锁时,state共当前共享线程个数。
acquire流程通过如下步骤:
与公平锁acquire惟一区别在tryAcquire流程中,不用要求前置节点是head节点,则表示tail能够直接去抢占锁,若是抢占失败后面的流程与公平一致。
解锁流程比较简单,解锁节点确定是head,由于head持有锁