java中的队列同步器AQS -- AbstractQueuedSynchronizer

1)原理:java

1)使用一个int成员变量表示同步状态(private volatile int state;),经过同步队列(一个FIFO双向队列)来完成同步状态的管理(即:线程的排队)。
2)当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构形成为一个节点(Node)并将其加入到同步队列的尾部,而且当前线程(因进行自旋操做)被阻塞。
3)首节点是成功获取到同步状态的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,然后继节点将会在获取同步状态成功时将本身设置为首节点。
	
同步队列中的节点(Node):用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。

2)说明:node

1)AQS是用来构建锁或者其它同步工具的基础框架,锁和同步工具都采用封装一个AQS的子类(静态内部类)的方式来实现其功能。
2)同步器的主要使用方式是继承,子类经过继承同步器并实现它的抽象方法来管理同步状态,
3)经过getState()、setState(int newState)和compareAndSetState(int expect,int update)来安全地修改状态。

3)同步器中的方法:编程

1)同步器中能够被子类重写的方法:
	protected boolean tryAcquire(int arg) // 独占式获取同步状态
	protected boolean tryRelease(int arg) // 独占式释放同步状态
	protected int tryAcquireShared(int arg) // 共享式获取同步状态,当返回值大于等于0时,表示获取成功。
	protected boolean tryReleaseShared(int arg) // 共享式释放同步状态
	protected boolean isHeldExclusively() // 当前同步器是否在独占模式下被占用。
		
2)模板方法:

	1>独占式操做同步状态的方法:
		public final void acquire(int arg) 
			独占式获取同步状态,若是当前线程成功获取同步状态,则该方法返回;不然,当前线程会进入同步队列等待。
			该方法将会调用子类重写的tryAcquire(int arg)方法;若是后续对线程进行中断操做,线程不会从同步队列中移出。
		
		public final void acquireInterruptibly(int arg) throws InterruptedException {
			在acquire(int arg)的基础上增长了中断响应。
			若当前线程未获取到同步状态,进入同步队列等待时,若是当前线程被中断,则该方法抛出终端异常并返回。
			
		public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
			在acquireInterruptibly(int arg)的基础上增长了超时限制。
			在指定的时间内,获取到同步状态返回true,不然返回false。
			
		public final boolean release(int arg)
			独占式释放同步状态,该方法会在释放同步状态以后,会唤醒其后继节点(即:将同步队列中第一个节点包含的线程唤醒)。
			该方法将会调用子类重写的tryRelease(int arg)方法;

	2>共享式操做同步状态的方法:
		public final void acquireShared(int arg)
			共享式的获取同步状态,若是当前线程成功获取同步状态,则该方法返回;不然,当前线程会进入同步队列等待。
			该方法将会调用子类重写的tryAcquireShared(int arg)方法。tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示可以获取到同步状态。
			所以,在共享式获取的自旋过程当中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法返回值大于等于0。
			
			与独占式获取的区别:在同一个时刻,能够有多个线程同时获取到同步状态。
		
		public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {	
			在acquireShared(int arg)的基础上增长了超时限制。
			
		public final boolean releaseShared(int arg)
			共享式释放同步状态
			该方法将会调用子类重写的tryReleaseShared(int arg)方法;
			tryReleaseShared(int arg)方法必须确保同步状态的安全释放(通常是经过循环和CAS来保证同步状态的安全释放)。
			
3)方法源码分析:
	
独占式:
	1)获取同步状态的方法:
		public final void acquire(int arg) {
			if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
				selfInterrupt();
		}
		
		final boolean acquireQueued(final Node node, int arg) {
			boolean failed = true;
			try {
				boolean interrupted = false;
				for (;;) { // 自旋
					final Node p = node.predecessor();
					// 1移出队列(或中止自旋)的条件是前驱节点为头节点且成功获取了同步状态。
					// 2当前驱节点是头节点时,才会去尝试获取同步状态
					if (p == head && tryAcquire(arg)) { 
						setHead(node);
						p.next = null; // help GC
						failed = false;
						return interrupted;
					}
					if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 进入等待状态并检查时是否被中断
						interrupted = true;
				}
			} finally {
				if (failed)
					cancelAcquire(node);
			}
		}
		
		1>调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态。
		2>若是同步状态获取失败,则构造独占式(Node.EXCLUSIVE)同步节点并经过addWaiter(Node node)方法将该节点加入到同步队列的尾部。
		3>最后调用acquireQueued(Node node,int arg)方法,使得该节点以自旋的方式获取同步状态。
		4>若是获取不到同步状态,则阻塞节点中的线程。
		5>被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
		
	2)释放同步状态的方法:
		public final boolean release(int arg) {
			if (tryRelease(arg)) {
				Node h = head;
				if (h != null && h.waitStatus != 0)
					unparkSuccessor(h);
				return true;
			}
			return false;
		}
		
		
共享式:
	1)获取同步状态的方法:
		public final void acquireShared(int arg) {
			if (tryAcquireShared(arg) < 0)
				doAcquireShared(arg);
		}
		
		private void doAcquireShared(int arg) {
			final Node node = addWaiter(Node.SHARED);	// 则构造共享式(Node.SHARED)同步节点,并将该节点加入到同步队列的尾部
			boolean failed = true;
			try {
				boolean interrupted = false;
				for (;;) { // 自旋
					final Node p = node.predecessor();
					if (p == head) { 					// 当前驱节点是头节点时,才会去尝试获取同步状态
						int r = tryAcquireShared(arg);	
						if (r >= 0) {					// 当返回值大于等于0时,表示获取同步状态成功并从退出自旋
							setHeadAndPropagate(node, r);
							p.next = null; // help GC
							if (interrupted) selfInterrupt();
							failed = false;
							return;
						}
					}
					if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
						interrupted = true;
				}
			} finally {
				if (failed)
					cancelAcquire(node);
			}
		}
	
	2)释放同步状态的方法:
		public final boolean releaseShared(int arg) {
			if (tryReleaseShared(arg)) {
				doReleaseShared();
				return true;
			}
			return false;
		}

		private void doReleaseShared() {
			for (;;) {	// 经过循环和CAS来保证同步状态的安全释放
				Node h = head;
				if (h != null && h != tail) {
					int ws = h.waitStatus;
					if (ws == Node.SIGNAL) {
						if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
							continue;            // loop to recheck cases
						unparkSuccessor(h);
					}
					else if (ws == 0 &&
							 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
						continue;                // loop on failed CAS
				}
				if (h == head)                   // loop if head changed
					break;
			}
		}

参考书籍:<java并发编程的艺术>安全

相关文章
相关标签/搜索