上一篇文章中详细分析了基于AQS的ReentrantLock原理,ReentrantLock经过AQS中的state变量0和1之间的转换表明了独占锁。那么能够思考一下,当state变量大于1时表明了什么?J.U.C中是否有基于AQS的这种实现呢?若是有,那他们都是怎么实现的呢?这些疑问经过详细分析J.U.C中的Semaphore与CountDownLatch类后,将会获得解答。node
独占锁意味着只能有一个线程获取锁,其余的线程在锁被占用的状况下都必须等待锁释放后才能进行下一步操做。由此类推,共享锁是否意味着能够由多个线程同时使用这个锁,不须要等待呢?若是是这样,那锁的意义也就不存在了。在J.U.C中共享意味着有多个线程能够同时获取锁,但这个多个是有限制的,并非无限个,J.U.C中经过Semaphore与CountDownLatch来分别实现了两种有限共享锁。ide
Semaphore又叫信号量,他经过一个共享的’信号包‘来给每一个使用他的线程来分配信号,当信号包中的信号足够时,线程能够获取锁,反之,信号包中信号不够了,则不能获取到锁,须要等待足够的信号被释放,才能获取。oop
CountDownLatch又叫计数器,他经过一个共享的计数总量来控制线程锁的获取,当计数器总量大于0时,线程将被阻塞,不可以获取锁,只有当计数器总量为0时,全部被阻塞的线程同时被释放。源码分析
能够看到Semaphore与CountDownLatch都有一个共享总量,这个共享总量就是经过state来实现的。ui
在详细分析Semaphore与CountDownLatch的原理以前,先来看看他们是怎么使用的,这样方便后续咱们理解他们的原理。先知道他是什么?而后再问为何?下面经过两个示例来详细说明Semaphore与CountDownLatch的使用。this
//初始化10个信号量在信号包中,让ABCD4个线程分别去获取 public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(10); SemaphoreTest(semaphore); } private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException { //线程A初始获取了4个信号量,而后分3次释放了这4个信号量 Thread threadA = new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(4); System.out.println(Thread.currentThread().getName() + " get 4 semaphore"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " release 1 semaphore"); semaphore.release(1); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " release 1 semaphore"); semaphore.release(1); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " release 2 semaphore"); semaphore.release(2); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadA.setName("threadA"); //线程B初始获取了5个信号量,而后分2次释放了这5个信号量 Thread threadB = new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(5); System.out.println(Thread.currentThread().getName() + " get 5 semaphore"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " release 2 semaphore"); semaphore.release(2); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " release 3 semaphore"); semaphore.release(3); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadB.setName("threadB"); //线程C初始获取了4个信号量,而后分1次释放了这4个信号量 Thread threadC = new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(4); System.out.println(Thread.currentThread().getName() + " get 4 semaphore"); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " release 4 semaphore"); semaphore.release(4); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadC.setName("threadC"); //线程D初始获取了10个信号量,而后分1次释放了这10个信号量 Thread threadD = new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(10); System.out.println(Thread.currentThread().getName() + " get 10 semaphore"); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " release 10 semaphore"); semaphore.release(10); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadD.setName("threadD"); //线程A和线程B首先分别获取了4个和5个信号量,总信号量变为了1个 threadA.start(); threadB.start(); Thread.sleep(1); //线程C尝试获取4个发现不够则等待 threadC.start(); Thread.sleep(1); //线程D尝试获取10个发现不够则等待 threadD.start(); }
执行结果以下:线程
threadB get 5 semaphore threadA get 4 semaphore threadA release 1 semaphore threadB release 2 semaphore threadC get 4 semaphore threadA release 1 semaphore threadC release 4 semaphore threadB release 3 semaphore threadA release 2 semaphore threadD get 10 semaphore threadD release 10 semaphore
能够看到threadA和threadB在获取了9个信号量以后threadC和threadD以后等待信号量足够时才能继续往下执行。而threadA和threadB在信号量足够时是能够同时执行的。code
其中有一个问题,当threadD排队在threadC以前时,信号量若是被释放了4个,threadC会先于threadD执行吗?仍是须要排队等待呢?这个疑问在详细分析了Semaphore的源码以后再来给你们答案。队列
//初始化计数器总量为2 public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); CountDownLatchTest(countDownLatch); } private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException { //threadA尝试执行,计数器为2被阻塞 Thread threadA = new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " await"); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadA.setName("threadA"); //threadB尝试执行,计数器为2被阻塞 Thread threadB = new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); System.out.println(Thread.currentThread().getName() + " await"); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadB.setName("threadB"); //threadC在1秒后将计数器数量减1 Thread threadC = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); countDownLatch.countDown(); System.out.println(Thread.currentThread().getName() + " countDown"); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadC.setName("threadC"); //threadD在5秒后将计数器数量减1 Thread threadD = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); countDownLatch.countDown(); System.out.println(Thread.currentThread().getName() + " countDown"); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadD.setName("threadD"); threadA.start(); threadB.start(); threadC.start(); threadD.start(); }
执行结果以下:rem
threadC countDown threadD countDown threadA await threadB await
threadA和threadB在尝试执行时因为计数器总量为2被阻塞,当threadC和threadD将计数器总量减为0后,threadA和threadB同时开始执行。
总结一下:Semaphore就像旋转寿司店,共有10个座位,当座位有空余时,等待的人就能够坐上去。若是有只有2个空位,来的是一家3口,那就只有等待。若是来的是一对情侣,就能够直接坐上去吃。固然若是同时空出5个空位,那一家3口和一对情侣能够同时上去吃。CountDownLatch就像大型商场里面的临时游乐场,每一场游乐的时间事后等待的人同时进场玩,而一场中间会有不爱玩了的人随时出来,但不能进入,一旦全部进入的人都出来了,新一批人就能够同时进场。
明白了Semaphore与CountDownLatch是作什么的,怎么使用的。接下来就来看看Semaphore与CountDownLatch底层时怎么实现这些功能的。
上篇文章经过对ReentrantLock的分析,得倒了AQS中实现独占锁的几个关键方法:
//状态量,独占锁在0和1之间切换 private volatile int state; //调用tryAcquire获取锁,获取失败后加入队列中挂起等操做,AQS中实现 public final void acquire(int arg); //独占模式下尝试获取锁,ReentrantLock中实现 protected boolean tryAcquire(int arg); //调用tryRelease释放锁以及恢复线程等操做,AQS中实现 public final boolean release(int arg); //独占模式下尝试释放锁,ReentrantLock中实现 protected boolean tryRelease(int arg);
其中具体的获取和释放独占锁的逻辑都放在ReentrantLock中本身实现,AQS中负责管理获取或释放独占锁成功失败后须要具体处理的逻辑。那么共享锁的实现是否也是遵循这个规律呢?由此咱们在AQS中发现了如下几个相似的方法:
//调用tryAcquireShared获取锁,获取失败后加入队列中挂起等操做,AQS中实现 public final void acquireShared(int arg); //共享模式下尝试获取锁 protected int tryAcquireShared(int arg); //调用tryReleaseShared释放锁以及恢复线程等操做,AQS中实现 public final boolean releaseShared(int arg); //共享模式下尝试释放锁 protected boolean tryReleaseShared(int arg);
共享锁和核心就在上面4个关键方法中,先来看看Semaphore是怎么调用上述方法来实现共享锁的。
首先是Semaphore的构造方法,同ReentrantLock同样,他有两个构造方法,这样也是为了实现公平共享锁和非公平共享锁,你们可能有疑问,既然是共享锁,为何还分公平和非公平的呢?这就回到了上面那个例子后面的疑问,前面有等待的线程时,后来的线程是否能够直接获取信号量,仍是必定要排队。等待固然是公平的,插队就是非公平的。
仍是用旋转寿司的例子来讲:如今只有2个空位,已经有一家3口在等待,这时来了一对情侣,公平共享锁的实现就是这对情侣必须等待,只到一家3口上桌以后才轮到他们,而非公平共享锁的实现是可让这对状况直接去吃,由于恰好有2个空位,让一家3口继续等待(好像是很不公平......),这种状况下非公平共享锁的好处就是能够最大化寿司店的利润(好像同时也得罪了等待的顾客......),也是Semaphore默认的实现方式。
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore的例子中使用了两个核心方法acquire和release,分别调用了AQS中的acquireSharedInterruptibly和releaseShared方法:
//获取permits个信号量 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //释放permits个信号量 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量 doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起 } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //尝试释放arg个信号量 doReleaseShared(); return true; } return false; }
Semaphore在获取和释放信号量的流程都是经过AQS来实现,具体怎么算获取成功或释放成功则由Semaphore自己实现。
//公平共享锁尝试获取acquires个信号量 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) //前面是否有排队,有则返回获取失败 return -1; int available = getState(); //剩余的信号量(旋转寿司店剩余的座位) int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 剩余信号量不够,够的状况下尝试获取(旋转寿司店座位不够,或者同时来两对状况抢座位) return remaining; } } //非公平共享锁尝试获取acquires个信号量 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //剩余的信号量(旋转寿司店剩余的座位) int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 剩余信号量不够,够的状况下尝试获取(旋转寿司店座位不够,或者同时来两对情侣抢座位) return remaining; } }
能够看到公平共享锁和非公平共享锁的区别就在是否须要判断队列中是否有已经等待的线程。公平共享锁须要先判断,非公平共享锁直接插队,尽管前面已经有线程在等待。
为了验证这个结论,稍微修改下上面的示例:
threadA.start(); threadB.start(); Thread.sleep(1); threadD.start(); //threadD已经在排队 Thread.sleep(3500); threadC.start(); //3500毫秒后threadC来插队
结果输出:
threadB get 5 semaphore threadA get 4 semaphore threadB release 2 semaphore threadA release 1 semaphore threadC get 4 semaphore //threadC先与threadD获取到信号量 threadA release 1 semaphore threadB release 3 semaphore threadC release 4 semaphore threadA release 2 semaphore threadD get 10 semaphore threadD release 10 semaphore
这个示例很好的说明了当为非公平锁时会先尝试获取共享锁,而后才排队。
当获取信号量失败以后会去排队,排队这个操做经过AQS中的doAcquireSharedInterruptibly方法来实现:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //加入等待队列 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //获取当前节点的前置节点 if (p == head) { int r = tryAcquireShared(arg); //前置节点是头节点时,说明当前节点是第一个挂起的线程节点,再次尝试获取共享锁 if (r >= 0) { setHeadAndPropagate(node, r); //与ReentrantLock不一样的地方:获取共享锁成功设置头节点,同时通知下一个节点 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否须要挂起线程 parkAndCheckInterrupt()) //挂起线程,当前线程阻塞在这里! throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这一段代码和ReentrantLock中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法基本同样,说下两个不一样的地方。一是加入等待队列时这里加入的是Node.SHARED类型的节点。二是获取锁成功后会通知下一个节点,也就是唤醒下一个线程。以旋转寿司店的例子为例,前面同时走了5个客人,空余5个座位,一家3口坐进去以后会告诉后面的一对情侣,让他们也坐进去,这样就达到了共享的目的。shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法在上一篇文章中都有详细说明,这里就作解释了。
再来看看releaseShared方法时怎么释放信号量的,首先调用tryReleaseShared来尝试释放信号量,释放成功后调用doReleaseShared来判断是否须要唤醒后继线程:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow //释放信号量过多 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //cas操做设置新的信号量 return true; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //SIGNAL状态下唤醒后继节点 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //唤醒后继节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
释放的逻辑很好理解,相比ReentrantLock只是在state的数量上有点差异。
CountDownLatch相比Semaphore在实现逻辑上要简单的多,同时他也没有公平和非公平的区别,由于当计数器达到0的时候,全部等待的线程都会释放,不为0的时候,全部等待的线程都会阻塞。直接来看看CountDownLatch的两个核心方法await和countDown。
public void await() throws InterruptedException { //和Semaphore的不一样在于参数为1,其实这个参数对CountDownLatch来讲没什么意义,由于后面CountDownLatch的tryAcquireShared实现是经过getState() == 0来判断的 sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { //这里加入了一个等待超时控制,超过期间后直接返回false执行后面的代码,不会长时间阻塞 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); //每次释放1个计数 } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量 doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起 } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; //奠基了同时获取锁的基础,不管State初始为多少,只能计数等于0时触发 }
和Semaphore区别有两个,一是State每次只减小1,同时只有为0时才释放全部等待线程。二是提供了一个超时等待方法。acquireSharedInterruptibly方法跟Semaphore同样,就不细说了,这里重点说下tryAcquireSharedNanos方法。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } //最小自旋时间 static final long spinForTimeoutThreshold = 1000L; private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; //计算了一个deadline final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) //超时后直接返回false,继续执行 return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //大于最小cas操做时间则挂起线程 LockSupport.parkNanos(this, nanosTimeout); //挂起线程也有超时限制 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
重点看标了注释的几行代码,首先计算了一个超时时间,当超时后直接退出等待,继续执行。若是未超时而且大于最小的cas操做时间,这里定义的是1000ns,则挂起,同时挂起操做也有超时限制。这样就实现了超时等待。
至此关于AQS的共享锁的两个实现Semaphore与CountDownLatch就分析完了,他们与非共享最大的区别就在因而否能多个线程同时获取锁。看完后但愿你们能对Semaphore与CountDownLatch有深入的理解,不明白的时候想一想旋转寿司店和游乐场的例子,若是对你们有帮助,以为写的好的话,能够点个赞,固然更但愿你们能积极指出文中的错误和提出积极的改进意见。