Java1.5加入的JUC并发包,就像一把好用的瑞士军刀,极大的丰富了Java处理并发的手段,但JUC并不简单,有必定的学习成本,我曾经也断断续续看过一些JUC的实现源码,可是既不系统也不够深刻,此次决定从新出发,从新拜读大师Doug Lea的神做,因此本身也是抱着以学代练的心态,记录本身的学习心得,不免有理解不到位的地方,你们轻喷哈。java
不知道你有没有这样的感受,在使用JUC中提供的工具类处理并发时,有一种死记硬背的感受,好比LockSupport应该怎么用,CountDownLatch能干吗,但并不清楚其实现原理,只知道how不知道why,这种状态有二个比较大的问题。linux
那要深刻,最直接有效的办法就是阅读源码!windows
咱们知道JUC看似有不少类,结构错综复杂,可是若是要从中挑出最重要的一个类,那必定是队列同步器AbstractQueuedSynchronizer, 而AbstractQueuedSynchronizer又是利用LockSupport来控制线程的状态,从而达到线程在等待唤醒之间切换的目的。而咱们处理并发,重点就是管理线程的状态,因此理解LockSupport是很重要的一个基础。bash
先来看一个简单的例子并发
public static void main(String[] args) {
Thread worker = new Thread(() -> {
LockSupport.park();
System.out.println("start work");
});
worker.start();
System.out.println("main thread sleep");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(worker);
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 最终控制台输出结果
main thread sleep
start work
复制代码
启动一个worker线程,主线程先sleep 500ms,worker线程由于调用了LockSupport的park,会等待,直到主线程sleep结束,调用unpark唤醒worker线程。那么在JUC以前,咱们经常使用的让线程等待的方法以下app
Object monitor = new Object();
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
复制代码
主要有三点区别工具
前面只是铺垫,如今来到咱们的主菜,解读LockSupport的park和unpark方法,固然还有一些其余相似的重载方法,如parkUntil,parkNanos,它们的大致原理相似,感兴趣你们能够自行查阅源码。oop
这篇文章以及后续的文章,分析的源码都基于Open Jdk 8。学习
为何先讲unpark方法,由于unpark代码量少一些,相对简单,柿子先捡软的捏-。-ui
//java.util.concurrent.locks.LockSupport.java
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
//
UNSAFE = sun.misc.Unsafe.getUnsafe();
复制代码
参数thread是咱们要唤醒的目标线程,先判空,而后调用UNSAFE.unpark,UNSAFE是Unsafe对象,不要被这个名字吓到,这个类提供了不少有用的方法,以前的文章也有提到过,好比获取类对象中属性的内存偏移地址,还有 CAS操做等。可是这个Unsafe对象必须使用反射获得而后才能正常使用,由于getUnsafe方法有判断当前类加载器是否是BootStrapClassLoader。咱们继续查看Unsafe类unpark的实现。
// Unsafe.java
public native void unpark(Object thread);
复制代码
能够看到unpark是一个native方法,它的native实现是在 hotspot\src\share\vm\prims\unsafe.cpp 看下代码实现,
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
// 声明一个Parker对象p,它是真正干活的对象
Parker* p = NULL;
if (jthread != NULL) {
// 根据传入的jthread对象,来获取native层的oopDesc*对象,oop是oopDesc* 的宏定义
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
// 获取java_thread对象中_park_event_offset的值,该值就是Parker对象的地址
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// 若是地址有效,直接转为Parker指针
p = (Parker*)addr_from_java(lp);
} else {
// 若是地址无效
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
// 转为native层的JavaThread对象
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
// 将JavaThread的成员变量_parker赋值给p
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
// 将p的地址赋值给_park_event_offset,下次获取时可用
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
// 这个USDT2的宏,暂时我也不清楚是干啥的,不过不影响咱们的分析,咱们先忽略
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */
// 真正干货的方法,调用了Parker的unpark方法
p->unpark();
}
复制代码
根据上面的代码,咱们须要知道二个native层的类,JavaThread类和Parker类
class JavaThread: public Thread {
private:
JavaThread* _next; // The next thread in the Threads list
oop _threadObj; // The Java level thread object
// 省略代码...
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
// 省略代码...
复制代码
JavaThread类很长,这里只列出几个成员变量,如今只须要知道它是native层的Thread,成员变量_threadObj是Java层的thread对象,经过它native层能够调用Java层的代码。咱们继续重点看下Parker类的实现。
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
// 省略代码...
}
复制代码
咱们重点关注_counter字段,能够简单理解为_counter字段 > 0时,能够通行,即park方法会直接返回,另外park方法返回后,_counter会被赋值为0,unpark方法能够将_counter置为1,而且唤醒当前等待的线程。
能够看到Parker的父类是os::PlatformParker,那这个类又是干吗的呢?这里先插个题外话, 咱们都知道,Java是跨平台的,咱们在应用层定义的Thread确定依赖于具体的平台,不一样的平台有不一样实现,好比Linux是一套代码,Windows是另一套,那咱们就能理解了,PlatformParker根据平台有不一样的实现。在OpenJdk8的实现中支持5个平台
咱们知道Linux是如今使用比较普遍的操做系统,好比熟知的Android是基于Linux内核,因此这里咱们就挑选Linux来分析吧。对应的文件路径hotspot\src\os\linux\vm\os_linux.cpp
void Parker::unpark() {
int s, status ;
// 先进入_mutex的临界区,声明以下
// pthread_mutex_t _mutex [1] ;
// pthread_cond_t _cond [2] ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
// 将_counter置为1
_counter = 1;
// s记录的是unpark以前的_counter数,若是s < 1,说明有可能该线程在等待状态,须要唤醒。
if (s < 1) {
// thread might be parked
// _cur_index表明被使用cond的index
if (_cur_index != -1) {
// thread is definitely parked
// 根据虚拟机参数WorkAroundNPTLTimedWaitHang来作不一样的处理,默认该参数是1
if (WorkAroundNPTLTimedWaitHang) {
// 先唤醒目标等待的线程
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
// 释放互斥锁,先唤醒后释放锁,可能会致使线程被唤醒后获取不到锁,再次进入等待状态,个人理解是效率可能会低一丢丢
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
// 先释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
// 后发信号唤醒线程,唤醒操做在互斥代码块外部,感受这里可能会有风险,暂时还GET不到。。。
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
// 若是线程没有在等待,直接返回
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
// 若是线程没有在等待,直接返回
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
复制代码
代码很少,都加了注释,整体来讲就是根据Park类的成员变量_counter来作加锁解锁和唤醒操做,在Linux平台, 加锁用的pthread_mutex_lock,解锁是pthread_mutex_unlock,唤醒是pthread_cond_signal 。接下来解析LockSupport的park方法。
先看下Java层park方法的实现
public static void park() {
UNSAFE.park(false, 0L);
}
复制代码
Unsafe中的实现
public native void park(boolean var1, long var2);
复制代码
仍然是一个native方法,咱们继续跟进去看下
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
// 省略代码...
thread->parker()->park(isAbsolute != 0, time);
// 省略代码...
复制代码
省略了非关键代码,重点是park方法,这个thread咱们前面已经遇到过,就是native层的JavaThread对象,而后调用Parker的park方法,继续跟进去linux平台的os_linux.cpp的实现
void Parker::park(bool isAbsolute, jlong time) {
// 先原子的将_counter的值设为0,并返回_counter的原值,若是原值>0说明有通行证,直接返回
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// 判断线程是否已经被中断
if (Thread::is_interrupted(thread, false)) {
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
// park方法的传参是isAbsolute = false, time = 0,因此会继续往下走
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } // 这里time为0,若是调用的是parkNanos或者parkUtil,这里time就会>0, if (time > 0) { // 若是time > 0,unpackTime计算absTime的时间 unpackTime(&absTime, isAbsolute, time); } ThreadBlockInVM tbivm(jt); // 再次判断线程是否被中断,若是没有被中断,尝试得到互斥锁,若是获取失败,直接返回 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; // 若是_counter > 0, 不须要等待,这里再次检查_counter的值 if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // 插入一个写内存屏障,保证可见性,具体实现见下方 OrderAccess::fence(); return; } // 省略assert代码 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); // 设置JavaThread的_suspend_equivalent为true,表示线程被暂停 jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed // 让线程等待_cond[_cur_index]信号,到这里线程进入等待状态 status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; // 线程进入有超时时间的等待,内部实现调用了pthread_cond_timedwait系统调用 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // 省略assert代码 // _counter从新设置为0 _counter = 0 ; // 释放互斥锁 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // 插入写屏障 OrderAccess::fence(); // 省略额外检查代码 } // OrderAccess::fence 在linux平台的实现 inline void OrderAccess::fence() { // 若是是多核cpu if (os::is_MP()) { // always use locked addl since mfence is sometimes expensive // 判断是否AMD64 CPU,汇编代码实现写屏障 #ifdef AMD64 __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } } 复制代码
上面分析了park的实现原理,有了前面unpark方法分析的知识铺垫,park方法应该很容易看懂。
经过LockSupport的源码阅读,能够总结出一下几点
最后,仍是想提一下Java层关于线程状态的小知识,可能有些同窗会不是特别清楚,因此仍是作个总结。 Java线程状态有如下6种。
关于并发,咱们软件工程师要作的,就是控制线程在这几个状态间正确转换,所谓“工欲善其事,必先利其器”,JDK提供的各类并发工具类,咱们只有深刻了解它们,才能灵活高效的运用,这也是我记录"啃透Java并发"系列文章的初心,与君共勉!