Lock接口主要操做类是ReentrantLock
,能够起到synchronized的做用,另外也提供额外的功能。
用Lock重写上一篇中的死锁例子java
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Resource { Lock lock=new ReentrantLock(); int num=0; void doSome(){ } public void deal(Resource res){ while(true){ boolean mylock=this.lock.tryLock();//尝试得到当前Resource的锁定 boolean resLock=res.lock.tryLock();//尝试得到传入的Resource的锁定 try{ if(mylock&&resLock){ res.doSome(); System.out.println(res+":"+this.num); break;//退出循环 } }finally{ if(mylock) this.lock.unlock(); if(resLock) res.lock.unlock(); } } } }
重写后不会出现死锁的缘由在于,当没法同时得到两个锁定时,干脆释放已得到的锁定。
上面代码使用当前Resource的Lock的tryLock()方法尝试得到锁定,以及传入Resource的Lock的tryLock()方法尝试得到锁定。只有当能够得到两个Resource的锁定,才能执行res.doSome().最后不管什么状况,都要finally解除锁定。segmentfault
ReadWriteLock接口定义了读取锁定和写入锁定的行为。可使用readLock()
,writeLock()
方法返回Lock操做对象。ReentrantReadWriteLock
是ReadWriteLock接口的主要操做类.ReentrantReadWriteLock.readLock
操做Lock接口,调用其lock()方法时,若没有任何ReentrantReadWriteLock.writeLock
调用过lock()方法,也就是没有任何写入锁定时,才能够取得读取锁定。
下面用ReadWriteLock
试着写一个ArrayListapi
import java.util.Arrays; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class MyArrayList<T> { private ReadWriteLock lock=new ReentrantReadWriteLock(); private Object[] list; private int next=0; public MyArrayList(){ list=new Object[16]; } public void add(T obj){ try{ lock.writeLock().lock();//获取写入锁定 if(next==list.length) list=Arrays.copyOf(list, list.length*2); list[next++]=obj; }finally{ lock.writeLock().unlock();//解除写入锁定 } } @SuppressWarnings("unchecked") public T get(int index){ try{ lock.readLock().lock();//获取读取锁定 return (T) list[index]; }finally{ lock.readLock().unlock();//解除读取锁定 } } public int size(){ try{ lock.readLock().lock(); return next; }finally{ lock.readLock().unlock(); } } }
重写后的效果是dom
如有线程调用add()方法进行写入操做,先得到写入锁定。这时若是有其余线程准备得到写入锁定或读取锁定,都必须等待前面的写入锁定解除。ide
如有线程调用get()方法进行读取操做,先得到读取锁定。这时若是有其余线程准备得到读取锁定,则能够得到;但若是是准备得到写入锁定,仍然要等待全部读取锁定解除。this
使用ReadWriteLock
的好处在于,若是有两个线程都想调用get()和size()方法,因为锁定的关系,其中一个线程只能等到另外一个线程解除锁定。然而,这两个方法都只是读取对象状态,若是只是读取操做,就能够容许线程并行,这样读取效率将会提升。spa
Condition接口用来搭配Lock,最基本的用法就是达到Object的wait(),notify(),notifyAll()方法的做用。
下面用wait(),notify(),notifyAll()实现生产者与消费者.线程
店员从生产者得到生产出的商品,消费者从店员取走商品3d
若生产者生产速度较快,店员那可能有不少商品,店员会叫生产者停一下。过一段时间,店员那商品很少了,再通知生产者继续生产code
若消费者取走速度过快,店员那可能没有商品可供取走,店员会叫消费者停一下。过一段时间,店员那有商品了,再通知消费者过来取
这里假定店员那最多只能放一件商品
public class Producer implements Runnable{ private Clerk clerk; public Producer(Clerk clerk){ this.clerk=clerk; } @Override public void run() { for(int i=0;i<10;i++){ try { Thread.sleep((int)Math.random()*3000); } catch (InterruptedException e) { } clerk.setProduct(i); } } }
public class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk){ this.clerk=clerk; } @Override public void run() { for(int i=0;i<10;i++){ try { Thread.sleep((int)Math.random()*3000); } catch (InterruptedException e) { } clerk.getProduct(); } } }
public class Clerk extends Thread{ private int product=-1;//没有商品 public synchronized void setProduct(int product){ while(this.product!=-1){ try { wait();//店员那有商品,生产者停一下 } catch (InterruptedException e) { } } this.product=product; System.out.println("生产者生产商品"+this.product); notify();//通知等待集合(唤醒的多是消费者,也多是生产者) } public synchronized int getProduct(){ while(this.product==-1){ try { wait();//店员没有商品,消费者停一下 } catch (InterruptedException e) { } } int p=this.product; System.out.println("消费者消费商品"+this.product); this.product=-1;//商品已经被取走 notify(); return p; } public static void main(String[] args){ Clerk clerk=new Clerk(); new Thread(new Producer(clerk)).start(); new Thread(new Consumer(clerk)).start(); } }
生产者生产商品0 消费者消费商品0 生产者生产商品1 消费者消费商品1 生产者生产商品2 消费者消费商品2 生产者生产商品3 消费者消费商品3 生产者生产商品4 消费者消费商品4 生产者生产商品5 消费者消费商品5 生产者生产商品6 消费者消费商品6 生产者生产商品7 消费者消费商品7 生产者生产商品8 消费者消费商品8 生产者生产商品9 消费者消费商品9
如今用Condition接口重写
public class Clerk { private int product=-1;//没有商品 Lock lock=new ReentrantLock(); private Condition condition=lock.newCondition(); public void setProduct(int product){ try{ lock.lock(); while(this.product!=-1){ try { condition.await();//店员那有商品,生产者停一下 } catch (InterruptedException e) { } } this.product=product; System.out.println("生产者生产商品"+this.product); condition.signal();//通知等待集合(唤醒的多是消费者,也多是生产者) }finally{ lock.unlock(); } } public int getProduct(){ try{ lock.lock(); while(this.product==-1){ try { condition.await();//店员没有商品,消费者停一下 } catch (InterruptedException e) { } } int p=this.product; System.out.println("消费者消费商品"+this.product); this.product=-1;//商品已经被取走 condition.signal(); return p; }finally{ lock.unlock(); } } public static void main(String[] args){ Clerk clerk=new Clerk(); new Thread(new Producer(clerk)).start(); new Thread(new Consumer(clerk)).start(); } }
注意在多个生产者,消费者线程的状况下,等待集合中二者都会有,而condition.signal()从等待集合中唤醒的具体对象是不肯定的。有可能消费者取走商品后,唤醒的仍是消费者,这时,消费者又会执行while循环,进入等待集合。
事实上,一个Condition对象能够表示一个等待集合。这样上面例子,能够有两个等待集合,一个给消费者用,一个给生产者用。生产者只会通知消费者的等待集合,消费者也只会通知生产者的等待集合。这样效率会高些。
public class Clerk { ... private Condition producerCondition=lock.newCondition();//生产者的等待集合 private Condition consumerCondition=lock.newCondition();//消费者的等待集合 public void setProduct(int product){ try{ lock.lock(); while(this.product!=-1){ try { producerCondition.await();//店员那有商品,生产者停一下 } catch (InterruptedException e) { } } this.product=product; System.out.println("生产者生产商品"+this.product); consumerCondition.signal();//唤醒消费者等待集合 }finally{ lock.unlock(); } } public int getProduct(){ try{ lock.lock(); while(this.product==-1){ try { consumerCondition.await();//店员没有商品,消费者停一下 } catch (InterruptedException e) { } } int p=this.product; System.out.println("消费者消费商品"+this.product); this.product=-1;//商品已经被取走 producerCondition.signal();//唤醒生产者等待集合 return p; }finally{ lock.unlock(); } } ... }
定义Executor接口的目的是将Runnable的指定与如何执行分离。它只定义了一个execute()方法。
public class Page{ private Executor executor; public Page(Executor executor){ this.executor=executor; } ... public void method1(){ ... executor.execute(new Runnable(){ @Override public void run(){ ... } }); ... }
}
public class DirectExecutor implements Executor{ public void execute(Runnable r){ r.run(); } }
调用
new Page(new DirectExecutor()).method1();
Executor api
像线程池这类服务,其实是定义在Executor接口的子接口ExecutorService中。通用的ExecutorService由抽象类AbstractExecutorService操做,若是须要线程池功能,可使用其子类ThreadPoolExecutor.
重写上面executor例子
ExecutorService executorService=Executors.newCachedThreadPool(); new Page(executorService).method1(); executorService.shutdown();//在指定执行的Runnable都完成后,将ExecutorService关闭
ExecutorService还定义了submit(),invokeAll(),invokeAny()等方法,这些方法出如今java.util.concurrent.Future
,java.util.concurrent.Callable
接口
Future定义的行为就是让你在未来取得结果。你能够将想执行的工做交给Future,Future会使用另外一个线程处理,你能够先作别的事情。过些时候,再调用Future的get()得到结果。
若是结果已经产生,get()会直接返回,不然会进入阻塞状态直到结果返回。get()的另外一种重载方法能够指定等待结果的时间,若指定时间内结果还没产生,则抛出TimeoutException异常。也可使用Future的isDone()方法看结果是否产生。
Future常常与Callable一块儿使用,Callable的做用与Runnable类似,都是用来定义执行的流程。
Runnable的run()方法无返回值,也没法抛出异常
Callable的call()方法能够有返回值,也能够抛出异常
FutureTask
是Future的操做类,建立时可传入Callable对象指定执行的流程
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static int fib(int n){ return n<=1?n:fib(n-1)+fib(n-2); } public static void main(String[] args){ FutureTask<Integer> task=new FutureTask<Integer>( new Callable<Integer>(){ @Override public Integer call() throws Exception { return fib(30); } } ); new Thread(task).start(); try { Thread.sleep(3000); System.out.println(task.get()); } catch (InterruptedException|ExecutionException e) { } } }
FutureTask构造类
FutureTask实现RunnableFuture接口
,RunnableFuture接口
继承Runnable,Future接口。因此能够new Thread(task).
ExecutorService的submit()方法也能够接受Callable对象,调用后返回Future对象。
ExecutorService service=Executors.newCachedThreadPool(); Future<Integer> future=service.submit(new Callable<Integer>(){ @Override public Integer call() throws Exception { return fib(30); } });
若是有多个Callable,能够先将它们收集到Collection中,而后调用ExecutorService的invokeAll()方法,返回List<Future>
若是有多个Callable,要求其中只要有一个执行完成就好了,则能够先将它们收集到Collection中,而后调用ExecutorService的invokeAny()方法
ScheduledThreadPoolExecutor用来进行工做排程,其中的schedule()方法用来排定Runnable或Callable实例延迟多久执行一次,并返回Future子接口ScheduledFuture的实例。
import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledExecution { public static void main(String[] args){ ScheduledExecutorService service=Executors.newSingleThreadScheduledExecutor(); service.scheduleWithFixedDelay(new Runnable(){ public void run(){ System.out.println(new Date()); try { Thread.sleep(2000);//假设工做会执行2s } catch (InterruptedException e) { } } }, 2000, 1000, TimeUnit.MILLISECONDS); } }
Sat Oct 24 17:11:59 CST 2015 Sat Oct 24 17:12:02 CST 2015 Sat Oct 24 17:12:05 CST 2015 Sat Oct 24 17:12:08 CST 2015 Sat Oct 24 17:12:11 CST 2015
能够看到,输出两两间相差3s.scheduleWithFixedDelay()
方法参数
若是把方法换成scheduleAtFixedRate()
Sat Oct 24 17:28:28 CST 2015 Sat Oct 24 17:28:30 CST 2015 Sat Oct 24 17:28:32 CST 2015 Sat Oct 24 17:28:34 CST 2015
每次排定的执行周期是1s,可是工做执行的时间是2s,会超过排定的执行周期,因此输出两两间相差2s。
Future的另外一个操做类ForkJoinTask
,与ExecutorService的另外一个操做类ForkJoinPool
有关,它们都是jdk7新增的api,用来解决分而治之的问题。
ForkJoinTask
操做Future接口,能够在将来得到耗时工做的执行结果
ForkJoinPool
管理ForkJoinTask
,调用fork()方法,可让另外一个线程执行ForkJoinTask
若是要得到ForkJoinTask
的执行结果,能够调用join()方法。若是执行结果还没产生,会阻塞直至有执行结果返回
使用ForkJoinTask
的子类RecursiveTask
,它是个抽象类,使用时必须继承它,并操做compute()方法。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class FibDemo extends RecursiveTask<Integer>{ final int n; FibDemo(int n){ this.n=n; } public static int fib(int n){ return n<=1?n:fib(n-1)+fib(n-2); } @Override protected Integer compute() { if(n<=10){ return fib(n); } FibDemo f1=new FibDemo(n-1); f1.fork();//ForkJoinPool分配线程执行子任务 FibDemo f2=new FibDemo(n-2); return f2.compute()+f1.join();//执行f2子任务+得到f1子任务进行完成的结果 } public static void main(String[] args){ FibDemo fib=new FibDemo(40); ForkJoinPool pool=new ForkJoinPool(); System.out.println(pool.invoke(fib)); } }