JAVA LOCK代码浅析

JAVALOCK整体来讲关键要素主要包括3点:
1.unsafe.compareAndSwapXXX(Objecto,longoffset,intexpected,intx)
2.unsafe.park()和unsafe.unpark()
3.单向链表结构或者说存储线程的数据结构java

第1点
主要为了保证锁的原子性,至关于一个锁是否正在被使用的标记,而且比较和设置这个标记的操做是原子的(硬件提供的swap和test_and_set指令,单CPU下同一指令的多个指令周期不可中断,SMP中经过锁总线支持上诉两个指令的原子性),这基本等于软件级别所能达到的最高级别隔离。node

第2点
主要将未获得锁的线程禁用(park)和唤醒(unpark),也是直接native实现(这几个native方法的实现代码在hotspotsrcsharevmprimsunsafe.cpp文件中,可是关键代码park的最终实现是和操做系统相关的,好比windows下实现是在os_windows.cpp中,有兴趣的同窗能够下载jdk源码查看)。唤醒一个被park()线程主要手段包括如下几种
1.其余线程调用以被park()线程为参数的unpark(Threadthread).
2.其余线程中断被park()线程,如waiters.peek().interrupt();waiters为存储线程对象的队列.
3.不知缘由的返回。编程

park()方法返回并不会报告究竟是上诉哪一种返回,因此返回好最好检查下线程状态,如windows

[java]
LockSupport.park(); //禁用当前线程
if(Thread.interrupted){
//doSomething
}[/java]
AbstractQueuedSynchronizer(AQS)对于这点实现得至关巧妙,以下所示
[java]
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}

//parkAndCheckInterrupt()会返回park住的线程在被unpark后的线程状态,若是线程中断,跳出循环。数据结构

if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}

//只有线程被interrupt后才会走到这里多线程

cancelAcquire(node);
throw new InterruptedException();
}

//在park()住的线程被unpark()后,第一时间返回当前线程是否被打断并发

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
[/java]

第3点对于一个Synchronizer的实现很是重要,存储等待线程,而且unlock时唤醒等待线程,这中间有不少工做须要作,唤醒策略,等待线程意外终结处理,公平非公平,可重入不可重入等。ide

以上简单说明了下JAVALOCKS关键要素,如今咱们来看下java.util.concurrent.locks大体结构ui

203503400.jpg上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上,为什么图中没有用UML线表示呢,这是每一个Lock实现类都持有本身内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)。为什么要实现不一样的Sync呢?这和每种Lock用途相关。另外还有AQS的State机制。this

基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch,ReetrantReadWriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不同而已。

ReentrantLock须要记录当前线程获取原子状态的次数,若是次数为零,那么就说明这个线程放弃了锁(也有可能其余线程占据着锁从而须要等待),若是次数大于1,也就是得到了重进入的效果,而其余线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。如下为ReetranLock的FairSync的tryAcquire实现代码解析。
[java]
//公平获取锁

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();

//若是当前重进入数为0,说明有机会取得锁

if (c == 0) {

//若是是第一个等待者,而且设置重进入数成功,那么当前线程得到锁

if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

//若是当前线程自己就持有锁,那么叠加剧进入数,而且继续得到锁

else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

//以上条件都不知足,那么线程进入等待队列。

return false;
}
[/java]

Semaphore则是要记录当前还有多少次许可可使用,到0,就须要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。如下为Semaphore的FairSync实现

[java]
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
for (;;) {
Thread first = getFirstQueuedThread();

//若是当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程须要等待

if (first != null && first != current)
return -1;

//若是当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,而且减去当前线程须要的许可证获得剩下的值

int available = getState();
int remaining = available – acquires;

//若是remining<0,那么反馈给AQS当前线程须要等待,若是remaining>0,而且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,能够继续执行。

if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
[/java]

CountDownLatch闭锁则要保持其状态,在这个状态到达终止态以前,全部线程都会被park住,闭锁能够设定初始值,这个值的含义就是这个闭锁须要被countDown()几回,由于每次CountDown是sync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁须要被countDown()十次,才可以将这个初始值减到0,从而释放原子状态,让等待的全部线程经过。

[java]

//await时候执行,只查看当前须要countDown数量减为0了,若是为0,说明能够继续执行,不然须要park住,等待countDown次数足够,而且unpark全部等待线程

public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}

