#0 系列目录#java
#1 背景# 最近一个月都在作项目,我主要负责分布式任务的调度的功能,须要实现一个分布式的受权控制。具体的需求:并发
#2 分析# 先抛开分布式通信这一块,首先从单个jvm如何实现进行分析, 简单点来讲:框架
在单jvm中就是两种线程,一个为manager,另外一种为worker。1:n的对应关系,manager能够随时挂起worker的全部线程,而worker线程互不干扰
。异步
咋一看,会以为是一个比较典型的读写锁的应用场景,读写锁特性:jvm
当读写锁是写加锁状态时, 在这个锁被解锁以前, 全部试图对这个锁加锁的线程都会被阻塞;分布式
当读写锁在读加锁状态时, 全部试图以读模式对它进行加锁的线程均可以获得访问权, 可是若是线程但愿以写模式对此锁进行加锁, 它必须直到知道全部的线程释放锁;测试
使用读写锁实现这样的功能会存在一个问题,就是对应的写锁是没有抢占权,好比当前有读锁未释放时,写锁一直会被阻塞。而项目的需求是,manager是个领导,它能够不用排队,随时打断你
。ui
除此以外,整个worker线程操做会是一个跨方法,跨类的复杂实现。经过lock方式实现,异常稍微处理很差,很容易形成锁未释放,致使manager一直拿不到对应的锁操做。并且worker中本省会使用一些lock操做,容易形成死锁。.net
总结一下:线程
读线程能够不互相影响,写线程拥有最高的抢占权
,能够不理会读线程是否在操做;所以本文的互斥信号(BooleanMutex)就应运而生,它是信号量(Semaphore)的一个变种,加入了读锁的特性。好比在状态为1时能够一直获得响应,对应的P操做不会消费对应的资源。
#3 实现# 基于jdk 1.5以后的concurrent的AQS,实现了一个本身的互斥信号控制。
package com.king.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * 互斥信号共享锁 */ public class BooleanMutex { private Sync sync; public BooleanMutex() { sync = new Sync(); set(false); } /** * 阻塞等待Boolean为true * @throws InterruptedException */ public void lock() throws InterruptedException { sync.innerLock(); } /** * 阻塞等待Boolean为true,容许设置超时时间 * @param timeout * @param unit * @throws InterruptedException * @throws TimeoutException */ public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { sync.innerLock(unit.toNanos(timeout)); } public void unlock(){ set(true); } /** * 从新设置对应的Boolean mutex * @param mutex */ public void set(Boolean mutex) { if (mutex) { sync.innerSetTrue(); } else { sync.innerSetFalse(); } } public boolean state() { return sync.innerState(); } /** * 互斥信号共享锁 */ private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** * 状态为1,则唤醒被阻塞在状态为FALSE的全部线程 */ private static final int TRUE = 1; /** * 状态为0,则当前线程阻塞,等待被唤醒 */ private static final int FALSE = 0; /** * 返回值大于0,则执行;返回值小于0,则阻塞 */ protected int tryAcquireShared(int arg) { return getState() == 1 ? 1 : -1; } /** * 实现AQS的接口,释放共享锁的判断 */ protected boolean tryReleaseShared(int ignore) { // 始终返回true,表明能够release return true; } private boolean innerState() { return getState() == 1; } private void innerLock() throws InterruptedException { acquireSharedInterruptibly(0); } private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); } private void innerSetTrue() { for (;;) { int s = getState(); if (s == TRUE) { return; // 直接退出 } if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操做 releaseShared(0);// 释放一下锁对象,唤醒一下阻塞的Thread } } } private void innerSetFalse() { for (;;) { int s = getState(); if (s == FALSE) { return; //直接退出 } if (compareAndSetState(s, FALSE)) {//cas更新状态,避免并发更新false操做 setState(FALSE); } } } } }
代码其实仍是挺简单的,主要是对AQS的一份扩展实现。对应的javadoc和使用说明:
测试代码:
package com.king.aqs; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author taomk * @version 1.0 * @since 15-11-20 下午4:38 */ public class BooleanMutexTest { public static void main(String [] args) { // 测试1 初始化为true,不会阻塞,会唤醒被状态false阻塞的线程 BooleanMutex mutex = new BooleanMutex(); mutex.set(true); try { System.out.println("1. =======>" + System.currentTimeMillis()); mutex.lock(); //不会被阻塞 System.out.println("1. =======>" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } // 测试2 初始化false,主线程阻塞,子线程为true,唤醒 final BooleanMutex mutex2 = new BooleanMutex(); try { final CountDownLatch count = new CountDownLatch(1); ExecutorService executor = Executors.newCachedThreadPool(); System.out.println("2. =======>" + System.currentTimeMillis()); executor.submit(new Callable() { public Object call() throws Exception { Thread.sleep(1000); mutex2.set(true); System.out.println("2. =======>" + System.currentTimeMillis()); count.countDown(); return null; } }); mutex2.lock(); //会被阻塞,等异步线程释放锁对象 System.out.println("2. =======>" + System.currentTimeMillis()); count.await(); System.out.println("2. =======>" + System.currentTimeMillis()); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } // 测试3 初始化为true,不会被阻塞 try { final BooleanMutex mutex3 = new BooleanMutex(); mutex3.set(true); System.out.println("3. =======>" + System.currentTimeMillis()); final CountDownLatch count = new CountDownLatch(10); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executor.submit(new Callable() { public Object call() throws Exception { mutex3.lock(); System.out.println("3. =======>" + System.currentTimeMillis()); count.countDown(); return null; } }); } count.await(); System.out.println("3. =======>" + System.currentTimeMillis()); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } // 测试4 初始化false,子线程阻塞 try { final BooleanMutex mutex4 = new BooleanMutex();//初始为false final CountDownLatch count = new CountDownLatch(10); ExecutorService executor = Executors.newCachedThreadPool(); System.out.println("4. =======>" + System.currentTimeMillis()); for (int i = 0; i < 10; i++) { executor.submit(new Callable() { public Object call() throws Exception { mutex4.lock();//被阻塞 System.out.println("4. =======>" + System.currentTimeMillis()); count.countDown(); return null; } }); } Thread.sleep(1000); mutex4.set(true); System.out.println("4. =======>" + System.currentTimeMillis()); count.await(); executor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } } }