原文:java并发编程 | 锁详解:AQS,Lock,ReentrantLock,ReentrantReadWriteLockjava
锁是用来控制多个线程访问共享资源的方式,java
中可使用synchronized
和Lock
实现锁的功能node
synchronized
是java中的关键字,隐藏获取和释放锁的过程,Lock
是java中的接口,须要主动的获取锁和释放锁,synchronized
是排他锁,而Lock
支持可中断获取锁,超时获取锁算法
Lock
提供的接口编程
public interface Lock { /** * 获取锁,调用该方法后当前线程获取锁,获取到锁以后从该方法返回 */ void lock(); /** * 可中断的获取锁,在获取锁的过程当中能够中断当前线程 */ void lockInterruptibly() throws InterruptedException; /** * 尝试非阻塞的获取锁,调用方法后当即返回,获取到锁则返回true,不然返回false */ boolean tryLock(); /** * 超时获取锁,在超时时间内获取到锁,在超时时间被中断,超时时间内为获取到锁,三种状况下会从该方法返回 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 释放锁 */ void unlock(); /** * 获取等待通知组件,只有当前线程获取到锁以后才能够调用该组件的wait()方法,释放锁 */ Condition newCondition(); }
队列同步器AbstractQueuedSynchronizer
(AQS
简称同步器)是用来构建锁或者其余同步组件的基础框架安全
java
中锁的实现基本都是经过聚合了一个同步器的子类完成线程访问控制的,同步器是实现锁的关键,能够这么理解,锁面向编程者,隐藏了实现细节,同步器面向锁的实现,简化了锁的实现方式,屏蔽了同步状态管理,线程排队,等待与唤醒等底层操做,经过AbstractQueuedSynchronizer
咱们能够很方便的实现一个锁并发
同步器的设计基于模板方法模式,提供的模板方法主要包括:独占锁获取锁与释放同步状态,共享式获取与释放同步状态,获取同步队列中等待线程状况框架
独占式操做ide
想要实现一个独占式锁须要重写如下方法函数
方法名 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,同一时刻只能有一个线程能够获取到同步状态,获取失败进入同步队列等待 |
void acquireInterruptibly(int arg) | 独占式获取同步状态,响应中断操做,被中断时会抛异常并返回 |
boolean tryAcquireNanos(int arg, long nanosTimeout) | 独占式获取同步状态,响应中断操做,而且增长了超时限制,若是规定时间没有得到同步状态就返回false,不然返回true |
boolean release(int arg) | 独占式释放同步状态,在释放同步状态以后,将同步队列中的第一个节点包含的线程唤醒 |
共享式操做源码分析
想要实现一个共享锁须要重写如下方法
方法名 | 描述 |
---|---|
void acquireShared(int arg) | 共享式获取同步状态,同一时刻能够有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 共享式获取同步状态,响应中断操做 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 共享式获取同步状态,响应中断操做,而且增长了超时限制,若是规定时间没有得到同步状态就返回false,不然返回true |
boolean releaseShared(int arg) | 共享式释放同步状态 |
方法名 | 描述 |
---|---|
Collection
|
获取同步队列上的线程集合 |
在这些模板方法中,屡次提到了同步队列,咱们看一下AQS
是如何实现同步队列的
首先看下AbstractQueuedSynchronizer
的类图
Node
类是AbstractQueuedSynchronizer
类的内部类,同步器依靠内部的一个同步队列来完成同步状态的管理,当前线程获取同步状态失败的时候,同步器会将当前线程及等待信息构形成一个Node
节点加入到同步队列中
属性 | 描述 |
---|---|
waitStatus | 该线程等待状态,包含以下: CANCELLED 值为1,表示须要从同步队列中取消等待 SIGNAL值为-1,表示后继节点处于等待状态,若是当前节点释放同步状态会通知后继节点,使得后继节点的线程可以运行 CONDITION值为-2,表示节点在等待队列中 PROPAGATE值为-3,表示下一次共享式同步状态获取将会无条件传播下去 INITIAL值为0,表示初始状态 |
prev:Node | 前驱节点 |
next:Node | 后继节点 |
thread:Thread | 当前线程 |
nextWaiter:Node | 下一个等待节点 |
能够看到AQS中
的节点信息包含前驱和后继节点,因此咱们知道了AQS的同步队列是双向链表结构的
AQS
中的几个重要属性
属性 | 描述 |
---|---|
state:int | 同步状态:若是等于0,锁属于空闲状态,若是等于1,标识锁被占用,若是大于1,则表示锁被当前持有的线程屡次加锁,即重入状态 |
head:Node | 队列的头节点 |
tail:Node | 队列的尾节点 |
unsafe:Unsafe | AQS中的cas算法实现 |
AQS
中提供了三个方法对同步状态进行操做
getState()
获取到同步状态setState(int newState)
设置同步状态compareAndSetState(int expect, int update)
使用CAS
设置当前状态,该方法可以保证设置的原子性AQS
的基本结构以下图所示
在同步器中head
和tail
的节点的引用指向同步队列的头,尾节点,这样在后面操做节点入列和出列的时候只须要操做同步器中的head
和tail
节点就能够
ReentrantLock
重入锁,内部AQS的实现是基于独占式获取/释放同步状态的。咱们学习一下ReentrantLock
的实现原理来进一步加深对AQS
的理解
重进入是指任意线程在获取到锁以后可以再次获取该锁而不会被锁阻塞,它表示一个线程能够对资源重复加锁,同时支持获取锁时使用公平锁仍是非公平锁
例:
/** * @author: chenmingyu * @date: 2019/4/12 15:09 * @description: ReentrantLock */ public class ReentrantLockTest { private static Lock LOCK = new ReentrantLock(); public static void main(String[] args) { Runnable r1 = new TestThread(); new Thread(r1,"r1").start(); Runnable r2 = new TestThread(); new Thread(r2,"r2").start(); } public static class TestThread implements Runnable{ @Override public void run() { LOCK.lock(); try { System.out.println(Thread.currentThread().getName()+":获取到锁 "+LocalTime.now()); TimeUnit.SECONDS.sleep(3L); }catch (Exception e){ e.printStackTrace(); }finally { LOCK.unlock(); } } } }
输出
只有在r1
线程释放锁以后r2
线程才获取到锁去执行代码打印数据
建立的实例,默认使用非公平锁,若是须要公平锁,须要调用有参的构造函数
/** * 非公平锁 * 建立ReentrantLock实例,默认使用非公平锁 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 公平锁 * 建立ReentrantLock实例,fair为true使用公平锁 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
NonfairSync
与FairSync
都是ReentrantLock
类的内部类,继承自ReentrantLock
类的内部类Sync
,Sync
类继承了AbstractQueuedSynchronizer
类图以下
非公平锁的实现
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
非公平锁会在调用lock()
方法的时候首先调用compareAndSetState(0, 1)
方法尝试获取锁,若是没有获取到锁则调用acquire(1)
方法
compareAndSetState(0, 1)
方法是一个CAS
操做,如过设置成功,则为获取到同步状态,并调用setExclusiveOwnerThread(Thread.currentThread());
方法将当前线程设置为独占模式同步状态的全部者
咱们所说的获取同步状态其实指的就是获取锁的状态,获取同步状态成功则加锁成功
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
acquire(1)
方法是提供的模板方法,调用tryAcquire(arg)
和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg)
方法调用的是子类的实现,NonfairSync
的tryAcquire
方法
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire(acquires)
方法
/** * 非公平尝试获取同步状态 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { /** * 首先根据`getState()`方法获取同步状态,若是等于0尝试调用`compareAndSetState(0, * acquires)`方法获取同步状态,若是设置成功则获取同步状态成功,设置当前线程为独占模式同步状态的 * 全部者 */ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
getState()
方法获取同步状态,若是等于0尝试调用compareAndSetState(0, acquires)
方法获取同步状态,若是设置成功则获取同步状态成功,设置当前线程为独占模式同步状态的全部者state
+1,表示当前线程屡次加锁若是tryAcquire(arg)
返回false,表示没有获取到同步状态,即没有拿到锁,因此须要调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法将当前线程加入到同步队列中,而且以死循环的方式获取同步状态,若是获取不到则阻塞节点中的线程,而被阻塞的线程只能经过前驱节点的出队,或者阻塞线程被中断来实现唤醒
addWaiter(Node.EXCLUSIVE)
方法的做用就是构造同步队列的节点信息,而后加入到同步队列尾部
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
首先调用Node
类的构造方法建立一个实例,tail
是AQS
中队列的尾节点
若是tail
节点不为空,将实例的前驱节点置为tail
指向的节点,而后调用compareAndSetTail(pred, node)
方法,compareAndSetTail(pred, node)
方法调用unsafe.compareAndSwapObject(this, tailOffset, expect, update)
,此方法是一个CAS
操做,不可中断,用来保证节点可以被线程安全的添加,设置成功后,将节点tail
的后继节点指向当前实例,以此来实现将当前实例加入到同步队列尾部
若是tail
节点等于空或者compareAndSetTail(pred, node)
设置失败,则会调用enq(node)
方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在这个方法中利用for
循环构造了一个死循环,若是当前AQS
的tail
节点为空,则证实当前同步队列中没有等待的线程,也就是没有节点,调用compareAndSetHead(new Node())
方法构造了一个头节点,而后循环调用compareAndSetTail(t, node)
将当前实例加入到队列的尾部,若是失败就一直调用,直到成功为止
在调用addWaiter(Node mode)
方法后会调用acquireQueued(final Node node, int arg)
方法,做用是在每一个节点进入到同步队列中后就进入了一个自旋的状态,经过校验本身的前驱节点是不是头节点,而且是否获取到同步状态为条件进行判断,若是知足条件则从自旋中退出,负责一直自旋
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
方法内也是一个for
的死循环,经过node.predecessor()
方法获取传入的Node
实例的前驱节点并与AQS
的head
节点进行比较,若是相等,则尝试获取同步状态获取锁,若是获取成功就调用setHead(node);
方法将当前Node
实例节点设置为head
节点,将原来head
节点的后继节点置为null,有助于GC回收
setHead(node);
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
若是传入的Node
实例的前驱节点与AQS
的head
节点不相等或者获取同步状态失败,则调用shouldParkAfterFailedAcquire(p, node)
和parkAndCheckInterrupt()
方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
经过CAS
操做,设置节点的前驱节点等待状态为Node.SIGNAL
,若是设置失败,返回false,由于外层是死循环,会重复当前方法直到设置成功
parkAndCheckInterrupt()
方法调用LookSupport.park()
阻塞线程,而后清除掉中断标识
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
从acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法返回后,调用selfInterrupt()
,将线程中断
公平锁的实现
在了解acquire(1);
方法的做用以后,在理解公平锁的实现就容易了
final void lock() { acquire(1); }
对比非公平锁的实现少了一步上来就获取同步状态的操做,其他操做跟非公平锁的实现同样
公平锁与非公平锁总结:
独占式获取锁的流程
ReentrantLock
的unlock()
方法实际调用的AQS
的release(int arg)
方法
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
首先调用tryRelease(arg)
释放同步状态
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
获取同步状态,并减1,若是此时c==0则释放锁,将当前独占式锁的拥有线程置为null,而后设置state
为0
而后调用unparkSuccessor(Node node)
方法唤醒后继节点的线程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) /* * 唤醒后继节点的线程 */ LockSupport.unpark(s.thread); }
总结一下独占式获取锁和释放锁的过程:
AQS
维护的同步队列的尾部,而且开始自旋,跳出自旋的条件就是前驱节点为AQS
的头节点而且获取到了同步状态,此时将节点移除同步队列在了解了ReentrantLock
的实现原理以后,咱们就能够仿照着本身去实现一个自定义独占式锁了
步骤
LockTest
类,实现Lock
接口,重写必要的接口LockTest
类里建立一个内部类Sync
,继承AQS
,由于要实现独占式锁,因此重写tryAcquire(int arg)
和tryRelease(int arg)
方法就能够了LockTest
代码
/** * @author: chenmingyu * @date: 2019/4/11 15:11 * @description: 自定义独占式锁 */ public class LockTest implements Lock{ private final Sync SYNC = new Sync(); public static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { if(getState()<1){ throw new IllegalMonitorStateException("释放同步状态不可小于1"); } int c = getState() - arg; if (c == 0) { setExclusiveOwnerThread(null); } setState(c); return true; } } @Override public void lock() { SYNC.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { SYNC.acquireInterruptibly(1); } @Override public boolean tryLock() { return SYNC.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { SYNC.release(1); } @Override public Condition newCondition() { return null; } }
/** * @author: chenmingyu * @date: 2019/4/12 15:09 * @description: LockTest */ public class ReentrantLockTest { private static Lock LOCKTEST = new LockTest(); public static void main(String[] args) { Runnable r1 = new TestThread(); new Thread(r1,"LockTest 1").start(); Runnable r2 = new TestThread(); new Thread(r2,"LockTest 2").start(); } public static class TestThread implements Runnable{ @Override public void run() { LOCKTEST.lock(); try { System.out.println(Thread.currentThread().getName()+":获取到锁 "+LocalTime.now()); TimeUnit.SECONDS.sleep(3L); }catch (Exception e){ e.printStackTrace(); }finally { LOCKTEST.unlock(); } } } }
输出
ReentrantReadWriteLock
是读写锁的实现,实现ReadWriteLock
接口
ReentrantReadWriteLock
内部一样维护这一个Sync
内部类,实现了AQS
,经过重写对应方法实现读锁和写锁
如今已经知道了同步状态是由AQS
维护的一个整型变量state
,独占式锁获取到锁时会对其进行加1,支持重入,而读写锁ReentrantReadWriteLock
在设计的时候也是经过一个整型变量进行读锁的同步状态和写锁的同步状态维护,在一个变量上维护两种状态就须要对整型变量进行按位分割,一个int类型的变量包含4个字符,一个字符8个bit,就是32bit,在ReentrantReadWriteLock
中,高16位表示读,低16位表示写
写锁的获取
读写锁中的写锁,支持重进入的排它锁
重写ReentrantReadWriteLock
的内部类Sync
中的tryAcquire(int acquires)
方法
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); /* * 1,若是同步状态c不等于0,表明着有读锁或者写锁 */ if (c != 0) { // 2,若是c不等于0,w写锁的同步状态为0,切当前线程不是持有锁的线程,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
解读
若是存在读锁,写锁不能被获取,必需要等到其余读线程释放读锁,才能够获取到写锁,这么作的缘由是要确保写锁作的操做对读锁可见,若是写锁被获取,则其余读写线程的后续访问均会被阻塞
写锁的释放
读写锁中的读锁,支持重进入的共享锁
写锁的释放与独占式锁释放过程类似,每次都是减小写锁的同步状态,直到为0时,表示写锁已被释放
读锁的获取与释放
读锁是一个支持重入的共享锁,重写ReentrantReadWriteLock
的内部类Sync
中的tryAcquireShared(int unused)
方法
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
若是其余线程获取了写锁,则当前线程获取读锁状态失败进入等待状态,若是当前线程获取了写锁或者写锁未被获取,则当前线程获取同步状态成功,获取到读锁
释放读锁的时候就是每次释放都会对同步状态进行-1,直到为0时,表示读锁已被释放
锁降级
锁降级是指将写锁降级为读锁,这个过程就是当前线程已经获取到写锁的时候,在获取到读锁,随后释放写锁的过程,这么作的目的为的就是保证数据的可见性
当前线程A获取到写锁后,对数据进行修改,以后在获取到读锁,而后释放写锁,完成锁降级,这时候线程A还没释放读锁,别的线程就没法获取到写锁,就没法对数进行修改,以此来保证数据的可见性
参考:java并发编程的艺术
推荐: