并发编程专题五-AbstractQueuedSynchronizer源码分析

PS:外号鸽子王不是白来的,鸽了好几天,也是由于比较忙,时间太少了,这篇东西有点多,须要慢慢消化。不知不觉竟然写了4个多小时....java

1、什么是AQS

aqs是AbstractQueuedSynchronizer的简称,是用来构建锁或者其余同步组件(信号量、事件等)的基础框架类。JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等。算法

2、AQS的设计模式

2.1模板方法设计模式

在学习原理和源码以前,咱们先了解一中设计模式。模板方法设计模式。编程

模板方法设计模式定义一个操做中算法的骨架,而将一些步骤延迟到子类中,模板方法使得子类能够不改变算法的结构便可重定义该算法的某些特定步骤。设计模式

通俗来讲就是完成一件事情,有固定的数个步骤,可是每一个步骤根据对象的不一样,而实现细节不一样;就能够在父类中定义一个完成该事情的总方法,按照完成事件须要的步骤去调用其每一个步骤的实现方法。每一个步骤的具体实现,由子类完成。微信

例如 发短信有如下有如下四个步骤: 数据结构

1,须要发送给的人;  2 编写内容; 3,发送日期 4, 调用短信网关发送多线程

发邮件也一样有如下四个步骤:并发

1,须要发送给的人;  2 编写内容; ,发送日期 4, 调用邮箱服务发送框架

这时,能够清楚地发现,不管是发短信仍是发邮件,它们的步骤几乎是相同的: 1, 须要发送给的人;  2 发送内容 3,发送日期 4, 发送。只有具体到发送方法的时候,它们有些步骤才不一样。这时咱们就能够把相同的步骤提取出来,生成一个模版,进行共用,而到具体的内容时,它们再有具本的实现。 具体到代码时,模版就能够用抽象类实现,具体的内容能够到它的子类中实现。ide

代码以下

import java.util.Date;

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description:
 */
public abstract class SendTemplate {

	//发送给谁,具体发送给谁须要子类去实现
	public abstract void toUser();
	//发送内容,具体发送内容须要子类去实现
	public abstract void content();
	//发送日期,由于日期都是同样的,因此父类就能够实现掉
	public void date() {
		System.out.println(new Date());
	}
	//发送方法,不一样的发送方式确定要实现不一样的发送方法
	public abstract void send();
	
	//发送消息,框架方法-模板方法
	public void sendMessage() {
		toUser();
		content();
		date();
		send();
	}

}

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description: 短信发送模板实现类
 */
public class SendSms extends SendTemplate {

	@Override
	public void toUser() {
		System.out.println("to Pistachio");
	}

	@Override
	public void content() {
		System.out.println(" I LOVE YOU ❤");

	}

	@Override
	public void send() {
		System.out.println("set sms");

	}
	
	public static void main(String[] args) {
		SendTemplate sendSms = new SendSms();
		sendSms.sendMessage();
	}

}

AQS就是采用了模板方法的设计模式,除了AQS,Spring加载配置的过程一样也是使用了这种设计模式。具体之后Spring源码专题在给你们详细说明

2.2AQS中的方法

既然知道AQS所使用的是模板方法设计模式,那具体都有哪些方法,咱们如今列举一下。

一、模板方法

独占式(独占锁) 共享式(共享锁) 方法描述
acquire acquireShared 获取锁方法,获取同步状态
acquireInterruptibly acquireSharedInterruptibly 获取锁方法,同acquire,但能够响应中断
tryAcquireNanos tryAcquireSharedNanos 获取锁方法
release releaseShared 释放锁
     

二、须要子类覆盖的流程方法

独占式获取  tryAcquire

独占式释放  tryRelease

共享式获取 tryAcquireShared

共享式释放  tryReleaseShared

这些方法咱们能够看到,AQS中只抛出了一个UnsupportedOperationException异常,因此须要咱们在子类中具体去实现。

三、其余方法

isHeldExclusively:判断同步器是否处于独占模式

除此以外,AQS还定义了一个volatile变量state,用于记录锁的状态

getState:获取当前的同步状态

setState:设置当前同步状态

compareAndSetState:使用CAS设置状态,保证状态设置的原子性

为何Doug Lea大师要这样设计AQS呢?AQS面向的是锁的实现者,而咱们在使用Lock锁的时候,只须要调用Lock对应的方法便可,屏蔽了实现细节。而对于锁的实现者来讲,简化了锁的实现方式,例如同步状态管理,线程排队等底层操做。隔离了锁的实现者和锁的使用者。从而进行了解耦,当咱们在使用ReentrantLock等锁的时候,彻底感受不到AQS的存在。

2.3自定义锁的实现

既然咱们了解了AQS中的一些方法,那咱们就经过实现父类中的方法,来本身实现一个锁,加深下对模板方法的理解。

本身实现锁的话,首先咱们实现Lock接口,Lock共有6个接口,以前咱们都已经讲过了,你们若是感兴趣,个人并发编程专题里查看哈~

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description:
 */
public class SelfLock implements Lock{
	
	//aqs中state 表示获取到锁的状态 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到

	//定义一个内部类,实现AQS模板方法
	private static class Sync extends AbstractQueuedSynchronizer{
		
		//是否占用
		protected boolean isHeldExclusively() {
			return getState()==1;
		}
		//尝试获取锁
		protected boolean tryAcquire(int arg) {
			//CAS操做,首先对比锁是否被获取到,获取到的话就将锁的状态置为1,不然获取锁失败
			if(compareAndSetState(0,1)) {
				//设置锁的拥有者为当前线程
				setExclusiveOwnerThread(Thread.currentThread());
				return true;
			}
			return false;
		}
		//尝试释放锁
		protected boolean tryRelease(int arg) {
			//若是锁的状态为0,则不须要释放,抛出异常
			if(getState()==0) {
				throw new UnsupportedOperationException();
			}
			//设置锁的拥有者为null,而且状态设置为0
			setExclusiveOwnerThread(null);
			setState(0);
			return true;
		}
		//Condition对象,用于对锁的对象唤醒和等待
		Condition newCondition() {
			return new ConditionObject();
		}
	}
	
	private final Sync sycn = new Sync();

	//获取锁。若是锁已被其余线程获取,则进行等待
	@Override
	public void lock() {
		sycn.acquire(1);
		
	}
	//能够直接调用父类的方法。该方法和lock的区别就是能够响应中断。
	@Override
	public void lockInterruptibly() throws InterruptedException {
		sycn.acquireInterruptibly(1);
		
	}
	//它表示用来尝试获取锁,若是获取成功,则返回true,若是获取失败返回false
	@Override
	public boolean tryLock() {
		return sycn.tryAcquire(1);
	}
	//带时间戳的获取锁,若是等待时间超过,则获取失败
	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		return sycn.tryAcquireNanos(1, unit.toNanos(time));
	}
    //释放锁
	@Override
	public void unlock() {
		sycn.release(1);
		
	}
	//返回Condition对象
	@Override
	public Condition newCondition() {
		return sycn.newCondition();
	}


}

以上方法咱们实现了获取锁和释放锁的接口。从实现中咱们发现获取锁的时候使用了CAS操做,但释放锁的时候没有进行CAS操做。这样写会不会出现问题呢?接下来写个测试类。测试下锁可否正常使用

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.xiangxue.tools.SleepTools;

/**
 * @Auther: DarkKing
 * @Date: 2019/4/21 12:09
 * @Description:
 */
public class TestMyLock {
    public void test() {
        final Lock lock = new SelfLock();
        
        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                    	SleepTools.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(1);
                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
                }
            }
        }
        // 启动10个子线程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 主线程每隔1秒换行
        for (int i = 0; i < 10; i++) {
        	SleepTools.second(1);
            System.out.println();
        }
    }

    public static void main(String[] args) {
        TestMyLock testMyLock = new TestMyLock();
        testMyLock.test();
    }
}

咱们定义了10个线程,打印出当前获取锁的线程。从打印咱们能够看出来,每次只会打印出一个线程名。说明咱们写的锁是起做用的。那为何释放的时候不须要CAS操做呢。当咱们一个线程获取到锁,其余线程又都在作什么呢。接下来咱们走进AQS源码中,进行学习。

3、AQS源码解析

咱们上面的测试用例中。当咱们一个线程获取到锁,其余线程再去获取锁的时候,咱们返回了false,线程进入的等待状态,那既然线程进入了等待状态,等待锁被释放的时候了进行唤醒。那么必然要有个地方存储咱们进行等待的线程的一个数据结构。

3.一、AQS中的数据结构-节点和同步队列

AQS的中的等待的线程所有存储在一个同步队列里,它是一个先进先出的数据结构,先进来的线程会先被唤醒。同时这个队列仍是个双向列表。上一个节点指向下一个节点。还有头指示器指向第一个节点。尾指示器指向最后一个节点。

3.2 同步队列节点属性

