聊聊并发(十四)—基于AQS实现互斥信号(BooleanMutex)

#0 系列目录#java

#1 背景# 最近一个月都在作项目,我主要负责分布式任务的调度的功能,须要实现一个分布式的受权控制。具体的需求:并发

  1. 首先管理员启动整个任务,并设置执行权限;
  2. 工做节点收到消息后就会建立对应的线程,并开始执行任务(任务都是由一个管理员进行分配);
  3. 运行过程当中管理员须要临时中断某个任务,须要设置一个互斥信号,此时对应的工做节点都须要被阻塞,注意不是彻底销毁;

#2 分析# 先抛开分布式通信这一块,首先从单个jvm如何实现进行分析, 简单点来讲:框架

在单jvm中就是两种线程,一个为manager,另外一种为worker。1:n的对应关系,manager能够随时挂起worker的全部线程,而worker线程互不干扰异步

咋一看,会以为是一个比较典型的读写锁的应用场景,读写锁特性:jvm

  1. 当读写锁是写加锁状态时, 在这个锁被解锁以前, 全部试图对这个锁加锁的线程都会被阻塞;分布式

  2. 当读写锁在读加锁状态时, 全部试图以读模式对它进行加锁的线程均可以获得访问权, 可是若是线程但愿以写模式对此锁进行加锁, 它必须直到知道全部的线程释放锁;测试

使用读写锁实现这样的功能会存在一个问题,就是对应的写锁是没有抢占权,好比当前有读锁未释放时,写锁一直会被阻塞。而项目的需求是,manager是个领导,它能够不用排队,随时打断你ui

除此以外,整个worker线程操做会是一个跨方法,跨类的复杂实现。经过lock方式实现,异常稍微处理很差,很容易形成锁未释放,致使manager一直拿不到对应的锁操做。并且worker中本省会使用一些lock操做,容易形成死锁。.net

总结一下:线程

  1. 须要的是一个相似于信号量的PV控制;
  2. 具备的读写锁的,读线程能够不互相影响,写线程拥有最高的抢占权,能够不理会读线程是否在操做;
  3. 支持线程中断 (worker线程须要容许cancel);

所以本文的互斥信号(BooleanMutex)就应运而生,它是信号量(Semaphore)的一个变种,加入了读锁的特性。好比在状态为1时能够一直获得响应,对应的P操做不会消费对应的资源。

#3 实现# 基于jdk 1.5以后的concurrent的AQS,实现了一个本身的互斥信号控制。

package com.king.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 互斥信号共享锁
 */
public class BooleanMutex {

	private Sync sync;

	public BooleanMutex() {
		sync = new Sync();
		set(false);
	}

	/**
	 * 阻塞等待Boolean为true
	 * @throws InterruptedException
	 */
	public void lock() throws InterruptedException {
		sync.innerLock();
	}

	/**
	 * 阻塞等待Boolean为true,容许设置超时时间
	 * @param timeout
	 * @param unit
	 * @throws InterruptedException
	 * @throws TimeoutException
	 */
	public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
		sync.innerLock(unit.toNanos(timeout));
	}

	public void unlock(){
		set(true);
	}

	/**
	 * 从新设置对应的Boolean mutex
	 * @param mutex
	 */
	public void set(Boolean mutex) {
		if (mutex) {
			sync.innerSetTrue();
		} else {
			sync.innerSetFalse();
		}
	}

	public boolean state() {
		return sync.innerState();
	}

	/**
	 * 互斥信号共享锁
	 */
	private final class Sync extends AbstractQueuedSynchronizer {
		private static final long serialVersionUID = -7828117401763700385L;

		/**
		 * 状态为1,则唤醒被阻塞在状态为FALSE的全部线程
		 */
		private static final int TRUE = 1;
		/**
		 * 状态为0,则当前线程阻塞,等待被唤醒
		 */
		private static final int FALSE = 0;

		/**
		 * 返回值大于0,则执行;返回值小于0,则阻塞
		 */
		protected int tryAcquireShared(int arg) {
			return getState() == 1 ? 1 : -1;
		}

		/**
		 * 实现AQS的接口,释放共享锁的判断
		 */
		protected boolean tryReleaseShared(int ignore) {
			// 始终返回true,表明能够release
			return true;
		}

		private boolean innerState() {
			return getState() == 1;
		}

		private void innerLock() throws InterruptedException {
			acquireSharedInterruptibly(0);
		}

		private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
			if (!tryAcquireSharedNanos(0, nanosTimeout))
				throw new TimeoutException();
		}

		private void innerSetTrue() {
			for (;;) {
				int s = getState();
				if (s == TRUE) {
					return; // 直接退出
				}
				if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操做
					releaseShared(0);// 释放一下锁对象,唤醒一下阻塞的Thread
				}
			}
		}

		private void innerSetFalse() {
			for (;;) {
				int s = getState();
				if (s == FALSE) {
					return; //直接退出
				}
				if (compareAndSetState(s, FALSE)) {//cas更新状态,避免并发更新false操做
					setState(FALSE);
				}
			}
		}
	}
}

