斜体为抽象类,下横线为接口
java
聚合关系总结:node
锁实现总结:多线程
性质:性能
Lock接口定义了锁的行为ui
public interface Lock {
//上锁(不响应Thread.interrupt()直到获取锁)
void lock();
//上锁(响应Thread.interrupt())
void lockInterruptibly() throws InterruptedException;
//尝试获取锁(以nonFair方式获取锁)
boolean tryLock();
//在指定时间内尝试获取锁(响应Thread.interrupt(),支持公平/二阶段非公平)
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解锁
void unlock();
//获取Condition
Condition newCondition();
}
复制代码
//锁具体实现
private final Sync sync;
//根据传入参数选择FairSync或NonfairSync实现
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
#java.util.concurrent.locks.ReentrantLock.Sync
abstract void lock();
复制代码
加入同步队列(当同步队列为空时会直接得到锁),等待锁this
#java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
acquire()流程:spa
tryAcquire():模板方法,获取锁线程
#java.util.concurrent.locks.ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//当前锁没被占用
if (!hasQueuedPredecessors() &&//1.判断同步队列中是否有节点在等待
compareAndSetState(0, acquires)) {//2.若是上面!1成立,修改state值(代表当前锁已被占用)
setExclusiveOwnerThread(current);//3.若是2成立,修改当前占用锁的线程为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//占用锁线程==当前线程(重入)
int nextc = c + acquires;//
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//修改status
return true;
}
return false;//直接获取锁失败
}
复制代码
acquireQueued(addWaiter(Node.EXCLUSIVE), arg):加入同步队列指针
#java.util.concurrent.locks.AbstractQueuedSynchronizer
//1
private Node addWaiter(Node mode) {
//生成node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//将node加到队列尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若是加入失败(多线程竞争或者tail指针为null)
enq(node);
return node;
}
//1.1
private Node enq(final Node node) {
//死循环加入节点(cas会失败)
for (;;) {
Node t = tail;
if (t == null) { //tail为null,同步队列初始化
//设置head指针
if (compareAndSetHead(new Node()))//注意这里是个空节点!!
tail = head;//将tail也指向head
} else {
node.prev = t;//将当前node加到队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;//注意这里才返回
}
}
}
}
//2
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//表示是否被打断
boolean interrupted = false;
for (;;) {
//获取node.pre节点
final Node p = node.predecessor();
if (p == head //当前节点是不是同步队列中的第二个节点
&& tryAcquire(arg)) {//获取锁,head指向当前节点
setHead(node);//head=head.next
p.next = null;//置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //是否空转(由于空转唤醒是个耗时操做,进入空转前判断pre节点状态.若是pre节点即将释放锁,则不进入空转)
parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞)
interrupted = true;//若是Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁)
}
} finally {
if (failed)//tryAcquire()过程出现异常致使获取锁失败,则移除当前节点
cancelAcquire(node);
}
}
复制代码
过程总结:code
注意:这里有两次tryAcquire()过程.第一次,为了不同步队列为空时还插入队列产生的性能耗费(cas空转).第二次,就是正常的流程.先插入队尾,而后等待唤醒,再获取锁
selfInterrupt(): 唤醒当前线程
static void selfInterrupt() {//在获取锁以后 响应intterpt()请求
Thread.currentThread().interrupt();
}
复制代码
一阶段
#java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
//在acquire()以前先尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
复制代码
二阶段 acquire()流程与公平锁如出一辙,惟一区别在于tryAcquire()实现中
#java.util.concurrent.locks.ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#java.util.concurrent.locks.ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {//这个过程其实和FairSync.tryAcquire()基本一致
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//惟一区别: 这里不会去判断队列中是否为空
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
区别点 | lock()过程(一阶段) | tryAcquire()过程(二阶段) |
---|---|---|
FairSync | 直接acquire() | 当前若无线程持有锁,若是同步队列为空,获取锁 |
NonFairSync | 先尝试获取锁,再acquire() | 当前若无线程持有锁,获取锁 |
#java.util.concurrent.locks.ReentrantLock
public void unlock() {
sync.release(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁
Node h = head;
if (h != null &&//head节点为空(非公平锁直接获取锁)
h.waitStatus != 0)
unparkSuccessor(h);//唤醒同步队列中离head最近的一个waitStatus<=0的节点
return true;
}
return false;
}
#java.util.concurrent.locks.ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//持有锁的线程==当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//重入锁所有释放
free = true;
//置空持有锁线程
setExclusiveOwnerThread(null);
}
//state==0(此时持有锁,不用cas)
setState(c);
return free;
}
复制代码
lockInterruptibly()与lock()过程基本相同,区别在于Thread.intterpt()的应对措施不一样
//lock()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//表示是否被打断
boolean interrupted = false;
for (;;) {
//获取node.pre节点
final Node p = node.predecessor();
if (p == head //当前节点是不是同步队列中的第二个节点
&& tryAcquire(arg)) {//获取锁,当前head指向当前节点
setHead(node);//head=head.next
p.next = null;//置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //是否空转(由于空转唤醒是个耗时操做,进入空转前判断pre节点状态.若是pre节点即将释放锁,则不进入空转)
parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞)
interrupted = true;//若是Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁)
}
} finally {
if (failed)//tryAcquire()过程出现异常致使获取锁失败,则移除当前节点
cancelAcquire(node);
}
}
// lockInterruptibly()
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//惟一区别当Thread.intterpt()打断时,直接抛出异常
throw new InterruptedException();
}
} finally {
if (failed)//而后移除当前节点
cancelAcquire(node);
}
}
复制代码
#java.util.concurrent.locks.ReentrantLock
public boolean tryLock() {
//尝试获取非公平锁
return sync.nonfairTryAcquire(1);
}
复制代码
#java.util.concurrent.locks.ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||//获取锁(公平/非公平)
doAcquireNanos(arg, nanosTimeout);//在指定时间内等待锁(空转)
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
...
final long deadline = System.nanoTime() + nanosTimeout;
//加入队尾
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//上面与acquireQueued()相同,重点看这里
//计算剩余时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//利用parkNanos()指定空转时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//若是被Thread.interrupt(),则抛异常
throw new InterruptedException();
}
} finally {
if (failed)//移除节点
cancelAcquire(node);
}
}
复制代码
public Condition newCondition() {
return sync.newCondition();
}
#java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
复制代码