1、入队操做
-
当一个线程获取锁失败以后会被转换为Node节点,而后会使用enq方法,将该节点插入到AQS的阻塞队列,下面看一下这个方法如何实现
private Node enq(final Node node) {
for(;;) {
Node t = tail;
if( t == null) {
if(compareAndSetHead(new Node())) {
tail = head;
}
}else {
node.prev = t;
if(compareAndSetTail(t,node)) {
t.next = node;
}
}
}
}
-
解析:进入到第一次循环,tail和head此时都是null,如图default,而后让t指向tail,即null,如图I,此时都是null,而后随便产生一个实例,也就是II所示,让head指向它,而后执行语句,让tail也指向它;
-
接下来进行第二次循环,让t也指向这个实例,如图III,而后进入到else语句,先让node的前向指向哨兵,如图IV;而后,把tail设置为node,在把哨兵的next指向tail,如图VI,也就结束了。
2、AQS-条件变量的支持
-
notify和wait方法是配合synchronized内置锁实现线程同步的,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁),实现线程间同步的基础设施
-
它们的不一样在于,synchronized同时只能与一个共享变量的notify或者wait方法实现同步,而AQS的一个锁能够对应多个条件变量,咱们先看下面的例子
package com.ruigege.LockSourceAnalysis6;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLock {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
}catch(Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
lock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
}catch(Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
22.2
-
其实这里的Lock对象就至关于synchronized加上了共享变量,调用lock.lock()就至关于进入了synchronized块(获取了共享变量的内置锁),调用lock.unlock()方法就至关于推出了synchronized块,调用条件变量的signal方法就至关于调用共享变量的notify方法,signalAll()至关于notifyAll()方法
-
在上面的代码中,lock.newCondition()的做用其实就是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,能够方法AQS内部的变量(例如状态变量state)和方法,在每一个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程,注意这个条件队列和AQS队列不是一回事。
-
下面即是await方法源码,当线程调用条件变量的await()方法时(必须先调用锁的lock方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,而后将该节点插入到条件队列末尾,以后当前线程会释放获取得锁(也就是会操做锁对应的state变量的值),并被阻塞挂起,这时候若是有其余线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,若是获取到的锁的线程调用了条件变量的await方法,则该线程会被放入到条件变量的阻塞队列中,而后释放获取到的锁,在await方法处阻塞。
public final void await() throws InterruptedException {
if(Thread.interrupted()) {
throw new InterruptedException();
}
Node node = addContionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while(!isOnSyncQueue(node)) {
LockSupport.park(this);
if((interruprMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
}
3、源码:
-
所在包:com.ruigege.ConcurrentListSouceCodeAnalysis5
-
https://github.com/ruigege66/ConcurrentJava
-
-
-
欢迎关注微信公众号:傅里叶变换,我的帐号,仅用于技术交流