代码其实仍是挺简单的,主要是对AQS的一份扩展实现。对应的javadoc和使用说明:

输入图片说明

测试代码:

package com.king.aqs;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author taomk
 * @version 1.0
 * @since 15-11-20 下午4:38
 */
public class BooleanMutexTest {

	public static void main(String [] args) {
		// 测试1 初始化为true,不会阻塞,会唤醒被状态false阻塞的线程
		BooleanMutex mutex = new BooleanMutex();
		mutex.set(true);
		try {
			System.out.println("1. =======>" + System.currentTimeMillis());
			mutex.lock(); //不会被阻塞
			System.out.println("1. =======>" + System.currentTimeMillis());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 测试2 初始化false,主线程阻塞,子线程为true,唤醒
		final BooleanMutex mutex2 = new BooleanMutex();
		try {
			final CountDownLatch count = new CountDownLatch(1);
			ExecutorService executor = Executors.newCachedThreadPool();
			System.out.println("2. =======>" + System.currentTimeMillis());
			executor.submit(new Callable() {
				public Object call() throws Exception {
					Thread.sleep(1000);
					mutex2.set(true);
					System.out.println("2. =======>" + System.currentTimeMillis());
					count.countDown();
					return null;
				}
			});

			mutex2.lock(); //会被阻塞,等异步线程释放锁对象
			System.out.println("2. =======>" + System.currentTimeMillis());
			count.await();
			System.out.println("2. =======>" + System.currentTimeMillis());
			executor.shutdown();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 测试3 初始化为true,不会被阻塞
		try {
			final BooleanMutex mutex3 = new BooleanMutex();
			mutex3.set(true);
			System.out.println("3. =======>" + System.currentTimeMillis());
			final CountDownLatch count = new CountDownLatch(10);
			ExecutorService executor = Executors.newCachedThreadPool();

			for (int i = 0; i < 10; i++) {
				executor.submit(new Callable() {
					public Object call() throws Exception {
						mutex3.lock();
						System.out.println("3. =======>" + System.currentTimeMillis());
						count.countDown();
						return null;
					}
				});
			}
			count.await();
			System.out.println("3. =======>" + System.currentTimeMillis());
			executor.shutdown();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 测试4 初始化false,子线程阻塞
		try {
			final BooleanMutex mutex4 = new BooleanMutex();//初始为false
			final CountDownLatch count = new CountDownLatch(10);
			ExecutorService executor = Executors.newCachedThreadPool();
			System.out.println("4. =======>" + System.currentTimeMillis());
			for (int i = 0; i < 10; i++) {
				executor.submit(new Callable() {
					public Object call() throws Exception {
						mutex4.lock();//被阻塞
						System.out.println("4. =======>" + System.currentTimeMillis());
						count.countDown();
						return null;
					}
				});
			}
			Thread.sleep(1000);
			mutex4.set(true);
			System.out.println("4. =======>" + System.currentTimeMillis());
			count.await();
			executor.shutdown();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
相关文章
相关标签/搜索