这里举一个栗子,咱们对一个资源进行加锁,但是又要进行细粒度的控制,该如何实现呢?java
好比咱们开了了个餐馆。餐馆有一个厨房,服务员能够通知厨房进行作菜,当前冰箱里有菜时,厨房就会开始作菜,冰箱里没菜则会等待。dom
/** * Created by Anur IjuoKaruKas on 6/28/2018 */ @SuppressWarnings("Duplicates") public class Restaurant { private final Lock kitchen = new ReentrantLock(); private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("通知厨房作肉"); if (meetFridge.isEmpty()) { try { System.out.println("冰箱没有肉了,等待冰箱有肉"); kitchen.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { synchronized (kitchen) { System.out.println("进货了"); meetFridge.addLast("牛肉"); kitchen.notify(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockMeet()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); Thread.sleep(1000); executorService.execute(restaurant.buySomething()); executorService.execute(restaurant.cockMeet()); } }
运行一下main方法,能够获得如下输出:ide
通知厨房作肉 冰箱没有肉了,等待冰箱有肉 通知厨房作肉 冰箱没有肉了,等待冰箱有肉 进货了 正在炒牛肉 进货了 正在炒牛肉 进货了 通知厨房作肉 正在炒牛肉
到这里是没有什么问题的。this
如今咱们既须要作肉,也须要作菜。 也就是说: 一、服务员通知了厨房,须要作一个肉和一个菜。这个时候厨房正好没库存,厨房进行了等待。 二、这时候某人去菜市场买了菜回来,厨房开始作菜。 三、过了一段时间 四、某人去菜市场买了肉回来,厨房开始作肉。线程
/** * Created by Anur IjuoKaruKas on 6/28/2018 */ @SuppressWarnings("Duplicates") public class Restaurant { private final Lock kitchen = new ReentrantLock(); private final Condition waitMeet = kitchen.newCondition(); private final Condition waitVege = kitchen.newCondition(); private ConcurrentLinkedDeque<String> meetFridge = new ConcurrentLinkedDeque<>();// 肉冰箱 private ConcurrentLinkedDeque<String> vegeFridge = new ConcurrentLinkedDeque<>();// 菜冰箱 public Runnable cockMeet() { return new Runnable() { @Override public void run() { kitchen.lock(); try { System.out.println("通知厨房作肉"); if (meetFridge.isEmpty()) { try { System.out.println("冰箱没有肉了,等待冰箱有肉"); waitMeet.await(); // 直接调用condition的wait方法 } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = meetFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public Runnable cockVege() { return new Runnable() { @Override public void run() { kitchen.lock(); try { System.out.println("通知厨房作菜"); if (vegeFridge.isEmpty()) { try { System.out.println("冰箱没有菜了,等待冰箱有菜"); waitVege.await(); // 直接调用condition的wait方法 } catch (InterruptedException e) { e.printStackTrace(); } } String meetNeedToCock = vegeFridge.getFirst(); System.out.println("正在炒" + meetNeedToCock); } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public Runnable buySomething() { return new Runnable() { @Override public void run() { kitchen.lock(); try { Random random = new Random(); if (random.nextBoolean()) { System.out.println("肉进货了"); meetFridge.addLast("牛肉"); waitMeet.signal(); } else { System.out.println("菜进货了"); vegeFridge.addLast("苦瓜"); waitVege.signal(); } } catch (Exception e) { e.printStackTrace(); } finally { kitchen.unlock(); } } }; } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Restaurant restaurant = new Restaurant(); executorService.execute(restaurant.cockMeet()); executorService.execute(restaurant.cockVege()); executorService.execute(restaurant.buySomething()); } }
最后输出:rest
通知厨房作肉 冰箱没有肉了,等待冰箱有肉 通知厨房作菜 冰箱没有菜了,等待冰箱有菜 肉进货了 正在炒牛肉
可见咱们能够针对状况对不一样的行为进行通知,这就是condition的力量。code
这里就不瞎扯场景了,直接上代码。队列
这是仿kafka BufferPool的一种思路,(固然没kafka实现的那么复杂),它的思路是使用一个队列来管理等待的线程。 每次线程进来sout(),都进行等待 知足必定的条件时,mission()会通知队头的一个线程进行操做。资源
/** * Created by Anur IjuoKaruKas on 6/25/2018 */ public class Task { private Deque<Condition> waiters = new ArrayDeque<>(); private Lock lock = new ReentrantLock(); private Integer count = 0; private void sout(String str) { this.lock.lock(); try { System.out.println("sout " + str + " get the lock"); Condition condition = this.lock.newCondition(); waiters.addLast(condition); condition.await(); Condition conditionFromWaiters = waiters.removeFirst(); if (conditionFromWaiters != condition) { System.out.println("???????"); } System.out.println("Test Task: " + str); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void mission() { this.lock.lock(); try { System.out.println("mission get the lock"); while (count < 10) { count++; } Condition condition = waiters.peekFirst(); if (condition != null) { condition.signal(); } count = 0; } finally { lock.unlock(); } } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); final Task task = new Task(); for (int i = 0; i < 1000000; i++) { final int finalI = i; executorService.execute(new Runnable() { @Override public void run() { task.sout(finalI + ""); } }); executorService.execute(new Runnable() { @Override public void run() { task.mission(); } }); } } }