打开AQS的源码,找到Node类,全部属性以下

字段名 属性值 描述
CANCELLED 1 线程等待超时或者被中断了,须要从队列中移走
SIGNAL -1 后续的节点等待状态,当前节点若是完成则通知后面的节点去运行
CONDITION -2 当前节点处于等待队列
PROPAGATE -3 共享,表示状态要日后面的节点传播
  0 初始状态
waitStatus   标识当前节点状态,就是上面的那几个常量字段。
Node prev   表示当前节点的前驱节点
Node next   表示当前节点的后驱节点
Thread thread   表示当前节点存放的线程
Node nextWaiter   指向等待队列节点

3.3 获取锁的过程

当咱们执行获取acquire锁的方法时

若是获取失败,则会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,该方法首先会执行addWaiter(Node.EXCLUSIVE), arg),

该方法是将线程包装成Node节点,经过CAS操做,将节点加入到同步队列中。

尾节点变化如图所示

由于多线程的缘由,为了保持原子性,因此这里须要使用CAS操做。否则可能会出现尾部节点数据丢失等问题。若是CAS对比操做失败,则执行enq方法,将该操做放入循环中。直到添加成功。(CAS的基本操做)

当Node已经放入了同步队列以后,那么这个线程说明须要进行等待。排队排到个人时候我要去获取锁。因此acquireQueued方法中,会一直循环获取锁。

首先获取前置节点,若是前置节点是头节点,则直接去尝试获取锁。当获取锁成功以后,将该节点设置为头节点,以前的头结点设置为null,脱钩帮助GC。而后returen false;说明获取锁成功。不须要在进行等待。

若是获取锁失败,则会执行parkAndCheckInterrupt()方法,将本身阻塞。这是整个获取锁的一个过程。

3.4 释放锁的过程

释放锁的方法为咱们定义的unlock();实际调用的AQS中的release方法。

该方法首先执行咱们实现的TryRelease尝试释放锁,若是锁释放成功,则获取头结点,而后执行unparkSuccessor(h)方法。

这个方法主要就是获取头结点的下一个节点,若是下个节点为null,则将下个节点的前一个节点设置为尾部,不然的话而后进行唤醒。

首节点的变化如图所示

当一个节点释放锁以后,修改头结点时同时只会有一个线程去操做,因此不须要CAS操做,直接将头节点设为null,而后修改同步器头部指向下一个节点。

总体获取和释放锁的过程以下所示

3.五、AQS中的数据结构-节点和等待队列

每个锁都有一个Condition对象,该对象主要实现await和signal等方法。用于线程的等待和唤醒。所以每一个Condition对象中确定也存在一个等待队列。

等待队列的数据结构

等待队列和同步队列的区别

一、等待队列里的节点和同步队列节点都是同一个属性。惟一的不一样就是等待队列是一个单向链表,而非双向链表。

二、一个同步器(锁)里面,只会有一个同步队列,但能够有多个等待队列。以下图所示。

一个锁能够建立个多个Condition,每个Condition下都会有一个等待队列。

private Lock lock = new ReentrantLock(); private Condition keCond = lock.newCondition(); private Condition siteCond = lock.newCondition();

3.六、await方法过程

await表示使当前线程进入等待状态。

首先调用addConditionWaiter方法将该线程包装成Node加入到阻塞队列中去,而后调用fullyRelease方法,将该线程所持有的锁进行释放。在while中判断线程是否被唤醒,若是没有,则进行阻塞。

await方法过程

3.七、single方法过程

single唤醒等待的一个线程。

当执行single的时候,首先须要从等待队列中取出一个节点,若是不为null,则执行doSignal方法。

若是等待队列的下一个节点为null,则把末尾节点设为null

而后调用transferForSignal方法,判断当前节点的状态,若是不能修改值,则取消,若是修改为功,则嗲用enq。将该节点移动到同步队列尾部,并设置waitStatus为SIGNAL。知足条件后唤醒线程。

single流程图以下

由于每一个condition对象都会有一个同步机制,并且调用single会指定唤醒对应等待队列的线程,不会丢失信息。因此建议使用single方法唤醒,而不是调用singleAll,并且每次调用singleAll还要将全部等待队列的节点所有移动到同步队列中。

大致上学完AQS,要了解模板方法设计,会本身手动实现锁,了解获取锁和释放锁,以及基于锁的等待和唤醒机制。你们若是还有什么问题,能够加我微信一块儿讨论哈。

其余阅读   并发编程专题

相关文章
相关标签/搜索