Guava集合处理是很强大的(这些在jdk8中都有些引入),但Guava发光的地方是并发。java
Monitor实现同步编程
/** * 经过Monitor的Guard进行条件阻塞 */ public class MonitorSample { private List<String> list = new ArrayList<String>(); private static final int MAX_SIZE = 10; private Monitor monitor = new Monitor(); private Monitor.Guard listBelowCapacity = new Monitor.Guard(monitor) { @Override public boolean isSatisfied() { return list.size() < MAX_SIZE; } }; public void addToList(String item) throws InterruptedException { monitor.enterWhen(listBelowCapacity); //Guard(形如Condition),不知足则阻塞,并且咱们并无在Guard进行任何通知操做 try { list.add(item); } finally { monitor.leave(); } } }
Monitor就像java本土的synchronized, ReentrantLock同样,每次只运行一个线程占用,且可重占用,每一次占用会对应一次退出占用。并发
就如上面,咱们经过if条件来判断是否可进入Monitor代码块,并再try/finally中释放:app
if (monitor.enterIf(guardCondition)) { try { doWork(); } finally { monitor.leave(); } }
Monitor.enter //进入Monitor块,将阻塞其余线程知道Monitor.leave Monitor.tryEnter //尝试进入Monitor块,true表示能够进入, false表示不能,而且不会一直阻塞 Monitor.tryEnterIf //根据条件尝试进入Monitor块
这些方法都有对应的限时版本。异步
jdk5以后有了Future这种异步执行的结构ide
ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> future = executor.submit(new Callable<Integer>(){ public Integer call() throws Exception{ return service.getCount(); } }); //Retrieve the value of computation Integer count = future.get();
ListenableFuture对Future进行了扩展,容许注册一个回调函数,task执行完后自动调用。函数式编程
获取ListableFuture对象。函数
正如咱们获取Future对象要经过ExecutorService.submit(Callable)来获取同样,咱们能够这样建立ListenableFuture对象:工具
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS)); //包装Executors建立的线程池 ListenableFuture<String> listenableFuture = executorService.submit(new Callable<String>()...); //获取ListableFuture对象 listenableFuture.addListener(new Runnable() { @Override public void run() { methodToRunOnFutureTaskCompletion(); } }, executorService); //注册回调函数
FutureCallback定义了onSuccess和onFailure方法,onSuccess方法会接收一个Future对象,这样咱们就能够获取Future的结果。ui
首先须要一个FutureCallback实现类。
/** * 定义一个FutureCallBack实现类 */ public class FutureCallbackImpl implements FutureCallback<String> { private StringBuilder builder = new StringBuilder(); @Override public void onSuccess(String result) { builder.append(result).append(" successfully"); } @Override public void onFailure(Throwable t) { builder.append(t.toString()); } public String getCallbackResult() { return builder.toString(); } }
使用实例:
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); ListenableFuture<String> futureTask = executorService.submit(new Callable<String>() { //建立ListenaleFuture对象 @Override public String call() throws Exception { return "Task completed"; } }); FutureCallbackImpl callback = new FutureCallbackImpl(); Futures.addCallback(futureTask, callback); //添加回调 callback.getCallbackResult(); //获取结果
若是CallBack是一个耗时操做,你应该选择另外一个注册CallBack:
Futures.addCallback(futureTask,callback,executorService); //提供另外一个线程池来执行性回调
SettableFuture能够用来设置要返回得值:
SettableFuture<String> sf = SettableFuture.create(); //Set a value to return sf.set("Success"); //Or set a failure Exception sf.setException(someException);
该接口与函数式编程密切相关, 相似Function, 但apply方法会转换成一个ListenableFuture封装的范型对象。
public class AsyncFuntionSample implements AsyncFunction<Long, String> { private ConcurrentMap<Long, String> map = Maps.newConcurrentMap(); private ListeningExecutorService listeningExecutorService; @Override public ListenableFuture<String> apply(final Long input) throws Exception { if (map.containsKey(input)) { SettableFuture<String> listenableFuture = SettableFuture.create(); //构建一个SettableFuture listenableFuture.set(map.get(input)); return listenableFuture; } else { return listeningExecutorService.submit(new Callable<String>() { @Override public String call() throws Exception { String retrieved = //compute to get the data; map.putIfAbsent(input, retrieved); return retrieved; } }); } } }
FutureFallback用于异常恢复的备份
/** * 当Future任务失败后, 做为备份的Future */ public class FutureFallbackImpl implements FutureFallback<String> { @Override public ListenableFuture<String> create(Throwable t) throws Exception { if (t instanceof FileNotFoundException) { SettableFuture<String> settableFuture = SettableFuture.create(); settableFuture.set("Not Found"); return settableFuture; } throw new Exception(t); } }
Futures类是有关Future实例的一个工具类。
ListenableFuture<Person> lf = Futures.transform(ListenableFuture<String> f,AsyncFunction<String,Person> af);
ListenableFuture<String> lf = Futures.withFallback(ListenableFuture<String> f,FutureFallback<String> fb);
RateLimiter限制访问每秒访问资源的线程数。有点相似信号量Semaphore。
RateLimiter limiter = RateLimiter.create(4.0); //每秒不超过4个任务被提交
limiter.acquire(); //请求RateLimiter, 超过permits会被阻塞 executor.submit(runnable); //提交任务
也有非阻塞式地尝试:
if(limiter.tryAcquire()){ //未请求到limiter则当即返回false doSomething(); }else{ doSomethingElse(); }