java.util.concurrent 包中的工具类主要用来协调不同线程的运行状态(完成状态、完成步调),以及限制对共享资源的并发访问。
java
CountDownLatch 是通过一个计数器来实现线程的统一执行。首先给 latch 设置一个初始值,countDown() 每被调用一次,计数值就会减 1。在计数值达到 0 以前,调用 latch.await() 的线程会一直阻塞。直到 latch.countDown() 将计数值减为 0,调用 latch.await() 的线程才会继续运行。
/**
 * Demonstrates {@link CountDownLatch}: a {@link Waiter} thread blocks until a
 * {@link Decrementer} thread has counted the shared latch down to zero.
 */
public class cdlTest {
    /**
     * Creates a latch with an initial count of 3 and starts one waiter and one decrementer on it.
     *
     * @param args unused
     * @throws InterruptedException never thrown by the current body; kept so the signature
     *                              stays compatible with existing callers
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        Waiter waiter = new Waiter(latch);
        Decrementer decrementer = new Decrementer(latch);
        new Thread(waiter).start();
        new Thread(decrementer).start();
    }
}

/** Task that blocks on a latch until its count reaches zero, then reports that it was released. */
class Waiter implements Runnable {
    final CountDownLatch latch;

    /**
     * @param latch the latch this task waits on
     */
    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    /** Prints a start message, waits for the latch to open, then prints a release message. */
    @Override
    public void run() {
        System.out.println(" waiter is begin");
        try {
            // await() is the latch operation. Object.wait() would require owning the latch's
            // monitor and throws IllegalMonitorStateException otherwise, which made the waiter
            // fall straight through instead of blocking.
            latch.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println(" Waiter Released");
    }
}

/** Task that counts a latch down three times, pausing one second before each step. */
class Decrementer implements Runnable {
    final CountDownLatch latch;

    /**
     * @param latch the latch this task counts down
     */
    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    /** Sleeps one second, counts down and reports the step; repeated three times. */
    @Override
    public void run() {
        System.out.println(" Decrementer is begin");
        try {
            for (int step = 1; step <= 3; step++) {
                Thread.sleep(1000);
                this.latch.countDown();
                System.out.println(" Decrementer down " + step);
            }
        } catch (InterruptedException e) {
            // Only sleep() can throw here; restore the flag and stop counting down.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println(" Decrementer is release ");
    }
}
循环屏障(CyclicBarrier):让一组线程互相等待,全部到达屏障点之后再一起继续执行。线程调用 await() 时进入等待队列,直到所有线程(线程数是 CyclicBarrier 构造函数里的参数)都调用了 await(),才会一起被放行;放行之后屏障会自动重置,可以循环使用。
ide
/**
 * Demonstrates {@link CyclicBarrier}: three tours travel through the same five stops and
 * wait for each other at every stop before starting the next leg.
 */
public class CycliTest {
    // Per-leg travel times for each tour, in units of 100 ms.
    private static int[] timeWalk = { 5, 8, 15, 15, 10 };
    private static int[] timeSelf = { 1, 3, 4, 4, 5 };
    private static int[] timeBus = { 2, 4, 6, 6, 7 };

    /**
     * Formats the current wall-clock time.
     *
     * @return the current time as {@code HH:mm:ss} followed by a colon
     */
    static String now() {
        return new SimpleDateFormat("HH:mm:ss").format(new Date()) + ":";
    }

    /** One tour group: travels each leg, announces its arrival, then waits at the barrier. */
    static class Tour implements Runnable {
        private int[] times;
        private CyclicBarrier barrier;
        private String tourName;

        /**
         * @param barrier  barrier shared by all tours
         * @param tourName label printed in the arrival messages
         * @param times    travel time of each of the five legs, in units of 100 ms
         */
        public Tour(CyclicBarrier barrier, String tourName, int[] times) {
            this.times = times;
            this.barrier = barrier;
            this.tourName = tourName;
        }

        /** Travels the five legs in order, waiting at the barrier after each arrival. */
        @Override
        public void run() {
            String[] arrivals = {
                " Reached Shenzhen ",
                " Reached Guangzhou ",
                " Reached Shaoguan ",
                " Reached Chengdu ",
                " Reached Wuhan"
            };
            try {
                for (int leg = 0; leg < arrivals.length; leg++) {
                    Thread.sleep(times[leg] * 100);
                    System.out.println(now() + tourName + arrivals[leg]);
                    barrier.await();
                }
            } catch (BrokenBarrierException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs the walking, self-driving and bus tours on a cached thread pool with a 3-party barrier.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CyclicBarrier meetingPoint = new CyclicBarrier(3);
        pool.submit(new Tour(meetingPoint, "WalkTour", timeWalk));
        pool.submit(new Tour(meetingPoint, "SelfTour", timeSelf));
        pool.submit(new Tour(meetingPoint, "BusTour", timeBus));
        pool.shutdown();
    }
}
Semaphore 是一个信号量。线程在访问资源之前必须先从 semaphore 获取许可(acquire),只有当 semaphore 还有空闲许可的时候,等待的线程才会继续执行;用完之后再归还许可(release)。它适用于需要限制同时访问线程个数的共享资源。
工具
/**
 * Demonstrates {@link Semaphore}: each task must obtain a permit before using the
 * simulated resource and gives the permit back when it is done.
 */
public class MyTest extends Thread {
    private Semaphore semaphore;

    /**
     * @param s semaphore whose permits guard the simulated resource
     */
    public MyTest(Semaphore s) {
        this.semaphore = s;
    }

    /**
     * Reports whether a permit looks available, acquires one, holds it for a random time of
     * up to one second, and releases it.
     *
     * <p>The permit is released in a {@code finally} block, so an interrupt during the
     * simulated work no longer leaks it.
     */
    @Override
    public void run() {
        // getName() is the name of this MyTest object; when the task is submitted to an
        // executor (as main does) that is not the name of the pool thread running it.
        try {
            if (semaphore.availablePermits() > 0) {
                System.out.println("thread name [" + this.getName() + "] can take");
            } else {
                System.out.println(" thread name [" + this.getName() + "] must waite");
            }
            semaphore.acquire(); // blocks until a permit is free
            try {
                System.out.println("thread name[" + this.getName() + "] get resource");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println("thread name[" + this.getName() + "] release resource");
            } finally {
                // Reached only after acquire() succeeded, so the permit count stays balanced.
                semaphore.release();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns the executing thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Submits 50 tasks competing for 2 permits to a cached thread pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        Semaphore s = new Semaphore(2);
        for (int i = 0; i < 50; i++) {
            es.submit(new MyTest(s));
        }
        es.shutdown();
    }
}