//countDown时候执行,若是当前countDown数量为0,说明没有线程await,直接返回false而不须要唤醒park住线程,若是不为0,获得剩下须要countDown的数量而且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS断定是否释放全部await线程。

public boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
[/java]

FutureTask须要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续仍是park,而FutureTask的tryAcquireShared方法所作的惟一事情就是检查状态,若是是RUNNING状态那么让当前线程park。而跑任务的线程会在任务结束时调用FutureTask实例的set方法(与等待线程持相同的实例),设定执行结果,而且经过unpark唤醒正在等待的线程,返回结果。

[java]

//get时待用,只检查当前任务是否完成或者被Cancel,若是未完成而且没有被cancel,那么告诉AQS当前线程须要进入等待队列而且park住

protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

//断定任务是否完成或者被Cancel

boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}

//get时调用,对于CANCEL与其余异常进行抛错

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0,nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

//任务的执行线程执行完毕调用(set(Vv))

void innerSet(V v) {
for (;;) {
int s = getState();
//若是线程任务已经执行完毕,那么直接返回(多线程执行任务?)
if (s == RAN)
return;
//若是被CANCEL了,那么释放等待线程,而且会抛错
if (s == CANCELLED) {
releaseShared(0);
return;
}

//若是成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工做(通常由FutrueTask的子类实现)

if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
[/java]

以上4个AQS的使用是比较典型,然而有个问题就是这些状态存在哪里呢?而且是能够计数的。从以上4个example,咱们能够很快获得答案,AQS提供给了子类一个intstate属性。而且暴露给子类getState()和setState()两个方法(protected)。这样就为上述状态解决了存储问题,RetrantLock能够将这个state用于存储当前线程的重进入次数,Semaphore能够用这个state存储许可数,CountDownLatch则能够存储须要被countDown的次数,而Future则能够存储当前任务的执行状态(RUNING,RAN,CANCELL)。其余的Synchronizer存储他们的一些状态。

AQS留给实现者的方法主要有5个方法,其中tryAcquire,tryRelease和isHeldExclusively三个方法为须要独占形式获取的synchronizer实现的,好比线程独占ReetranLock的Sync,而tryAcquireShared和tryReleasedShared为须要共享形式获取的synchronizer实现。

ReentrantLock内部Sync类实现的是tryAcquire,tryRelease,isHeldExclusively三个方法(由于获取锁的公平性问题,tryAcquire由继承该Sync类的内部类FairSync和NonfairSync实现)Semaphore内部类Sync则实现了tryAcquireShared和tryReleasedShared(与CountDownLatch类似,由于公平性问题,tryAcquireShared由其内部类FairSync和NonfairSync实现)。CountDownLatch内部类Sync实现了tryAcquireShared和tryReleasedShared。FutureTask内部类Sync也实现了tryAcquireShared和tryReleasedShared。

其实使用过一些JAVAsynchronizer的以后,而后结合代码,可以很快理解其究竟是如何作到各自的特性的,在把握了基本特性,即获取原子状态和释放原子状态,其实咱们本身也能够构造synchronizer。以下是一个LOCKAPI的一个例子,实现了一个先入先出的互斥锁。

[java]
public class FIFOMutex {
private AtomicBoolean locked=new AtomicBoolean(false);
private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();
public void lock(){
boolean wasInterrupted=false;
Thread current=Thread.currentThread();
waiters.add(current);

//若是waiters的第一个等待者不为当前线程,或者当前locked的状态为被占用(true)

//那么park住当前线程

while(waiters.peek()!=current||!locked.compareAndSet(false, true)){
LockSupport.park();

//当线程被unpark时,第一时间检查当前线程是否被interrupted

if(Thread.interrupted()){
wasInterrupted=true;
}
}

//获得锁后,从等待队列移除当前线程,若是,而且若是当前线程已经被interrupted,

//那么再interrupt一下以便供外部响应。

waiters.remove();
if(wasInterrupted){
current.interrupt();
}
}

//unlock逻辑相对简单,设定当前锁为空闲状态,而且将等待队列中

//的第一个等待线程唤醒

public void unlock(){
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
[/java]

总结,JAVAlock机制对于整个javaconcurrent包的成员意义重大,了解这个机制对于使用java并发类有着不少的帮助,文章中可能存在着各类错误,请各位多多谅解而且可以提出来,谢谢。

文章参考:JDK1.6sourcejava并发编程实践JDK1.6API文档

相关文章
相关标签/搜索