多线程并发修改一个数据结构,很容易破坏这个数据结构,如散列表。锁可以保护共享数据结构,但选择线程安全的实现更好更容易,如阻塞队列就是线程安全的集合。java
Vector
和HashTable
类提供了线程安全的动态数组和散列表,而ArrayList
和HashMap
却不是线程安全的。git
java.util.concurrent
包提供了映射表、有序集、队列的高效实现,如:github
ConcurrentLinkedQueue
:多线程安全访问,无边界,非阻塞,队列;数组
ConcurrentHashMap
:多线程安全访问,散列映射表,初始容量默认16,调整因子默认0.75。安全
并发的散列映射表ConcurrentHashMap
提供原子性的关联插入putIfAbsent(key, value)
和关联删除removeIfPresent(key, value)
。写数组的拷贝CopyOnWriteArrayList
和CopyOnWriteArraySet
是线程安全的集合,全部的修改线程会对底层数组进行复制。对于常常被修改的数据列表,使用同步的ArrayList
性能赛过CopyOnWriteArrayList
。数据结构
对于线程安全的集合,返回的是弱一致性的迭代器:多线程
迭代器不必定能反映出构造后的全部修改;并发
迭代器不会将同一个值返回两次;框架
迭代器不会抛出ConcurrentModificationException
异常。dom
一般线程安全的集合可以高效的支持大量的读者和必定数量的写者,当写者线程数目大于设定值时,后来的写者线程会被暂时阻塞。而对于大多数线程安全的集合,size()
方法通常没法在常量时间完成,通常须要遍历整个集合才能肯定大小。
任何集合类使用同步包装器都会变成线程安全的,会将集合的方法使用锁加以保护,保证线程的安全访问。使用同步包装器时要确保没有任何线程经过原始的非同步方法访问数据结构,也能够说确保不存在任何指向原始对象的引用,能够采用下面构造一个集合并当即传递给包装器的方法定义。
List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>()); Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());
固然最好使用java.util.concurrent
包中定义的集合,同步包装器并无太多安全和性能上的优点。
Callable
与Runnable
相似,均可以封装一个异步执行的任务,可是Callable
有返回值。Callabele<T>
接口是一个参数化的类型,只有一个方法call()
,类型参数就是返回值的类型。Future
用来保存异步计算的结果,用get()
方法获取结果。get()
方法的调用会被阻塞,直到计算完成。有超时参数的get()
方法超时时会抛出TimeoutException
异常。
FutureTask
可将Callable
转换成Future
和Runnable
,实现了二者的接口。
Callable<Integer> myComputation = new MyComputationCallable(); FutureTask<Integer> task = new FutureTask<Integer>(myComputation); Thread t = new Thread(task); // it's a Runnable t.start(); Integer result = task.get(); // it's a Future
这里有一个计算指定目录及其子目录下与关键字匹配的文件数目的例子,涉及到Callable
、FutureTask
、Future
的使用。
public Integer call() { count = 0; try { File [] files = directory.listFiles(); List<Future<Integer>> results = new ArrayList<>(); for (File file : files) { if (file.isDirectory()) { MatchCounter counter = new MatchCounter(file, keyword); FutureTask<Integer> task = new FutureTask<>(counter); results.add(task); Thread t = new Thread(task); t.start(); } else { if (search(file)) { count++; } } } for (Future<Integer> result : results) { try { count += result.get(); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { ; } return count; }
构建一个新的线程是有代价的,涉及到与操做系统的交互。对于程序中须要建立大量生命期很短的线程,应该使用线程池。线程池中的线程执行完毕并不会立刻死亡,而是在池中准备为下一个请求提供服务。固然使用线程池还能够限制并发线程的数目。
须要调用执行器Executors
的静态工厂方法来构建线程池,下面的方法返回的是ExecutorService
接口的ThreadPoolExecutor
类的对象。
Executors.newCachedThreadPool
:线程空闲60秒后终止,如有空闲线程当即执行任务,若无则建立新线程。
Executors.newFixedThreadPool
:池中线程数由参数指定,固定大小,剩余任务放置在队列。
使用submit()
方法,将Runnable
对象或Callable
对象提交给线程池ExecutorService
,任务什么时候执行由线程池决定。调用submit()
方法,会返回一个Future
对象,用来查询任务状态或结果。当用完线程池时,要记得调用shutdown()
关闭,会在全部任务执行完后完全关闭。相似的调用shutdownNow
,可取消还没有开始的任务并试图终端正在运行的线程。
线程池的使用步骤大体以下:
调用Executors
类的静态方法newCachedThreadPool()
或newFixedThreadPool()
;
调用submit()
提交Runnable
或Callable
对象;
若是提交Callable
对象,就要保存好返回的Future
对象;
线程池用完时,调用shutdown()
。
对于以前提到的计算文件匹配数的例子,须要产生大量生命期不少的线程,可使用一个线程池来运行任务,完整代码在这里。
public Integer call() { count = 0; try { File [] files = directory.listFiles(); List<Future<Integer>> results = new ArrayList<>(); for (File file : files) { if (file.isDirectory()) { MatchCounter counter = new MatchCounter(file, keyword, pool); Future<Integer> result = pool.submit(counter); results.add(result); } else { if (search(file)) { count++; } } } for (Future<Integer> result : results) { try { count += result.get(); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { ; } return count; }
对于多线程程序,有些应用使用了大量线程,但其中大多数都是空闲的。还有些应用须要完成计算密集型任务,Fork-Join框架专门用来支持这类任务。使用Fork-Join框架解决思路大体是分治的思想,采用递归计算再合并结果。只需继承RecursiveTask<T>
类,并覆盖compute()
方法。invokeAll()
方法接收不少任务并阻塞,直到这些任务完成,join()
方法将生成结果。
对于问题,统计数组中知足某特性的元素个数,使用Fork-Join框架是很合适的。
import java.util.concurrent.*; public class ForkJoinTest { public static void main(String [] args) { final int SIZE = 10000000; double [] numbers = new double[SIZE]; for (int i = 0; i < SIZE; i++) { numbers[i] = Math.random(); } Counter counter = new Counter(numbers, 0, numbers.length, new Filter() { public boolean accept(double x) { return x > 0.5; } }); ForkJoinPool pool = new ForkJoinPool(); pool.invoke(counter); System.out.println(counter.join()); } } interface Filter { boolean accept(double t); } class Counter extends RecursiveTask<Integer> { private final int THRESHOLD = 1000; private double [] values; private int from; private int to; private Filter filter; public Counter(double [] values, int from, int to, Filter filter) { this.values = values; this.from = from; this.to = to; this.filter = filter; } public Integer compute() { if (to - from < THRESHOLD) { int count = 0; for (int i = from; i < to; i++) { if (filter.accept(values[i])) { count++; } } return count; } else { int mid = (from + to) / 2; Counter first = new Counter(values, from, mid, filter); Counter second = new Counter(values, mid, to, filter); invokeAll(first, second); return first.join() + second.join(); } } }
另外,Fork-Join框架使用工做密取来平衡可用线程的工做负载,比手工多线程强多了。