关于Java并发的书籍和文章已经有不少了,可是就我本身的学习下来的感觉来讲,有一些看似简单的知识点,以致于大神们和文章的做者们都直接忽略了,可是这些知识点却很重要,若是不搞清楚,很难“完全理解、融会贯通”,这种似懂非懂的感受让我很难受,因此我总结了这篇文章,可能不会有什么牛X的技术,高深的理论,可是这些思考曾经让我对Java并发的认知更进了一步,送给大家。html
先提几个曾经困扰过个人问题啊,看似很简单,并且可能还有不少同窗还存在误解,咱们来一块儿看一下。java
若是这些问题也曾困扰过你,那这篇文章最合适你不过了,接下来咱们一块儿进入Java的世界扒一扒并发。程序员
这2个概念是Java内存模型(Java Memory Model)中提出的,关于内存模型的详细介绍猛戳这里,咱们目前只须要知道内存模型是帮咱们屏蔽底层硬件细节的,程序员只须要按照它的规则来写代码,写的程序就能够实现跨平台运行,很巧妙的设计。缓存
了解了内存模型,咱们回到主题,咱们知道JVM将内存划分了如下几大块安全
那主内存,工做内存跟它们的对应关系是怎么样的呢? 这里直接给出结论。bash
这个知识点看似不起眼,可是却很重要,由于只有有了这个结论,才能与咱们后面的实际代码例子结合起来,不然就会感受理论与实际操做脱节了,无法对应起来。数据结构
需求是这样的,有一个Counter计数器类,内部有一个count成员变量int类型,记录当前的总数,具体定义以下。多线程
public class Counter {
private int count;
public void increment() {
this.count++;
}
public int getCount() {
return this.count;
}
}
复制代码
咱们如今的任务是调用一亿次increment方法,而后打印count的数量,那么显然正确的输出应该是一亿。并发
public static void main(String[] args) {
// 单线程代码
Counter counter = new Counter();
int loopCount = 100000000;
long startTime = System.currentTimeMillis();
for (int i = 0; i < loopCount; i++) {
counter.increment();
}
System.out.println("count:" + counter.getCount());
System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");
}
// 输出结果
count:100000000
take time:577ms
复制代码
so easy!这样的代码咱们太熟悉了,可是此次我想从代码在虚拟机栈中的具体执行过程来加深理解程序是怎么运做的。先经过Javac和Javap命令查看Counter类的increment方法的字节码实现。app
javac Counter.java
javap -verbose Counter.class
// Counter类 increment方法的字节码
public void increment();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getfield #2 // Field count:I
5: iconst_1
6: iadd
7: putfield #2 // Field count:I
10: return
复制代码
咱们知道Java中方法的调用是基于栈帧实现的,每一个栈帧中主要包含操做数栈+用到类的运行时常量池引用+本地变量表。我画了一张示意图帮助你们理解整个执行过程,而且将其中一次count++操做的字节码在操做数栈中的执行步骤分解了(以count=10为例),这里要注意,下面这张图的运行是在工做内存中(main方法所在线程的虚拟机栈中)。
上面的条件只要有一个被打破,执行的结果就可能不正确,这也就是为何Java多线程环境下容易出现并发问题,缘由就是没有同时知足这三个条件。
上面已经提到,咱们上面的Count类的实例中,需同时知足 有序性,可见性,原子性,其中有序性和原子性,咱们比较容易想到由于多个线程交叉执行,若是不加同步控制,有序性和原子性确定无法保证,可是这里比较难理解的是可见性,骨头先捡难啃的啃,因此接下来咱们先谈谈可见性。
仍是以咱们上面的示例来讲明。 咱们已经知道
counter.increment();
复制代码
编译成字节码为
getfield #2
iconst_1
iadd
putfield #2
复制代码
前面已经说过,这里的字节码的执行过程是在工做内存中,可是getField和putField这二条指令实际上是跟主内存有交互的,这里仍是以Counter类的increment方法为例。
正是由于CPU高速cache的存在,在多核环境中会有可见性的问题。这里额外提一句 ,之因此有高速cache存在,是为提升运行效率,现代CPU的速度比咱们的内存快不少,若是每次都锁总线写主存,会致使执行速度降低不少,这是不能够接受的,木桶理论咱们都能理解哈。这里我也画了一张图,来帮助你们理解。
volatile修饰的变量有下面的特性
其中monitor能够理解为锁,moniter release就是释放锁,monitor acquire就是获取锁,这样就是volatile变量的读写都是直接对主存操做的,至关于牺牲一部分性能来换取可见性,这一部分牺牲的性能通常是能够忽略不计的,只须要知道有这么回事就行。
给count加上volatile修饰符后,查看编译后的字节码后会发现,字节码层面惟一的变化是给count添加了ACC_VOLATILE标识flag,在运行时会根据这个flag会自动插入内存屏障,保证volatile可见性语义,内存屏障一共有四种,分别是:
这里有个文档,比较权威详细的说明了内存屏障的知识,这一块知识你们能够本身继续深刻。这里给出文档中的一个实例,比较形象的说明了内存屏障是怎么插入的。
再回到上面的例子,咱们给count添加上volatile修饰符以后,是否是就能在多线程中获得正确的累加结果呢?咱们试验一下,简单起见,咱们只开2个线程,每一个线程分配一半的计算量。
// Counter.java
private volatile int count;
// main 方法
Counter counter = new Counter();
int loopCount = 100000000;
int halfCount = loopCount / 2;
Thread thread1 = new Thread(() -> {
for (int i = 0; i < halfCount; i++) {
counter.increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = halfCount; i < loopCount; i++) {
counter.increment();
}
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + counter.getCount());
System.out.println("take time:" + (System.currentTimeMillis() - startTime) + "ms");
// 运行结果
count:51743664
take time:2335ms
复制代码
结果显然仍是不对的,并且程序运行的时间长了好几倍了。这是由于volatile只保证了可见性,却没有原子性语义,好比下面这种状况
那咱们不妨设想下,若是在putfield以前,检查下当前栈中存储的count是否是最新的,若是不是最新的从新读取count,而后重试,若是是最新的,直接写入更新值,彷佛这样就能解决咱们上面出现的错误写入的问题。看起来彷佛是一个不错的想法,可是必定要注意,整个检查过程要保证原子性,不然仍然会有并发问题。事实上JDK中Unsafe包里面的CAS方法就是这个思路,不断循环尝试,这个过程就是自旋,它的底层实现依赖cmpxchgl 和 cmpxchgq这二个汇编指令,不一样平台的cpu有不一样的实现,可是代码大同小异,我在这里以opekjdk8为例扒一扒CAS的源码,源码比较多我只会贴出关键代码块。
// Unsafe.class中的三个CAS方法,都是native的
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
复制代码
它对应的native实如今hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
复制代码
篇幅关系,这里只贴上compareAndSwapInt的实现,能够看到又调用了Atomic::cmpxchg方法,继续跟进去
unsigned Atomic::cmpxchg(unsigned int exchange_value,
volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest,
(jint)compare_value);
}
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
assert(sizeof(jbyte) == 1, "assumption.");
uintptr_t dest_addr = (uintptr_t)dest;
uintptr_t offset = dest_addr % sizeof(jint);
volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
jint cur = *dest_int;
jbyte* cur_as_bytes = (jbyte*)(&cur);
jint new_val = cur;
jbyte* new_val_as_bytes = (jbyte*)(&new_val);
new_val_as_bytes[offset] = exchange_value;
while (cur_as_bytes[offset] == compare_value) {
jint res = cmpxchg(new_val, dest_int, cur);
if (res == cur) break;
cur = res;
new_val = cur;
new_val_as_bytes[offset] = exchange_value;
}
return cur_as_bytes[offset];
}
复制代码
咱们跟踪到了调用了cmpxchg这个方法,这个方法不是在atomic.cpp中定义的,查看atomic.hpp,看到了cmpxchg对应的内联函数的定义
inline static jint cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value);
// See comment above about using jlong atomics on 32-bit platforms
inline static jlong cmpxchg (jlong exchange_value, volatile jlong* dest, jlong compare_value);
复制代码
这里咱们以solaris_x86平台为例,cmpxchg对应的内涵函数定义在hotspot/src/os_cpu/solaris_x86/vm/atomic_solaris_x86.inline.hpp
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
inline jint _Atomic_cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value, int mp) {
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
复制代码
这个是内嵌汇编代码,实话说,汇编这一块的知识我也还给老师了。根据LOCK_IF_MP这个宏定义判断是否是多核心,若是是多核心须要加锁,可是这个锁是cpu总线锁,它的代价比咱们应用层中用的Lock代价小得多。同时咱们看到cmpxchgl这个关键的指令。追到这一层,我想对于应用开发工程师已经足够了。了解了底层实现,咱们来现学现卖实战一波。
要使用CAS,确定要使用Unsafe类,咱们仍是经过反射来获取Unsafe对象,先看UnsafeUtil类的实现
// UnsafeUtil.java
public static Unsafe getUnsafeObject() {
Class clazz = AtomicInteger.class;
try {
Field uFiled = clazz.getDeclaredField("unsafe");
uFiled.setAccessible(true);
return (Unsafe) uFiled.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static long getVariableOffset(Object target, String variableName) {
Object unsafeObject = getUnsafeObject();
if (unsafeObject != null) {
try {
Method method = unsafeObject.getClass().getDeclaredMethod("objectFieldOffset", Field.class);
method.setAccessible(true);
Field targetFiled = target.getClass().getDeclaredField(variableName);
return (long) method.invoke(unsafeObject, targetFiled);
} catch (Exception e) {
e.printStackTrace();
}
}
return -1;
}
复制代码
再来看Counter类的实现
public class Counter {
private volatile int count;
private Unsafe mUnsafe;
private long countOffset;
public Counter() {
mUnsafe = UnsafeUtil.getUnsafeObject();
countOffset = UnsafeUtil.getVariableOffset(this, "count");
}
public void increment() {
int cur = getCount();
while (!mUnsafe.compareAndSwapInt(this, countOffset, cur, cur+1)) {
cur = getCount();
}
}
public int getCount() {
return this.count;
}
}
复制代码
再次开启二个线程,执行咱们的累加程序
// 输出结果
count:100000000
take time:5781ms
复制代码
能够看到咱们获得正确的累加结果,可是运行时长更长了,可是还好,时间复杂度仍是在一个数量级上的。这里要注意一点的是,上述示例代码中,我给count变量增长了volatile关键字,其实就算不加volatile关键字,在这里CAS也是可以正确工做的,可是效率会低一点,我测试下来差很少性能会低5%左右,你们能够思考下为何不加volatile效率会低?
volatile关键字还有一个禁止指令重排序的语义,一个经典的应用就是DCL单例模式,你们应该都很熟了,就不赘述了。
到这里,关于可见性咱们已经讨论的差很少了,接下来咱们来讨论”原子性“
其实上面咱们已经说起了一些,好比CAS自己就是原子的,那想想还有哪些是原子的?我这里仍是以Counter类的increment方法为例。
0: aload_0
1: dup
2: getfield #2 // Field count:I
5: iconst_1
6: iadd
7: putfield #2 // Field count:I
10: return
复制代码
这7条字节码指令,都是原子的,没有问题吧,可是咱们若是再往深处想的话,仍是会有疑问,单个字节码指令都是原子的吗? 若是单条都不是原子的,我想咱们前面的全部判断都是错误的,由于咱们得出结论的理论基石被打破了。事实是,单条字节码就是原子的,这个原子性由谁来保证呢?由Java内存模型JMM来保证,程序员不须要知道具体的细节。
虽然单条字节码是原子的,可是多条字节码组合起来就不是原子的了。这也是不少并发问题发生的根源。那咱们程序员有哪些手段保证原子性呢?大概有如下三种。
CAS咱们上面已经讨论过,这里不赘述了,咱们来看看synchronized
synchronized算是咱们最经常使用的同步方式,主要有如下三种使用方式
// 普通类方法同步
synchronized publid void invoke() {}
// 类静态方法同步
synchronized public static void invoke() {}
// 代码块同步
synchronized(object) {
}
复制代码
这三种方式不一样之处在于咱们进行同步的对象不一样,普通类synchronized同步的就是对象自己,静态方法同步的是类Class自己,代码块同步的是咱们在括号内部填入的对象。本质上它们的原理是相同的,都会有一个monitor的对象与咱们要进行同步的对象进行关联,当有一个线程持有了monitor的锁后,其余线程必须等待,一直到该线程释放了该monitor才能被别的线程从新获取,hotspot虚拟机中,它在native层对应的实现类是ObjectMonitor.hpp,这个类内部维护了不少同步相关的变量,咱们重点关注二个变量
void * volatile _owner; // pointer to owning thread OR BasicLock
ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor
复制代码
_owner表明当前持有锁的线程,—WaitSet表明等待锁的线程队列,经过ObjectWaiter的数据结构能够推断这是一个双向循环链表。
回到咱们的实例,这一次咱们经过synchronized来实现咱们的Counter类。
public void increment() {
synchronized (Counter.class) {
this.count++;
}
}
// 再运行一次,
// 输出结果
count:100000000
take time:4881ms
复制代码
输出的结果是正确的,并且从咱们打印的运行时间结果来看,synchronized加锁后的执行速度比咱们上面的CAS还要快,这有一点点反直觉,其实synchronized在Java1.7后引入了偏向锁,轻量级锁后,synchronized性能有了较大提高,因此在使用synchronized的时候不须要有太多的心理负担,通常状况下,对性能不是极度要求高的话,使用synchronized没有问题。
这里仍是贴下increment方法加上synchronized同步后的字节码实现。
// 咱们的实现
synchronized (Counter.class) {
this.count++;
}
// 编译后,至关于下面的伪代码
monitorenter Counter.class;
try {
this.count++;
monitorexit Counter.class;
} finally {
monitorexit Counter.class;
}
复制代码
除了synchronized关键字以外,还有个比较经常使用的用来作同步类就是ReentrantLock
ReentrantLock的使用你们应该都很熟了,篇幅关系,这里只简单提一下用法,更详细的使用文档你们能够自行查阅相关资料。
// 参数表示 是不是公平锁,公平锁严格按照等待顺序获取锁,可是吞吐率低性能差
// 非公平锁性能高,可是有可能会出现锁等待饥饿
ReentrantLock reentrantLock = new ReentrantLock(false);
// 一个标准的使用方式
reentrantLock.lock();
try {
// do something
} finally {
reentrantLock.unlock();
}
复制代码
这里要注意的是加锁的lock方法的调用,必定要在try-catch-finally的前面,不能在内部,由于若是在内部调用lock,若是代码在lock以前就出现异常了,就会出现咱们没有加锁就执行了finally里面的释放锁,确定会有问题。
为了对比,咱们仍是使用ReentrantLock实现一遍上面Counter累加。
Lock lock = new ReentrantLock(false);
Thread thread1 = new Thread(() -> {
for (int i = 0; i < halfCount; i++) {
lock.lock();
try {
counter.increment();
} finally {
lock.unlock();
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = halfCount; i < loopCount; i++) {
lock.lock();
try {
counter.increment();
} finally {
lock.unlock();
}
}
});
// 运行程序,输出结果
count:100000000
take time:12754ms
复制代码
输出结果也是正确的,可是运行时间倒是最长的,差很少是咱们用synchronized的三倍。固然咱们这里并非搞性能测试,运行的时间也没什么参考意义,贴出来只是让你们有个直观认识,那就是CAS未必就必定性能高,synchronized未必就必定性能差,要具体问题具体分析,必定要有质疑的意识。
既然说到了Lock,咱们仍是扒一扒它的实现源码,这里以非公平锁为例。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
//......
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
复制代码
能够看到,lock方法的实现,首先会用CAS尝试去设置State为1,若是设置成功,将exclusiveOwnerThread设置为当前thread,若是设置不成功调用acquire方法,又调用了tryAcquire,咱们能够重写该方法来自定义锁的逻辑,ReentrantLock中的默认实现以下
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
首先尝试获取state状态,若是==0尝试用CAS获取锁,若是!=0,检查是不是当前线程拥有锁,若是是,将state+1,返回true,若是不是返回false。因此这也就是为何是可重入锁的缘由,容许锁的嵌套,若是已经获取了锁,state+1而后返回true。
// 可重入锁,容许嵌套重复得到锁,伪代码以下
lock.lock();
lock.lock();
// do something ...
lock.unlock();
lock.unlock();
复制代码
总结一下,ReentrantLock的实现是基于队列同步器AbstractQueuedSynchronizer(AQS)的,而AQS内部也是封装了CAS来实现的,深究下去仍是有不少内容的,能够说整个concurrent包都是创建在CAS+AQS这二块基石上的,篇幅关系,更多的实现细节咱们这里就不讨论了,你们能够自行参考相关的源码分析文章加深理解。
到这里,咱们讨论了Java中的”原子性“,以及若是保证”原子性“的三种常规手段,原子性的讨论就结束了。接下来咱们进入最后一个问题,Java的”有序性“。
为了方便说明,咱们首先举一个简单的例子
int a = 1; ①
int b = 2; ②
int c = a; ③
复制代码
按咱们的预期,执行的顺序应该是①②③,可是 真实的执行顺序多是②①③或者①③②,这是由于指令重排序的缘由,基本有二种重排序
重排序的目的是优化咱们的程序运行速度,可是优化的前提是不能破坏as-if-serial语义,简单来讲,以上面的例子为例,③因为有依赖①的结果,因此它须要永远排在①后面执行。 在单线程有序性还比较容易保证,可是在多线程状况就会变得复杂起来。因此JMM中抽象出了一个happen-before原则,这个原则是JMM给咱们开发者的承诺,让咱们写代码时对多线程状况下的有序性有一个正确的预期。这个原则有下面5条。
固然happen-before具备传递性,若是A happen-before B, B happen-before C,则A 也 happen-before C。 须要注意的是,happen-before并不彻底等同于时间意义上的先执行,好比上面的例子中,根据第一条happen-before原则,int a = 1; 这条语句 happen-before int b = 2; 这条语句,可是因为两者之间没有依赖关系,能够指令重排,因此能够是 int b = 2;先执行,这是合法的,并不违背happen-before原则。
理解这几条happen-before原则后,不少咱们平时常常写的并发代码就有了理论依据,好比第二条,加锁happen-before解锁,因此保证了锁的同步范围内的代码,具备原子性和有序性,同时加锁和解锁都会插入内存屏障,可见性也获得保障,因此加锁后的代码是线程安全的。再好比第三条,volatile的写happen-before于volatile的读,有了这一条,多线程之间volatile修饰的共享变量的可见性获得保证。
另外几条原则比较好理解,你们能够自行结合实际代码加深理解,这里就不赘述了。
Java并发算是一个比较高级的主题,可是这一块的知识又是高级工程师必须掌握的,骨头再难啃也得啃,但愿本文的一些总结能帮助到但愿深刻了解Java并发的同窗,哪怕是其中能有一点,能让你在阅读中有豁然开朗的感受,个人目的就达到了。
最后,来针鸡血,”怕什么真理无穷,进一寸有一寸的欢喜“。
码字不易,若是喜欢点个赞呗!