原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap
java.concurrent.atomic
包下有不少原子操做的类。 在有些状况下,原子操做能够在不使用 synchronized
关键字和锁的状况下解决多线程安全问题。html
在内部,原子类大量使用 CAS
, 这是大多数如今 CPU 支持的原子操做指令, 这些指令一般状况下比锁同步要快得多。若是须要同时改变一个变量, 使用原子类是极其优雅的。java
如今选择一个原子类 AtomicInteger
做为例子web
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(atomicInt::incrementAndGet)); stop(executor); System.out.println(atomicInt.get()); // => 1000
使用 AtomicInteger
代替 Integer
能够在线程安全的环境中增长变量, 而不要同步访问变量。incrementAndGet()
方法是一个原子操做, 咱们能够在多线程中安全的调用。编程
AtomicInteger
支持多种的原子操做, updateAndGet()
方法接受一个 lambda
表达式,以便对整数作任何的算术运算。api
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.updateAndGet(n -> n + 2); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 2000
accumulateAndGet()
方法接受一个 IntBinaryOperator
类型的另外一种 lambda
表达式, 咱们是用这种方法来计算 1 -- 999 的和:安全
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500
还有一些其余的原子操做类: AtomicBoolean AtomicLong AtomicReference服务器
做为 AtomicLong
的替代, LongAdder
类能够用来连续地向数字添加值。多线程
ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(adder::increment)); stop(executor); System.out.println(adder.sumThenReset()); // => 1000
LongAdder
类和其余的整数原子操做类同样提供了 add()
和 increment()
方法, 同时也是线程安全的。但其内部的结果不是一个单一的值, 这个类的内部维护了一组变量来减小多线程的争用。实际结果能够经过调用 sum()
和 sumThenReset()
来获取。并发
当来自多线程的更新比读取更频繁时, 这个类每每优于其余的原子类。一般做为统计数据, 好比要统计 web 服务器的请求数量。 LongAdder
的缺点是会消耗更多的内存, 由于有一组变量保存在内存中。oracle
LongAccumulator
是 LongAdder
的一个更通用的版本。它不是执行简单的添加操做, 类 LongAccumulator
围绕 LongBinaryOperator
类型的lambda表达式构建,如代码示例中所示:
LongBinaryOperator op = (x, y) -> 2 * x + y; LongAccumulator accumulator = new LongAccumulator(op, 1L); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 10) .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539
咱们使用函数 2 * x + y
和初始值1
建立一个 LongAccumulator
。 每次调用 accumulate(i)
, 当前结果和值i
都做为参数传递给`lambda
表达式。
像 LongAdder
同样, LongAccumulator
在内部维护一组变量以减小对线程的争用。
ConcurrentMap
接口扩展了 Map
接口,并定义了最有用的并发集合类型之一。 Java 8
经过向此接口添加新方法引入了函数式编程。
在下面的代码片断中, 来演示这些新的方法:
ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
forEach()
接受一个类型为 BiConsumer
的 lambda
表达式, 并将 map
的 key
和 value
做为参数传递。
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
putIfAbsent()
方法只有当给定的 key
不存在时才将数据存入 map
中, 这个方法和 put
同样是线程安全的, 当多个线程访问 map
时不要作同步操做。
String value = map.putIfAbsent("c3", "p1"); System.out.println(value); // p0
getOrDefault()
方法返回给定 key
的 value
, 当 key
不存在时返回给定的值。
String value = map.getOrDefault("hi", "there"); System.out.println(value); // there
replaceAll()
方法接受一个 BiFunction
类型的 lambda
表达式, 并将 key
和 value
做为参数传递,用来更新 value
。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); System.out.println(map.get("r2")); // d3
compute()
方法和 replaceAll()
方法有些相同, 不一样的是它多一个参数, 用来更新指定 key
的 value
map.compute("foo", (key, value) -> value + value); System.out.println(map.get("foo")); // barbar
以上全部方法都是 ConcurrentMap
接口的一部分,所以可用于该接口的全部实现。 此外,最重要的实现 ConcurrentHashMap
已经进一步加强了一些新的方法来在 Map
上执行并发操做。
就像并行流同样,这些方法在 Java 8
中经过 ForkJoinPool.commonPool()
提供特殊的 ForkJoinPool
。该池使用预设的并行性, 这取决于可用内核的数量。 个人机器上有四个CPU内核能够实现三种并行性:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
经过设置如下 JVM
参数能够减小或增长此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
咱们使用相同的示例来演示, 不过下面使用 ConcurrentHashMap
类型, 这样能够调用更多的方法。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
Java 8
引入了三种并行操做:forEach
, search
和 reduce
。 每一个操做都有四种形式, 分别用 key
, value
, entries
和 key-value
来做为参数。
全部这些方法的第一个参数都是 parallelismThreshold
阀值。 该阈值表示操做并行执行时的最小收集大小。 例如, 若是传递的阈值为500
,而且 map
的实际大小为499
, 则操做将在单个线程上按顺序执行。 在下面的例子中,咱们使用一个阈值来强制并行操做。
方法 forEach()
可以并行地迭代 map
的键值对。 BiConsumer
类型的 lambda
表达式接受当前迭代的 key
和 value
。 为了可视化并行执行,咱们将当前线程名称打印到控制台。 请记住,在个人状况下,底层的 ForkJoinPool
最多使用三个线程。
map.forEach(1, (key, value) -> System.out.printf("key: %s; value: %s; thread: %s\n", key, value, Thread.currentThread().getName())); // key: r2; value: d2; thread: main // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1 // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2 // key: c3; value: p0; thread: main
search()
方法接受一个 BiFunction
类型的 lambda
表达式, 它能对 map
作搜索操做, 若是当前迭代不符合所需的搜索条件,则返回 null
。 请记住,ConcurrentHashMap
是无序的。 搜索功能不该该取决于地图的实际处理顺序。 若是有多个匹配结果, 则结果多是不肯定的。
String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if ("foo".equals(key)) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar
下面是对 value 的搜索
String result = map.searchValues(1, value -> { System.out.println(Thread.currentThread().getName()); if (value.length() > 3) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // main // ForkJoinPool.commonPool-worker-1 // Result: solo
reduce()
方法接受两个类型为 BiFunction
的 lambda
表达式。 第一个函数将每一个键值对转换为任何类型的单个值。 第二个函数将全部这些转换后的值组合成一个结果, 其中火忽略 null
值。
String result = map.reduce(1, (key, value) -> { System.out.println("Transform: " + Thread.currentThread().getName()); return key + "=" + value; }, (s1, s2) -> { System.out.println("Reduce: " + Thread.currentThread().getName()); return s1 + ", " + s2; }); System.out.println("Result: " + result); // Transform: ForkJoinPool.commonPool-worker-2 // Transform: main // Transform: ForkJoinPool.commonPool-worker-3 // Reduce: ForkJoinPool.commonPool-worker-3 // Transform: main // Reduce: main // Reduce: main // Result: r2=d2, c3=p0, han=solo, foo=bar