jdk提供
synchronized
实现线程同步,但有些场景下并不灵活,如多个同步方法,每次只能有一个线程访问;而Lock则能够很是灵活的在代码中实现同步机制java
在以前学习阻塞队列中,较多地方使用
ReadWriteLock
,Condition
,接下来在探究实现原理以前,先研究下锁的使用node
Lock 接口的定义安全
public interface Lock { // 获取锁,若当前lock被其余线程获取;则此线程阻塞等待lock被释放 // 若是采用Lock,必须主动去释放锁,而且在发生异常时,不会自动释放锁 void lock(); // 获取锁,若当前锁不可用(被其余线程获取); // 则阻塞线程,等待获取锁,则这个线程可以响应中断,即中断线程的等待状态 void lockInterruptibly() throws InterruptedException; // 来尝试获取锁,若是获取成功,则返回true; // 若是获取失败(即锁已被其余线程获取),则返回false // 也就是说,这个方法不管如何都会当即返回 boolean tryLock(); // 在拿不到锁时会等待必定的时间 // 等待过程当中,能够被中断 // 超过期间,依然获取不到,则返回false;不然返回true boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 释放锁 void unlock(); // 返回一个绑定该lock的Condtion对象 // 在Condition#await()以前,锁会被该线程持有 // Condition#await() 会自动释放锁,在wait返回以后,会自动获取锁 Condition newCondition(); }
ReentrantLock
可重入锁。jdk中ReentrantLock是惟一实现了Lock接口的类多线程
可重入的意思是一个线程拥有锁以后,能够再次获取锁,并发
最基本的使用场景,就是利用lock和unlock来实现线程同步框架
以轮班为实例进行说明,要求一我的下班以后,另外一我的才能上班,即不能两我的同时上班,具体实现能够以下ide
public class LockDemo { private Lock lock = new ReentrantLock(); private void workOn() { System.out.println(Thread.currentThread().getName() + ":上班!"); } private void workOff() { System.out.println(Thread.currentThread().getName() + ":下班"); } public void work() { try { lock.lock(); workOn(); System.out.println(Thread.currentThread().getName() + "工做中!!!!"); Thread.sleep(100); workOff(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { LockDemo lockDemo = new LockDemo(); int i = 0; List<Thread> list = new ArrayList<>(30); do { Thread a = new Thread(new Runnable() { @Override public void run() { lockDemo.work(); } }, "小A_" + i); Thread b = new Thread(new Runnable() { @Override public void run() { lockDemo.work(); } }, "小B_" + i); list.add(a); list.add(b); } while (i++ < 10); list.parallelStream().forEach(Thread::start); Thread.sleep(3000); System.out.println("main over!"); } }
上面的示例,主要给出了lock()
和 unlock()
的配套使用,当一个线程在上班迁尝试获取锁,若是获取到,则只有在下班以后才会释放锁,保证在其上班的过程当中,不会有线程也跑来上岗抢饭碗,输出以下源码分析
小A_3:上班! 小A_3工做中!!!! 小A_3:下班 小A_1:上班! 小A_1工做中!!!! 小A_1:下班 // .... 省略部分 小B_7:上班! 小B_7工做中!!!! 小B_7:下班 小B_5:上班! 小B_5工做中!!!! 小B_5:下班 main over!
从基本的使用中,肯定lock的使用姿式通常以下:学习
Lock lock = new ReentrantLock()
lock.lock()
尝试获取锁,若被其余线程占用,则阻塞lock.unlock()
; 通常来说,把释放锁的逻辑,放在须要线程同步的代码包装外的finally
块中即常见的使用姿式应该是测试
try { lock.lock(); // ..... } finally { lock.unlock(); }
一个疑问,若没被加锁,仅只执行lock.unlock()
是否会有问题?
测试以下
@Test public void testLock() { Lock lock = new ReentrantLock(); // lock.lock(); // lock.unlock(); lock.unlock(); System.out.println("123"); }
执行以后,发现抛了异常(去掉上面的注释,即加锁一次,释放锁两次也会抛下面异常)
另外一个疑问,代码块中屡次上锁,释放锁只一次,是否会有问题?
将前面的TestDemo方法稍稍改动一下
public void work() { try { lock.lock(); workOn(); lock.lock(); System.out.println(Thread.currentThread().getName() + "工做中!!!!"); Thread.sleep(100); workOff(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
再次执行,发现其余线程都没法再次获取到锁了,运行的gif图以下
所以能够得出结论:
lock()
,lock()
连续调用的状况,即二者之间没有释放锁unlock()
的显示调用在JDK的阻塞队列中,不少地方就利用了Condition和Lock来实现出队入队的并发安全性,以 ArrayBlockingQueue
为例
内部定义了锁,非空条件,非满条件
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { // ... 省略 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
出队,入队的实现以下(屏蔽一些与锁无关逻辑)
// 入队逻辑 public void put(E e) throws InterruptedException { // ... final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 若是队列已满,则执行Condtion notFull的等待方法 // 本线程会释放锁,等待其余线程出队以后,执行 notFull.singal()方法 notFull.await(); enqueue(e); } finally { // 释放锁 lock.unlock(); } } private void enqueue(E x) { // 入队,notEmpty 条件执行,唤醒被 notEmpty.await() 阻塞的出队线程 notEmpty.signal(); } // 出队 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 队列为空,线程执行notEmpty.wait(),阻塞并释放锁 // 等待其余入队线程执行 notEmpty.signal(); 后被唤醒 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // ... // 出队,notFull 条件执行,唤醒被 notFull.await() 阻塞的入队线程 notFull.signal(); }
下面看下Condition的定义
public interface Condition { // 使当前线程处于等待状态,释放与Condtion绑定的lock锁 // 直到 singal()方法被调用后,被唤醒(若中断,就game over了) // 唤醒后,该线程会再次获取与条件绑定的 lock锁 void await() throws InterruptedException; // 相比较await()而言,不响应中断 void awaitUninterruptibly(); // 在wait()的返回条件基础上增长了超时响应,返回值表示当前剩余的时间 // < 0 ,则表示超时 long awaitNanos(long nanosTimeout) throws InterruptedException; // 同上,只是时间参数不一样而已 boolean await(long time, TimeUnit unit) throws InterruptedException; // 同上,只是时间参数不一样而已 boolean awaitUntil(Date deadline) throws InterruptedException; // 表示条件达成,唤醒一个被条件阻塞的线程 void signal(); // 唤醒全部被条件阻塞的线程。 void signalAll(); }
经过上面的注释,也就是说Condtion通常是与Lock配套使用,应用在多线程协同工做的场景中;即一个线程的执行,指望另外一个线程执行完毕以后才完成
针对这种方式,咱们写个测试类,来实现累加,要求以下:
上面这种状况下,线程3的执行,要求线程1和线程2都执行完毕
说明:下面实现只是为了演示Condition和Lock的使用,上面这种场景有更好的选择,如Thread.join()或者利用Fork/Join都更加优雅
public class LockCountDemo { private int start = 10; private int middle = 90; private int end = 200; private volatile int tmpAns1 = 0; private volatile int tmpAns2 = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private AtomicInteger count = new AtomicInteger(0); private int add(int i, int j) { try { lock.lock(); int sum = 0; for (int tmp = i; tmp < j; tmp++) { sum += tmp; } return sum; } finally { atomic(); lock.unlock(); } } private int sum() throws InterruptedException { try { lock.lock(); condition.await(); return tmpAns1 + tmpAns2; } finally { lock.unlock(); } } private void atomic() { if (2 == count.addAndGet(1)) { condition.signal(); } } public static void main(String[] args) throws InterruptedException { LockCountDemo demo = new LockCountDemo(); Thread thread1 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " : 开始执行"); demo.tmpAns1 = demo.add(demo.start, demo.middle); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + demo.tmpAns1); }, "count1"); Thread thread2 = new Thread(() -> { System.out.println(Thread.currentThread().getName() + " : 开始执行"); demo.tmpAns2 = demo.add(demo.middle, demo.end + 1); System.out.println(Thread.currentThread().getName() + " : calculate ans: " + demo.tmpAns2); }, "count2"); Thread thread3 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " : 开始执行"); int ans = demo.sum(); System.out.println("the total result: " + ans); } catch (Exception e) { e.printStackTrace(); } }, "sum"); thread3.start(); thread1.start(); thread2.start(); Thread.sleep(3000); System.out.println("over"); } }
输出以下
sum : 开始执行 count2 : 开始执行 count1 : 开始执行 count1 : calculate ans: 3960 the total result: 20055 count2 : calculate ans: 16095 over
小结Condition的使用:
Lock#newConditin()
进行实例化Condition#await()
会释放lock,线程阻塞;直到线程中断or Condition#singal()
被执行,唤醒阻塞线程,并从新获取lockAQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题
AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称做“哨兵节点”或者“哑节点”,它不与任何线程关联。其余的节点与等待线程关联,每一个节点维护一个等待状态waitStatus
private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; //取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一 volatile int waitStatus; volatile Node prev; volatile Node next; // Link to next node waiting on condition, // or the special value SHARED volatile Thread thread; Node nextWaiter; }
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
源码分析:
tyrAcquire: 尝试获取锁,非阻塞当即返回
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 没有线程占用锁 if (compareAndSetState(0, acquires)) { // 占用锁成功,设置为独占 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 本线程以前已经获取到锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新重入次数 setState(nextc); return true; } return false; }
非公平锁tryAcquire的流程是:
addWaiter
: 入队
private Node addWaiter(Node mode) { // 建立一个节点,并将其挂在链表的最后 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾节点为空,说明队列还未初始化,须要初始化head节点并入队新节点 enq(node); return node; }
acquireQueued
挂起
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // /标记线程是否被中断过 boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); // 若该节点为有效的队列头(head指向的Node内部实际为空) // 尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); // 获取成功,将当前节点设置为head节点 p.next = null; // help GC failed = false; //返回是否被中断过 return interrupted; } // 判断获取失败后是否能够挂起,若能够则挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程若被中断,设置interrupted为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
线程挂起的逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前驱节点的状态 if (ws == Node.SIGNAL) // 前驱节点状态为signal,返回true return true; if (ws > 0) { // 从队尾向前寻找第一个状态不为CANCELLED的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将前驱节点的状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
线程入队后可以挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是“Hi,前面的兄弟,若是你获取锁而且出队后,记得把我唤醒!”。
shouldParkAfterFailedAcquire
会先判断当前节点的前驱状态是否符合要求:
小结下lock()流程
acquire()
acquireQueued
, 挂起以前,会先尝试获取锁,值有确认失败以后,则挂起锁,并设置前置Node的状态为SIGNAL(以保障在释放锁的时候,能够保证唤醒Node的后驱节点线程)尝试释放锁,成功,须要清楚各类状态(计数,释放独占锁)
此外还须要额外判断队列下个节点是否须要唤醒,而后决定唤醒被挂起的线程;
public final boolean release(int arg) { if (tryRelease(arg)) { // 尝试释放锁 Node h = head; if (h != null && h.waitStatus != 0) // 查看头结点的状态是否为SIGNAL,若是是则唤醒头结点的下个节点关联的线程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 计算释放后state值 if (Thread.currentThread() != getExclusiveOwnerThread()) // 若是不是当前线程占用锁,那么抛出异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 锁被重入次数为0,表示释放成功,清空独占线程 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
Lock lock = new ReentrantLock()
lock.lock()
尝试获取锁,若被其余线程占用,则阻塞lock.unlock()
; 通常来说,把释放锁的逻辑,放在须要线程同步的代码包装外的finally
块中lock()
,lock()
连续调用的状况,即二者之间没有释放锁unlock()
的显示调用Lock#newConditin()
进行实例化Condition#await()
会释放lock,线程阻塞;直到线程中断or Condition#singal()
被执行,唤醒阻塞线程,并从新获取lockReentrantLock#lock
的流程图大体以下