本节咱们来研究下并发包中的Atomic类型。java
先来一把性能测试,对比一下AtomicLong(1.5出来的)、LongAdder(1.8出来的)和LongAccumulator(1.8出来的)用于简单累加的性能。git
程序逻辑比较简单,能够看到咱们在最大并发10的状况下,去作10亿次操做测试:github
@Slf4j
public class AccumulatorBenchmark {
private StopWatch stopWatch = new StopWatch();
static final int threadCount = 100;
static final int taskCount = 1000000000;
static final AtomicLong atomicLong = new AtomicLong();
static final LongAdder longAdder = new LongAdder();
static final LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0L);
@Test
public void test() {
Map<String, IntConsumer> tasks = new HashMap<>();
tasks.put("atomicLong", i -> atomicLong.incrementAndGet());
tasks.put("longAdder", i -> longAdder.increment());
tasks.put("longAccumulator", i -> longAccumulator.accumulate(1L));
tasks.entrySet().forEach(item -> benchmark(threadCount, taskCount, item.getValue(), item.getKey()));
log.info(stopWatch.prettyPrint());
Assert.assertEquals(taskCount, atomicLong.get());
Assert.assertEquals(taskCount, longAdder.longValue());
Assert.assertEquals(taskCount, longAccumulator.longValue());
}
private void benchmark(int threadCount, int taskCount, IntConsumer task, String name) {
stopWatch.start(name);
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task));
forkJoinPool.shutdown();
try {
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
stopWatch.stop();
}
}
复制代码
结果以下: 安全
和官网说的差很少,在高并发的状况下LongAdder性能会比AtomicLong好不少。bash
在不少开源代码中咱们有看到AtomicReference的身影,它到底是干什么的呢?咱们来写一段测试程序,在这个程序中咱们定一了一个Switch类型,做为一个开关,而后写三个死循环的线程来测试,当开关有效的时候会持续死循环,在2秒后关闭全部的三个开关:微信
@Slf4j
public class AtomicReferenceTest {
private Switch rawValue = new Switch();
private volatile Switch volatileValue = new Switch();
private AtomicReference<Switch> atomicValue = new AtomicReference<>(new Switch());
@Test
public void test() throws InterruptedException {
new Thread(() -> {
log.info("Start:rawValue");
while (rawValue.get()) {
}
log.info("Done:rawValue");
}).start();
new Thread(() -> {
log.info("Start:volatileValue");
while (volatileValue.get()) {
}
log.info("Done:volatileValue");
}).start();
new Thread(() -> {
log.info("Start:atomicValue");
while (atomicValue.get().get()) {
}
log.info("Done:atomicValue");
}).start();
Executors.newSingleThreadScheduledExecutor().schedule(rawValue::off, 2, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().schedule(volatileValue::off, 2, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().schedule(atomicValue.get()::off, 2, TimeUnit.SECONDS);
TimeUnit.HOURS.sleep(1);
}
class Switch {
private boolean enable = true;
public boolean get() {
return enable;
}
public void off() {
enable = false;
}
}
}
复制代码
运行程序: 多线程
下面咱们来看一下AtomicInteger的compareAndSet()功能。首先说明这个程序没有任何意义,只是测试一下功能。在这个程序里,咱们乱序开启10个线程,每个线程的任务就是按照次序来累加数字。咱们使用AtomicInteger的compareAndSet()来确保乱序的线程也能按照咱们要的顺序操做累加。并发
@Slf4j
public class AtomicIntegerTest {
@Test
public void test() throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(0);
List<Thread> threadList = IntStream.range(0,10).mapToObj(i-> {
Thread thread = new Thread(() -> {
log.debug("Wait {}->{}", i, i+1);
while (!atomicInteger.compareAndSet(i, i + 1)) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("Done {}->{}", i, i+1);
});
thread.setName(UUID.randomUUID().toString());
return thread;
}).sorted(Comparator.comparing(Thread::getName)).collect(Collectors.toList());
for (Thread thread : threadList) {
thread.start();
}
for (Thread thread : threadList) {
thread.join();
}
log.info("result:{}", atomicInteger.get());
}
}
复制代码
执行结果以下:dom
11:46:30.611 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 4->5
11:46:30.611 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 5->6
11:46:30.612 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 9->10
11:46:30.612 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 7->8
11:46:30.613 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 6->7
11:46:30.611 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 3->4
11:46:30.611 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 1->2
11:46:30.611 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 8->9
11:46:30.611 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 0->1
11:46:30.611 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 2->3
11:46:30.627 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 0->1
11:46:30.681 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 1->2
11:46:30.681 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 2->3
11:46:30.734 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 3->4
11:46:30.780 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 4->5
11:46:30.785 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 5->6
11:46:30.785 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 6->7
11:46:30.787 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 7->8
11:46:30.838 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 8->9
11:46:30.890 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 9->10
11:46:30.890 [main] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - result:10
复制代码
能够看到,Wait的输出是乱序的,最后Done的输出是顺序的。高并发
AtomicStampedReference能够用来解决ABA问题,什么是ABA问题咱们看这个例子: 线程1读取了数字以后,等待1秒,而后尝试把1修改成3。 线程2后启动,读取到数字1后修改2,稍等一下又修改回1。 虽然AtomicInteger确保多个线程的原子性操做,可是没法确保1就是原先读取到的那个1,没有通过别人修改。 能够再换一个例子来讲,若是咱们如今帐上有100元,要修改成200元,在修改以前帐户已经被操做过了从100元充值到了150而后提现到了100,虽然最后仍是回到了100,可是这个时候严格一点的话,咱们应该认为这个100不是原先的100,这个帐户的版本发生了变化,若是咱们使用乐观行锁的话,虽然余额都是100可是行锁的版本确定不一致,AtomicStampedReference就是相似行乐观锁的概念。
@Test
public void test() throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(1);
Thread thread1 = new Thread(() -> {
int value = atomicInteger.get();
log.info("thread 1 read value: " + value);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicInteger.compareAndSet(value, 3)) {
log.info("thread 1 update from " + value + " to 3");
} else {
log.info("thread 1 update fail!");
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
int value = atomicInteger.get();
log.info("thread 2 read value: " + value);
if (atomicInteger.compareAndSet(value, 2)) {
log.info("thread 2 update from " + value + " to 2");
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
value = atomicInteger.get();
log.info("thread 2 read value: " + value);
if (atomicInteger.compareAndSet(value, 1)) {
log.info("thread 2 update from " + value + " to 1");
}
}
});
thread2.start();
thread1.join();
thread2.join();
}
复制代码
看下运行结果:
11:56:20.373 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1
11:56:20.381 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 2
11:56:20.373 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1
11:56:20.483 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2
11:56:20.484 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 1
11:56:21.386 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update from 1 to 3
复制代码
下面咱们使用AtomicStampedReference来修复这个问题:
@Test
public void test2() throws InterruptedException {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);
Thread thread1 = new Thread(() -> {
int[] stampHolder = new int[1];
int value = atomicStampedReference.get(stampHolder);
int stamp = stampHolder[0];
log.info("thread 1 read value: " + value + ", stamp: " + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicStampedReference.compareAndSet(value, 3, stamp, stamp + 1)) {
log.info("thread 1 update from " + value + " to 3");
} else {
log.info("thread 1 update fail!");
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
int[] stampHolder = new int[1];
int value = atomicStampedReference.get(stampHolder);
int stamp = stampHolder[0];
log.info("thread 2 read value: " + value + ", stamp: " + stamp);
if (atomicStampedReference.compareAndSet(value, 2, stamp, stamp + 1)) {
log.info("thread 2 update from " + value + " to 2");
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
value = atomicStampedReference.get(stampHolder);
stamp = stampHolder[0];
log.info("thread 2 read value: " + value + ", stamp: " + stamp);
if (atomicStampedReference.compareAndSet(value, 1, stamp, stamp + 1)) {
log.info("thread 2 update from " + value + " to 1");
}
value = atomicStampedReference.get(stampHolder);
stamp = stampHolder[0];
log.info("thread 2 read value: " + value + ", stamp: " + stamp);
}
});
thread2.start();
thread1.join();
thread2.join();
}
复制代码
运行结果以下:
11:59:11.946 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 1
11:59:11.951 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 2
11:59:11.946 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1, stamp: 1
11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2, stamp: 2
11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 1
11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 3
11:59:12.954 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update fail!
复制代码
能够看到,如今咱们修改数据的时候不只仅是拿着值来修改了,还要提供版本号,读取数据的时候能够读取到数据以及版本号。这样的话,虽然数值不变,可是线程2通过两次修改后数据的版本从1变为了3,回过头来线程1再要拿着版本号1来修改数据的话必然失败。
本文比较短,咱们再来看网友以前问的一个有意思的问题,程序以下。
@Slf4j
public class InterestingProblem {
int a = 1;
int b = 1;
void add() {
a++;
b++;
}
void compare() {
if (a < b)
log.info("a:{},b:{},{}", a, b, a>b);
}
@Test
public void test() throws InterruptedException {
new Thread(() -> {
while (true)
add();
}).start();
new Thread(() -> {
while (true)
compare();
}).start();
TimeUnit.MILLISECONDS.sleep(100);
}
}
复制代码
这位网友是这么问的,他说见鬼了,不但能看到日志输出,并且我发现以前判断过一次a<b,以后输出a>b竟然是成立的,结果里能够看到true,JVM出现Bug了可能:
他以为a和b不是静态的,为啥会出现并发问题呢因而问了同事:
这位网友实际上是没有搞清楚多线程状况下,可见性问题、原子性问题解决的事情,同事也把各类并发的概念混淆在一块儿了。 咱们这么来看这段代码,这段代码里一个线程不断操做a和b进行累加操做,一个线程判断a和b,而后输出结果。出现这个问题的缘由本质上是由于a<b是三步操做,取a,取b以及比较,不是原子性的,在整个过程当中可能穿插了add线程的操做a和b。若是先获取a,而后a++ b++,而后获取b,这个时候a<b,若是先a++,而后获取a,获取b,最后b++,这个时候a>b。咱们来看一下compare()方法的字节码,能够很明显看到a<b以及a>b的比较分明是4行指令,咱们不能以代码行数来判断操做是不是原子的,不是原子意味着操做过程当中可能被穿插了其它线程的其它代码:
0 aload_0
1 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a>
4 aload_0
5 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b>
8 if_icmpge 67 (+59)
11 getstatic #4 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.log>
14 ldc #5 <a:{},b:{},{}>
16 iconst_3
17 anewarray #6 <java/lang/Object>
20 dup
21 iconst_0
22 aload_0
23 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a>
26 invokestatic #7 <java/lang/Integer.valueOf>
29 aastore
30 dup
31 iconst_1
32 aload_0
33 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b>
36 invokestatic #7 <java/lang/Integer.valueOf>
39 aastore
40 dup
41 iconst_2
42 aload_0
43 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a>
46 aload_0
47 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b>
50 if_icmple 57 (+7)
53 iconst_1
54 goto 58 (+4)
57 iconst_0
58 invokestatic #8 <java/lang/Boolean.valueOf>
61 aastore
62 invokeinterface #9 <org/slf4j/Logger.info> count 3
67 return
复制代码
因此这位网友的理解有几个问题:
咱们再来看看他三位同事的说法:
因此要进行简单修复这个问题的话就是为add()和compare()都加上synchronized关键字,除了这个锁的方式有没有其它方式呢?你能够想一想。
本文简单测试了一下java.util.concurrent.atomic包下面的一些经常使用Atomic操做类,最后分享了一个网友的问题和疑惑,但愿文本对你有用。
一样,代码见个人Github,欢迎clone后本身把玩,欢迎点赞。
欢迎关注个人微信公众号:随缘主人的园子