In this section we will first review some of the concurrent containers under java.util.concurrent, and then take a quick look at the various synchronizers.
First, let's benchmark ConcurrentHashMap and ConcurrentSkipListMap. The non-concurrent counterpart of the former is HashMap; the latter is a skip-list implementation whose entries are kept sorted by key (you can of course supply a Comparator to define the ordering).
In this example, rather than simply measuring raw key read/write throughput, we implement one of the most common multi-threaded uses of a Map: counting how often each key occurs. The key space is 10,000 keys and we loop 100 million times (so each count averages around 10,000), with 10 threads operating on the Map concurrently:
@Slf4j
public class ConcurrentMapTest {
int loopCount = 100000000;
int threadCount = 10;
int itemCount = 10000;
@Test
public void test() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("hashmap");
normal();
stopWatch.stop();
stopWatch.start("concurrentHashMap");
concurrent();
stopWatch.stop();
stopWatch.start("concurrentSkipListMap");
concurrentSkipListMap();
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
private void normal() throws InterruptedException {
HashMap<String, Long> freqs = new HashMap<>();
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
synchronized (freqs) {
if (freqs.containsKey(key)) {
freqs.put(key, freqs.get(key) + 1);
} else {
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//log.debug("normal:{}", freqs);
}
private void concurrent() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(itemCount);
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//log.debug("concurrentHashMap:{}", freqs);
}
private void concurrentSkipListMap() throws InterruptedException {
ConcurrentSkipListMap<String, LongAdder> freqs = new ConcurrentSkipListMap<>();
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//log.debug("concurrentSkipListMap:{}", freqs);
}
}
As you can see, there are three implementations here: a plain HashMap where every update is guarded by a synchronized block, a ConcurrentHashMap using computeIfAbsent() with a LongAdder per key, and a ConcurrentSkipListMap using the same LongAdder approach.
The results are as follows:
You can see that the concurrent word-frequency counter cleverly built on ConcurrentHashMap performs far better than the lock-based version. Note that operations such as containsKey, get, put and remove on ConcurrentSkipListMap are O(log n), and it also has to maintain key ordering, so there is a noticeable performance gap compared with ConcurrentHashMap.
If we print the final contents of the ConcurrentSkipListMap, it looks roughly like this (keys come out in sorted order):
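That ordering is also what you pay the O(log n) for: because the keys stay sorted, ConcurrentSkipListMap gives you the full NavigableMap API in a concurrent setting. A minimal sketch of a few of those operations (the class name and data are made up for illustration):
import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListNavigationDemo {
    public static void main(String[] args) {
        ConcurrentSkipListMap<String, Long> freqs = new ConcurrentSkipListMap<>();
        freqs.put("item1", 3L);
        freqs.put("item10", 5L);
        freqs.put("item2", 7L);
        // String keys sort lexicographically: item1 < item10 < item2
        System.out.println(freqs.firstKey());         // item1
        System.out.println(freqs.lastKey());          // item2
        System.out.println(freqs.headMap("item2"));   // {item1=3, item10=5}
        System.out.println(freqs.higherKey("item1")); // item10
    }
}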
Next, let's compare computeIfAbsent() and putIfAbsent(); these two methods are easy to mix up, and misusing them often leads to subtle bugs.
Let's write a program to verify the difference:
@Slf4j
public class PutIfAbsentTest {
@Test
public void test() {
ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
log.info("Start");
log.info("putIfAbsent:{}", concurrentHashMap.putIfAbsent("test1", getValue()));
log.info("computeIfAbsent:{}", concurrentHashMap.computeIfAbsent("test1", k -> getValue()));
log.info("putIfAbsent again:{}", concurrentHashMap.putIfAbsent("test2", getValue()));
log.info("computeIfAbsent again:{}", concurrentHashMap.computeIfAbsent("test2", k -> getValue()));
}
private String getValue() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return UUID.randomUUID().toString();
}
}
Here producing the value takes 1 second. The output shows two things: putIfAbsent always pays that 1 second, because its value argument is evaluated eagerly even when the key is already present, while computeIfAbsent only invokes the mapping function when the key is absent; and for a missing key putIfAbsent returns null, whereas computeIfAbsent returns the newly computed (and now stored) value:
Make sure you pick the method that matches what you actually need, as the sketch below illustrates.
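For example, for the common "insert if absent, then use whatever value ends up in the map" pattern, the two methods lead to different idioms. A minimal sketch; the class name and the expensiveLoad() helper are hypothetical stand-ins for a costly value computation:
import java.util.concurrent.ConcurrentHashMap;

public class PutVsComputeDemo {
    static String expensiveLoad(String key) {            // hypothetical slow loader
        return key + "-value";
    }
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        // computeIfAbsent: the mapping function runs only when the key is absent;
        // it returns the value now present in the map (existing or newly computed).
        String v1 = cache.computeIfAbsent("key", PutVsComputeDemo::expensiveLoad);
        // putIfAbsent: the argument is always evaluated, even when the key exists;
        // it returns the previous value, or null if there was none.
        String candidate = expensiveLoad("key");
        String previous = cache.putIfAbsent("key", candidate);
        String v2 = (previous != null) ? previous : candidate;
        System.out.println(v1 + " / " + v2);
    }
}
One caveat with computeIfAbsent(): the mapping function runs while the bin is locked, so keep it short and never have it update the same map.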
The earlier examples used ThreadLocalRandom, so let me briefly mention a possible misuse of it:
@Slf4j
public class ThreadLocalRandomMisuse {
@Test
public void test() throws InterruptedException {
ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
IntStream.rangeClosed(1, 5)
.mapToObj(i -> new Thread(() -> log.info("wrong:{}", threadLocalRandom.nextInt())))
.forEach(Thread::start);
IntStream.rangeClosed(1, 5)
.mapToObj(i -> new Thread(() -> log.info("ok:{}", ThreadLocalRandom.current().nextInt())))
.forEach(Thread::start);
TimeUnit.SECONDS.sleep(1);
}
}
In short, always call ThreadLocalRandom.current().nextInt() at the point of use, rather than capturing ThreadLocalRandom.current() once and calling nextInt() on it from other threads. current() initializes the random seed for the calling thread only; threads that reuse the captured instance without calling current() themselves never get their own seed, which is why the five "wrong" threads all print the same random number:
ConcurrentHashMap also offers some fairly advanced methods for parallel reduction. Let's write a program that computes the average of all values in a ConcurrentHashMap, comparing plain iteration against reduceEntriesToLong(), both in performance and in how the code reads:
@Slf4j
public class ConcurrentHashMapReduceTest {
int loopCount = 100;
int itemCount = 10000000;
@Test
public void test() {
ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, itemCount)
.boxed()
.collect(Collectors.toMap(i -> "item" + i, Function.identity(),(o1, o2) -> o1, ConcurrentHashMap::new));
StopWatch stopWatch = new StopWatch();
stopWatch.start("normal");
normal(concurrentHashMap);
stopWatch.stop();
stopWatch.start("concurrent with parallelismThreshold=1");
concurrent(concurrentHashMap, 1);
stopWatch.stop();
stopWatch.start("concurrent with parallelismThreshold=max long");
concurrent(concurrentHashMap, Long.MAX_VALUE);
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
private void normal(ConcurrentHashMap<String, Long> map) {
IntStream.rangeClosed(1, loopCount).forEach(__ -> {
long sum = 0L;
for (Map.Entry<String, Long> item : map.entrySet()) {
sum += item.getValue();
}
double average = sum / map.size();
Assert.assertEquals(itemCount / 2, average, 0);
});
}
private void concurrent(ConcurrentHashMap<String, Long> map, long parallelismThreshold) {
IntStream.rangeClosed(1, loopCount).forEach(__ -> {
double average = map.reduceEntriesToLong(parallelismThreshold, Map.Entry::getValue, 0, Long::sum) / map.size();
Assert.assertEquals(itemCount / 2, average, 0);
});
}
}
The results are as follows:
Of course, we only demonstrated reduceEntriesToLong() here; ConcurrentHashMap has a dozen or so reduceXXX() variants for parallel reduction over keys, values and entries, plus parallel forEach and search methods.
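For reference, a short sketch of a few of the other bulk operations (reduceValuesToLong(), search() and the parallel forEach() are real ConcurrentHashMap methods; the class name and data are made up):
import java.util.concurrent.ConcurrentHashMap;

public class BulkOpsDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        for (long i = 1; i <= 1000; i++) {
            map.put("item" + i, i);
        }
        // Parallel sum of all values (parallelismThreshold = 1 means "parallelize aggressively").
        long sum = map.reduceValuesToLong(1, Long::longValue, 0, Long::sum);
        // Parallel search: returns some non-null result of the search function, or null if none matched.
        String found = map.search(1, (k, v) -> v == 500 ? k : null);
        // Parallel forEach over entries; any side effects here must themselves be thread-safe.
        map.forEach(1, (k, v) -> { });
        System.out.println(sum + " " + found);
    }
}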
As mentioned in an earlier article, ConcurrentHashMap does not make a sequence of separate operations on the map atomic (only single methods such as the computeIfAbsent() and putIfAbsent() discussed above are). In the following example we start with a ConcurrentHashMap of size 9,990, and several threads compute how far it is from the 10,000 cap and then fill the gap:
@Test
public void test() throws InterruptedException {
int limit = 10000;
ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, limit - 10)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
log.info("init size:{}", concurrentHashMap.size());
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int __ = 0; __ < 10; __++) {
executorService.execute(() -> {
int gap = limit - concurrentHashMap.size();
log.debug("gap:{}", gap);
concurrentHashMap.putAll(LongStream.rangeClosed(1, gap)
.boxed()
.collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
}
This code is clearly broken: reading the size, computing the gap and filling it are separate operations, other threads can modify the map in between, so the compound "check then act" is not atomic.
The output looks like this:
You can see that some threads even computed a negative gap, and the final size is 10,040, which is 40 more than the expected limit. One way to fix this is sketched below.
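One way (among several) to fix it is to make "read the size, compute the gap, fill the gap" a single atomic step, for example by synchronizing the whole compound operation on the map. A hedged sketch that serves as a drop-in replacement for the executorService.execute(...) call inside the loop above; it trades some concurrency for correctness:
executorService.execute(() -> {
    // Compound operation made atomic: only one thread at a time measures the gap and fills it.
    synchronized (concurrentHashMap) {
        int gap = limit - concurrentHashMap.size();
        if (gap > 0) {
            concurrentHashMap.putAll(LongStream.rangeClosed(1, gap)
                    .boxed()
                    .collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));
        }
    }
});
With this change the final size should land exactly on the 10,000 limit.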
One more point, not exactly a misuse but worth mentioning: ConcurrentHashMap does not allow null keys or values, while HashMap does. Why is that? The screenshot below is the reply from Doug Lea, the author of ConcurrentHashMap:
The gist is: if get(key) returns null, you cannot tell whether the key is missing or the key exists with a null value. In a single-threaded setting you can follow up with containsKey(key) to disambiguate, but under concurrency you cannot, because the map may already have been modified by the time you do that second check.
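A tiny single-threaded sketch of that ambiguity, using a plain HashMap (the class name is just for illustration):
import java.util.HashMap;

public class NullValueAmbiguity {
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("k", null);
        // Both calls print null -- get() alone cannot tell the two cases apart.
        System.out.println(map.get("k"));        // key present, value is null
        System.out.println(map.get("missing"));  // key absent
        // Single-threaded you can disambiguate with containsKey(); under concurrency another
        // thread may insert or remove the key between the two calls, which is why
        // ConcurrentHashMap simply forbids null keys and values.
        System.out.println(map.containsKey("k"));
    }
}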
CopyOnWrite containers are meant for scenarios with extremely high read concurrency and almost no writes: when a write does happen, the whole underlying array is copied. That copy is expensive, but it lets the other 99.9% of operations, the concurrent reads, run without any locking. Let's measure this, starting with writes, pitting CopyOnWriteArrayList against an ArrayList guarded by a manual synchronized block and an ArrayList wrapped with Collections.synchronizedList():
@Test
public void testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> arrayList = new ArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("arrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
synchronized (arrayList) {
arrayList.add(ThreadLocalRandom.current().nextInt(loopCount));
}
});
stopWatch.stop();
stopWatch.start("synchronizedList");
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
100,000 operations is not a lot, and the results are as follows:
Now for reads: we first use a helper method to pre-fill each list with 10 million elements, then run the test with 100 million iterations:
private void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 10000000).boxed().collect(Collectors.toList()));
}
@Test
public void testRead() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> arrayList = new ArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
addAll(copyOnWriteArrayList);
addAll(arrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 100000000;
int count = arrayList.size();
stopWatch.start("copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("arrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
synchronized (arrayList) {
arrayList.get(ThreadLocalRandom.current().nextInt(count));
}
});
stopWatch.stop();
stopWatch.start("synchronizedList");
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
The results are as follows:
Having covered most of the concurrent containers, let's now look at the five synchronizers.
CountDownLatch has already shown up countless times in earlier articles and is by far the most frequently used of the five synchronizers. Typical use cases include a main thread waiting for a group of worker threads (or a one-off event) to finish, and a group of threads waiting for a single "go" signal so they all start together.
Let's look at the relevant parts of the ResponseFuture implementation (this class comes from RocketMQ's remoting module):
public class ResponseFuture {
private final int opaque;
private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final SemaphoreReleaseOnlyOnce once;
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
...
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
...
}
After the network request is sent, the caller waits on the latch; when the response arrives, the I/O thread stores the data and counts down the CountDownLatch, and the waiting caller can then proceed and pick up the response.
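Stripped of the networking details, the underlying pattern is small; here is a minimal sketch (not the actual RocketMQ code, the class name is illustrative):
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SimpleResponseFuture<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    // Called by the requesting thread: block until a response arrives or the timeout elapses.
    public T waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;  // may still be null on timeout
    }

    // Called by the I/O thread when the response comes back: publish it and wake the waiter.
    public void putResponse(T response) {
        this.response = response;
        latch.countDown();
    }
}
The caller invokes waitResponse() right after sending the request, the I/O thread invokes putResponse() when the reply arrives, and a CountDownLatch initialized to 1 acts as a one-shot gate.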
Semaphore can be used to limit concurrency. Suppose we have a game that needs to cap the number of players online at the same time. We first define a Player class, using a Semaphore passed in from the outside to limit how many players can enter. In the code we also use the AtomicInteger, AtomicLong and LongAdder we studied earlier to track the total number of players, the longest wait and the total wait time.
@Slf4j
public class Player implements Runnable {
private static AtomicInteger totalPlayer = new AtomicInteger();
private static AtomicLong longestWait = new AtomicLong();
private static LongAdder totalWait = new LongAdder();
private String playerName;
private Semaphore semaphore;
private LocalDateTime enterTime;
public Player(String playerName, Semaphore semaphore) {
this.playerName = playerName;
this.semaphore = semaphore;
}
public static void result() {
log.info("totalPlayer:{},longestWait:{}ms,averageWait:{}ms", totalPlayer.get(), longestWait.get(), totalWait.doubleValue() / totalPlayer.get());
}
@Override
public void run() {
try {
enterTime = LocalDateTime.now();
semaphore.acquire();
totalPlayer.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
long ms = Duration.between(enterTime, LocalDateTime.now()).toMillis();
longestWait.accumulateAndGet(ms, Math::max);
totalWait.add(ms);
//log.debug("Player:{} finished, took:{}ms", playerName, ms);
}
}
}
The main test code is as follows:
@Test
public void test() throws InterruptedException {
Semaphore semaphore = new Semaphore(10, false);
ExecutorService threadPool = Executors.newFixedThreadPool(100);
IntStream.rangeClosed(1, 10000).forEach(i -> threadPool.execute(new Player("Player" + i, semaphore)));
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
Player.result();
}
We cap concurrent players at 10 with a non-fair semaphore, use a fixed pool of 100 threads, and have 10,000 players in total that need to play. When the program finishes it prints:
CyclicBarrier makes a group of threads wait for each other: once all threads (parties) have reached the rendezvous point, they are released together and can then cycle around into the next round of waiting, over and over. After everyone has arrived, the last thread to arrive can perform a "post-processing" step; this barrier action can either be passed in when constructing the CyclicBarrier (see the short sketch below) or be implemented by checking the return value of await().
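The "pass it in when constructing the CyclicBarrier" variant looks like this; a minimal sketch with made-up output text:
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {
    public static void main(String[] args) {
        // The Runnable passed as the second constructor argument is the barrier action:
        // it runs once per generation, on the last thread to arrive, before anyone is released.
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("all 3 arrived, start the play"));
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}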
In this example we implement a simple scenario: a show can only start once all the actors (5 of them in the code) are in place, and the show is performed 3 times. We use a CyclicBarrier to wait until every actor has arrived; once they have, the show itself takes 2 seconds.
@Slf4j
public class CyclicBarrierTest {
@Test
public void test() throws InterruptedException {
int playerCount = 5;
int playCount = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(playerCount);
List<Thread> threads = IntStream.rangeClosed(1, playerCount).mapToObj(player -> new Thread(() -> IntStream.rangeClosed(1, playCount).forEach(play -> {
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));
log.debug("Player {} arrived for play {}", player, play);
if (cyclicBarrier.await() == 0) {
log.info("Total players {} arrived, let's play {}", cyclicBarrier.getParties(),play);
TimeUnit.SECONDS.sleep(2);
log.info("Play {} finished",play);
}
} catch (Exception e) {
e.printStackTrace();
}
}))).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
}
}
The if (cyclicBarrier.await() == 0) check lets the last-arriving actor perform the post-barrier processing. Let's check the log to confirm that the show ran 3 times, and that each show only started after all actors had arrived:
10:35:43.333 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 1
10:35:43.333 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 1
10:35:43.333 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 1
10:35:43.367 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 1
10:35:43.376 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 1
10:35:43.377 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 1
10:35:43.378 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 2
10:35:43.432 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 2
10:35:43.434 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 2
10:35:43.473 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 2
10:35:45.382 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 1 finished
10:35:45.390 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 2
10:35:45.390 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 2
10:35:45.437 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 3
10:35:45.443 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 3
10:35:45.445 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 3
10:35:45.467 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 3
10:35:47.395 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 2 finished
10:35:47.472 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 3
10:35:47.473 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 3
10:35:49.477 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 3 finished
As the log shows, the show itself runs on the thread of Player 1, the last actor to arrive. Note that while he is performing, the other actors have already moved on and are waiting at the barrier for the next round (do not assume that CyclicBarrier blocks all threads until the post-processing is done before letting them continue); the next show can only start once he finishes performing and reaches await() again.
Phaser is similar to CyclicBarrier, except that it is more flexible: the number of parties can be adjusted dynamically instead of being fixed up front. A thread can register itself as a party via register(), then call arriveAndAwaitAdvance() to signal that it has arrived and wait for the other parties; when everyone has arrived, the barrier is tripped.
For example, in the code below we run all the submitted tasks for `iterations` rounds. The Phaser terminates when the phase number reaches the iteration count or when there are no parties left; onAdvance() returning true signals termination. We first register the main thread as a party, then register each task as a party and run it on a new thread; after finishing a run, each task arrives at the barrier and keeps looping as long as the phaser has not terminated. The main thread loops in the same way: in each phase it waits for the other threads to finish their tasks (arrive at the barrier), then arrives itself to kick off the next round.
@Slf4j
public class PhaserTest {
AtomicInteger atomicInteger = new AtomicInteger();
@Test
public void test() throws InterruptedException {
int iterations = 10;
int tasks = 100;
runTasks(IntStream.rangeClosed(1, tasks)
.mapToObj(index -> new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.incrementAndGet();
}))
.collect(Collectors.toList()), iterations);
Assert.assertEquals(tasks * iterations, atomicInteger.get());
}
private void runTasks(List<Runnable> tasks, int iterations) {
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations - 1 || registeredParties == 0;
}
};
phaser.register();
for (Runnable task : tasks) {
phaser.register();
new Thread(() -> {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
while (!phaser.isTerminated()) {
doPostOperation(phaser);
phaser.arriveAndAwaitAdvance();
}
doPostOperation(phaser);
}
private void doPostOperation(Phaser phaser) {
while (phaser.getArrivedParties() < 100) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("phase:{},registered:{},unarrived:{},arrived:{},result:{}",
phaser.getPhase(),
phaser.getRegisteredParties(),
phaser.getUnarrivedParties(),
phaser.getArrivedParties(), atomicInteger.get());
}
}
10 iterations, 100 tasks per iteration; let's run it and see:
You can see that once the while loop in the main thread's post-processing finishes, it is the only party that has not yet arrived at the barrier; at that point it can do some post-processing for the phase and then arrive, tripping the barrier.
Exchanger lets two threads swap data at a rendezvous point at the same moment. Let's write some code to try it out. In the code below we define a producer thread that keeps sending data, sleeping for a random interval after each send; thanks to the Exchanger, the consumer thread receives each item as soon as the producer hands it over, and we did not need a blocking queue to achieve this:
@Slf4j
public class ExchangerTest {
@Test
public void test() throws InterruptedException {
Random random = new Random();
Exchanger<Integer> exchanger = new Exchanger<>();
int count = 10;
Executors.newFixedThreadPool(1, new ThreadFactoryImpl("producer"))
.execute(() -> {
try {
for (int i = 0; i < count; i++) {
log.info("sent:{}", i);
exchanger.exchange(i);
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryImpl("consumer"));
executorService.execute(() -> {
try {
for (int i = 0; i < count; i++) {
int data = exchanger.exchange(null);
log.info("got:{}", data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);
}
}
The output looks like this:
I won't over-summarize the concurrent containers. ConcurrentHashMap is extremely convenient and extremely widely used, but keep in mind that its thread safety does not mean you can use it any way you like and still be correct; misusing it is very common in business code.
Finally, to tie the synchronizers together with the theater theme used above: CountDownLatch is the audience waiting for the curtain to rise (a one-shot wait for an event); Semaphore is the entrance that only admits a limited number of players at a time; CyclicBarrier is a troupe that starts each act only once every actor is in place, act after act; Phaser is the same idea, but actors can join or leave between acts; and Exchanger is two people meeting at an agreed spot to hand things to each other.
As usual, the code is on my GitHub; feel free to clone it and play with it yourself, and likes are welcome.
You are also welcome to follow my WeChat public account: 随缘主人的园子