在原 JUC包 (一) 原子类 与 CAS操作中我们主要了解了什么是原子操作,以及原子操作的实现CAS(Compare ans Swap)
操作. 本章,我们继续接着介绍使用CAS原理
实现的锁操作. 锁在JUC包
中使用非常广泛, 其取代了传统的synchronized
关键字被广泛使用. 下文中,我们将一起了解锁的实现原理与基本使用场景.
AQS,是(Abstract Queued Synchronizer)队列同步器, 简称同步器.它是用于构建锁和其他同步操作的基础框架.使用一个int
变量来标示同步操作. 就个人理解而言, 它就是一个模板类, 其中实现了一个同步队列的基本操作.
其实现与队列基本一致. 主要维护了3个线程状态的方法,分别为getState() / setState(int newState) 和 compareAndSetState()
.
同步器中其他可重写方法:
protected boolean tryAcquire(int args)
独占式获取同步状态protected boolean tryRelease(int args)
独占式释放同步状态protected int tryAcquireShared(int args)
共享式获取同步状态protected boolean tryReleasedShared(int args)
共享式释放同步状态protected boolean isHeldExclusively()
当前同步器是否被独占void acquire(int arg)
void acuqireInterruptibly(int arg)
void tryAcquireNanos(int arg, long naos)
void acquireShared(int arg)
void acquireSharedInterruptily(int arg)
boolean tryAcquireSharedNanos(int arg, long naos)
boolean release(int arg)
boolean releaseShared(int arg)
Collection<Thread> getQueuedThreads()
try/release
同步锁与非同步锁 及 获取队列所以线程的getQueuedThreads()
方法.我们在自定义锁的构造时,通常是使用同步器来进行实现.使用同步器,我们可以构建自定义锁.(类似ReentrantLock
)
import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class Mutex implements Lock{ private static class Sync extends AbstractQueuedSynchronizer{ // 是否处于占用状态 protected boolean isHeldExclusively(){ // 判断状态是否为1 return getState()==1; } // CAS 尝试获取 成功返回true 失败返回false public boolean tryAcquire(int acquires){ if(compareAndSetState(0, 1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } public boolean tryRelease(){ // 如果 状态为0 说明没有获取锁.(所以无法释放) if(getState()==0)throw new IllegalStateException(); // 如果存在空 设置同步器的所有线程为空 setExclusiveOwnerThread(null); // 设置当前同步器状态为0 setState(0); //释放成功 return true; } Condition newCondition(){return new ConditionObject();} } private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } public static void main(String[] args) { Collections.synchronizedList(new ArrayList<>()); } }
我们经常使用的ReentrantLock
就是靠类似上方的数据进行实现的.(感兴趣的可以读读文尾的附注部分.)
tryAcquire
与acquire()
的区别?
tryAcquire()
是尝试一次获取锁, 获取成功返回true
. 否则, 返回false
. 而acquire()
是写成了一个死循环进行获取锁. 获取失败,则进入等待队列. 这是Java
内部一个用于提高运行效率的设计.
从本章开始, 我们具体讲解下同步器是如何具体实现这些需求的.(独占锁与共享锁)
在同步器内部维护了诸如上图的同步队列.同步器维护了两个分别指向head
与tail
结点的两个指针.各个结点分别记录了prev/next/状态
等多个信息.
Node内的是属性如下所示:
在同步队列中,主要有两个主要操作: 设置头结点/设置尾结点.
head
的下一个结点置为head
结点.(head = head->next
) 无多线程问题.compareAndSetTail(Node expect, Noode update)
进行更新.由于是CAS
操作, 所以在尝试失败后会不断的使用重试机制进行重复操作. 会遇到多线程问题, 通过CAS进行解决.OK, 我们下面看下源码:
compareAndSetHead() & compareAndSetTail()
/** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // 使用Unsafe类,调用系统底层进行CAS操作.
acquire() -> tryAcquire/acquireQueued -> (addWaiter-> enq)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // AQS内并没有太多关于tryAcquire()的操作(后续给出一个ReentrantLock的例子) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 获取队列内的值 final boolean acquireQueued(final Node node, int arg) { // 失败标示标为true boolean failed = true; try { // 中断标示标为false boolean interrupted = false; // 自旋操作 阻塞当前线程 for (;;) { // 获取前置结点 final Node p = node.predecessor(); // 如果前置结点为头结点(当前锁持有者) 并且获取锁成功 if (p == head && tryAcquire(arg)) { // 如果完成上述条件 则将当前结点设置为头结点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 放弃获取 cancelAcquire(node); } } // 增加等待结点 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 使用CAS进行将新结点设置到末尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果CAS失败 则进入enq()方法 enq(node); return node; } // 如果第一次CAS失败 那么不断循环自旋,直到将结点添加到末尾为止 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在上述到过程中, 当线程获取锁时. 线程会先通过tryAcquire()
获取线程当锁. 当尝试失败时, 线程会将当前线程通过addWaiter()
方法将当前线程放入同步队列中.(如果放入失败,则通过enq()
方法不断循环添加, 直到成功结束.) 对于放入成功的结点,则不断进行自旋for循环
, 直到当前线程获取到锁(成为同步队列的头结点为止).
下面我们来看下release()
释放锁的方法:
release() -> tryRelease() / unparkSuccessor() -> LockSupport.unpark(s.thread)
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 模板方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 使用`LockSupport`的`unpark`方法唤醒后续结点 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
本章中, 讲述了AQS
获取独占锁的获取和释放的过程:
上文分析了独占式同步队列的基本实现. 本节讲述下共享式同步状态的获取和释放.
共享式同步状态的获取和释放和独占式不太一样. 值得一提的是, 独占式的同步状态与共享式同步状态不能同时存在. 两种模式式互斥的. 基本源码如下所示:
acquireShared() -> tryAcquireShared()/doAcquireShared() ->addWaiter()/
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { // 如果当前结点为头结点 尝试获取 int r = tryAcquireShared(arg); if (r >= 0) { // r>=0 说明已经获取到锁 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //记录当前头节点 //设置新的头节点,即把当前获取到锁的节点设置为头节点 //注:这里是获取到锁之后的操作,不需要并发控制 setHead(node); //这里意思有两种情况是需要执行唤醒操作 //1.propagate > 0 表示调用方指明了后继节点需要被唤醒 //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒 //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒 if (s == null || s.isShared()) //后面详细说 doReleaseShared(); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
doReleaseShared() -> unparkSuccessor()
// 这段没怎么看懂
private void doReleaseShared() { for (;;) { //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了 //其实就是唤醒上面新获取到共享锁的节点的后继节点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //表示后继节点需要被唤醒 if (ws == Node.SIGNAL) { //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //执行唤醒操作 unparkSuccessor(h); } //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } //如果头结点没有发生变化,表示设置完成,退出循环 //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试 if (h == head) break; } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
共享式同步状态的获取和释放真的比较绕. 就个人理解而言,
Head
头结点的值进行了改变,导致了后续的结点也能够获取到了锁. (Head结点为 volatile
类型变量)tryAcquireNanos -> doAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
通过if (nanosTimeout <= 0)return false;
可以看出, 当超过时间界限, 会退出等待循环.
TwinsLock
package com.yanxml.multithreading.art.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 同步式锁 允许2个线程进行访问 * */ public class TwinsLock implements Lock{ private final Sync sync = new Sync(2); public static final class Sync extends AbstractQueuedSynchronizer{ public Sync(int count){ if(count<0){ throw new IllegalArgumentException(); } setState(count); } // public int tryAcquireShared(int reduceCount){ public int tryAcquireShared(int reduceCount){ for(;;){ int current = getState(); int newCount = current - reduceCount; if(newCount < 0 || compareAndSetState(current, newCount)){ // if(compareAndSetState(current, newCount)){ return newCount; } } } // public boolean tryReleaseShared(int returnCount){ public boolean tryReleaseShared(int returnCount){ for(;;){ int current = getState(); int newCount = current+returnCount; if(compareAndSetState(current, newCount)){ return true; } } } } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
TwinsLockTest 测试类
package com.yanxml.multithreading.art.lock; import java.util.concurrent.locks.Lock; import com.yanxml.multithreading.art.lock.TwinsLock; /** * 用于测试TwinsLock的功能. * * */ class Worker extends Thread{ Lock lock; public Worker(Lock lock){ this.lock = lock; } public void run(){ while(true){ lock.lock(); try{ Thread.sleep(3000); System.out.println(Thread.currentThread().getName()); Thread.sleep(3000); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } } } public class TwinsLockTest { public static void main(String[] args) throws InterruptedException { Lock lock = new TwinsLock(); for(int i=0;i<10;i++){ Worker w = new Worker(lock); w.setDaemon(true); w.start(); } for(int i=0;i<10;i++){ Thread.sleep(3000); System.out.println(); } } }
await()
线程等待awaitUninterruptibly()
等待状态直到被通知awaitNanos(long nanosTimeout)
等待(时间段内未收到信息,唤醒)boolean awaitUntil(Date deadline)
等待(死亡时间)void signal()
唤醒一个等待Condition
的线程void signalAll()
唤醒所有等待Condition
的线程AQS的同步队列中, 同时维护一个等待队列.
Condition.await()
方法后,将其从AQS队列中取出,放入Condition的队列中.Condition.singnal()
方法后,将其从Condition的队列中拿出,放入AQS队列.await /
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
void park()
阻塞当前线程void parkNanos(long nanos)
阻塞当前线程(时间间隔nanos)void parkUtil(long deadline)
阻塞当前线程(直到deadline)void unpark(Thread thread)
释放某个线程的锁定final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // CAS 获取锁 成功即为所有者 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果当前线程为线程所有者 nextc=c+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 如果c==0 说明全部释放成功了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
ReentrantLoc
k内FairLock
与NoFairLock
的实现各不相同.public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 我还是看不懂 这个判断的幺蛾子. public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
ReentrantWriteReadLock
.[1] Java多线程核心
[2] Java多线程编程艺术
[3] Java并发之AQS详解
[4] 一步步透彻理解Lock的Acquire和Release原理源码
[5] JDK中多线程之JUC锁的JDK源码解读配合大神的一起看,秒懂。
[6] java并发编程的艺术——第五章总结(Lock锁与队列同步器)
[7] setState 和 compareAndSetState方法作用分析
[8] 深入浅出AQS之共享锁模式
[9] 锁与CAS介绍
[10] Java并发源码剖析(二)——AbstractQueuedSynchronizer共享模式
[11] 同步器节点的waitStatus解释
[12] AQS——同步队列共享模式
[13] Java中的锁——队列同步器
[14] Java线程并发中的锁——Lock(下)