啃透Java并发-LockSupport源码详解

Java1.5加入的JUC并发包,就像一把好用的瑞士军刀,极大的丰富了Java处理并发的手段,但JUC并不简单,有必定的学习成本,我曾经也断断续续看过一些JUC的实现源码,可是既不系统也不够深刻,此次决定从新出发,从新拜读大师Doug Lea的神做,因此本身也是抱着以学代练的心态,记录本身的学习心得,不免有理解不到位的地方,你们轻喷哈。java

为何要阅读源码

不知道你有没有这样的感受,在使用JUC中提供的工具类处理并发时,有一种死记硬背的感受,好比LockSupport应该怎么用,CountDownLatch能干吗,但并不清楚其实现原理,只知道how不知道why,这种状态有二个比较大的问题。linux

  • 死记硬背,比较容易遗忘,在工做中使用容易挖坑,风险大
  • 对JUC的认识不够深刻,知识不可以造成体系,难以融会贯通,灵活运用

那要深刻,最直接有效的办法就是阅读源码!windows

为何要先解析LockSupport

咱们知道JUC看似有不少类,结构错综复杂,可是若是要从中挑出最重要的一个类,那必定是队列同步器AbstractQueuedSynchronizer, 而AbstractQueuedSynchronizer又是利用LockSupport来控制线程的状态,从而达到线程在等待唤醒之间切换的目的。而咱们处理并发,重点就是管理线程的状态,因此理解LockSupport是很重要的一个基础。bash

LockSupoort的简单使用

先来看一个简单的例子并发

public static void main(String[] args) {

        Thread worker = new Thread(() -> {
            LockSupport.park();
            System.out.println("start work");
        });

        worker.start();

        System.out.println("main thread sleep");
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LockSupport.unpark(worker);

        try {
            worker.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    // 最终控制台输出结果
    main thread sleep
    start work
复制代码

启动一个worker线程,主线程先sleep 500ms,worker线程由于调用了LockSupport的park,会等待,直到主线程sleep结束,调用unpark唤醒worker线程。那么在JUC以前,咱们经常使用的让线程等待的方法以下app

Object monitor = new Object();
        synchronized (monitor) {
            try {
                monitor.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

复制代码

主要有三点区别工具

  1. LockSupport.park和unpark不须要在同步代码块中,wait和notify是须要的。
  2. LockSupport的pork和unpark是针对线程的,而wait和notify是能够是任意对象。
  3. LockSupport的unpark可让指定线程被唤醒,可是notify是随机唤醒一个,notifyAll是所有唤醒,不够灵活。

LockSupport源码解读

前面只是铺垫,如今来到咱们的主菜,解读LockSupport的park和unpark方法,固然还有一些其余相似的重载方法,如parkUntil,parkNanos,它们的大致原理相似,感兴趣你们能够自行查阅源码。oop

这篇文章以及后续的文章,分析的源码都基于Open Jdk 8。学习

LockSupport.unpark

为何先讲unpark方法,由于unpark代码量少一些,相对简单,柿子先捡软的捏-。-ui

//java.util.concurrent.locks.LockSupport.java
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    
    //
    UNSAFE = sun.misc.Unsafe.getUnsafe();
复制代码

参数thread是咱们要唤醒的目标线程,先判空,而后调用UNSAFE.unpark,UNSAFE是Unsafe对象,不要被这个名字吓到,这个类提供了不少有用的方法,以前的文章也有提到过,好比获取类对象中属性的内存偏移地址,还有 CAS操做等。可是这个Unsafe对象必须使用反射获得而后才能正常使用,由于getUnsafe方法有判断当前类加载器是否是BootStrapClassLoader。咱们继续查看Unsafe类unpark的实现。

// Unsafe.java
public native void unpark(Object thread);
复制代码

能够看到unpark是一个native方法,它的native实现是在 hotspot\src\share\vm\prims\unsafe.cpp 看下代码实现,

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  // 声明一个Parker对象p,它是真正干活的对象
  Parker* p = NULL;
  if (jthread != NULL) {
    // 根据传入的jthread对象,来获取native层的oopDesc*对象,oop是oopDesc* 的宏定义
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      // 获取java_thread对象中_park_event_offset的值,该值就是Parker对象的地址
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // 若是地址有效,直接转为Parker指针
        p = (Parker*)addr_from_java(lp);
      } else {
        // 若是地址无效
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          // 转为native层的JavaThread对象 
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            // 将JavaThread的成员变量_parker赋值给p
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              // 将p的地址赋值给_park_event_offset,下次获取时可用
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
  // 这个USDT2的宏,暂时我也不清楚是干啥的,不过不影响咱们的分析,咱们先忽略
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */
    // 真正干货的方法,调用了Parker的unpark方法
    p->unpark();
  }
复制代码

根据上面的代码,咱们须要知道二个native层的类,JavaThread类和Parker类

class JavaThread: public Thread {
private:
  JavaThread*    _next;                          // The next thread in the Threads list
  oop            _threadObj;                     // The Java level thread object
 // 省略代码...
private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }
  // 省略代码...
复制代码

JavaThread类很长,这里只列出几个成员变量,如今只须要知道它是native层的Thread,成员变量_threadObj是Java层的thread对象,经过它native层能够调用Java层的代码。咱们继续重点看下Parker类的实现。

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  // 省略代码...
  }
复制代码

咱们重点关注_counter字段,能够简单理解为_counter字段 > 0时,能够通行,即park方法会直接返回,另外park方法返回后,_counter会被赋值为0,unpark方法能够将_counter置为1,而且唤醒当前等待的线程。

能够看到Parker的父类是os::PlatformParker,那这个类又是干吗的呢?这里先插个题外话, 咱们都知道,Java是跨平台的,咱们在应用层定义的Thread确定依赖于具体的平台,不一样的平台有不一样实现,好比Linux是一套代码,Windows是另一套,那咱们就能理解了,PlatformParker根据平台有不一样的实现。在OpenJdk8的实现中支持5个平台

  • aix
  • bsd
  • linux
  • solaris
  • windows

咱们知道Linux是如今使用比较普遍的操做系统,好比熟知的Android是基于Linux内核,因此这里咱们就挑选Linux来分析吧。对应的文件路径hotspot\src\os\linux\vm\os_linux.cpp

void Parker::unpark() {
  int s, status ;
  // 先进入_mutex的临界区,声明以下
  // pthread_mutex_t _mutex [1] ;
  // pthread_cond_t  _cond  [2] ; 
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  // 将_counter置为1
  _counter = 1;
  // s记录的是unpark以前的_counter数,若是s < 1,说明有可能该线程在等待状态,须要唤醒。
  if (s < 1) {
    // thread might be parked
    // _cur_index表明被使用cond的index
    if (_cur_index != -1) {
      // thread is definitely parked
      // 根据虚拟机参数WorkAroundNPTLTimedWaitHang来作不一样的处理,默认该参数是1
      if (WorkAroundNPTLTimedWaitHang) {
        // 先唤醒目标等待的线程
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        // 释放互斥锁,先唤醒后释放锁,可能会致使线程被唤醒后获取不到锁,再次进入等待状态,个人理解是效率可能会低一丢丢
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        // 先释放锁
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        // 后发信号唤醒线程,唤醒操做在互斥代码块外部,感受这里可能会有风险,暂时还GET不到。。。
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
      }
    } else {
      // 若是线程没有在等待,直接返回
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else {
    // 若是线程没有在等待,直接返回
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}
复制代码

代码很少,都加了注释,整体来讲就是根据Park类的成员变量_counter来作加锁解锁和唤醒操做,在Linux平台, 加锁用的pthread_mutex_lock,解锁是pthread_mutex_unlock,唤醒是pthread_cond_signal 。接下来解析LockSupport的park方法。

LockSupport.park

先看下Java层park方法的实现

public static void park() {
        UNSAFE.park(false, 0L);
    }
复制代码

Unsafe中的实现

public native void park(boolean var1, long var2);
复制代码

仍然是一个native方法,咱们继续跟进去看下

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
 // 省略代码...
  thread->parker()->park(isAbsolute != 0, time);
 // 省略代码...
复制代码

省略了非关键代码,重点是park方法,这个thread咱们前面已经遇到过,就是native层的JavaThread对象,而后调用Parker的park方法,继续跟进去linux平台的os_linux.cpp的实现

void Parker::park(bool isAbsolute, jlong time) {
  // 先原子的将_counter的值设为0,并返回_counter的原值,若是原值>0说明有通行证,直接返回
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // 判断线程是否已经被中断
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  // park方法的传参是isAbsolute = false, time = 0,因此会继续往下走
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } // 这里time为0,若是调用的是parkNanos或者parkUtil,这里time就会>0, if (time > 0) { // 若是time > 0,unpackTime计算absTime的时间 unpackTime(&absTime, isAbsolute, time); } ThreadBlockInVM tbivm(jt); // 再次判断线程是否被中断,若是没有被中断,尝试得到互斥锁,若是获取失败,直接返回 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; // 若是_counter > 0, 不须要等待,这里再次检查_counter的值 if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // 插入一个写内存屏障,保证可见性,具体实现见下方 OrderAccess::fence(); return; } // 省略assert代码 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); // 设置JavaThread的_suspend_equivalent为true,表示线程被暂停 jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed // 让线程等待_cond[_cur_index]信号,到这里线程进入等待状态 status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; // 线程进入有超时时间的等待,内部实现调用了pthread_cond_timedwait系统调用 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // 省略assert代码 // _counter从新设置为0 _counter = 0 ; // 释放互斥锁 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // 插入写屏障 OrderAccess::fence(); // 省略额外检查代码 } // OrderAccess::fence 在linux平台的实现 inline void OrderAccess::fence() { // 若是是多核cpu if (os::is_MP()) { // always use locked addl since mfence is sometimes expensive // 判断是否AMD64 CPU,汇编代码实现写屏障 #ifdef AMD64 __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } } 复制代码

上面分析了park的实现原理,有了前面unpark方法分析的知识铺垫,park方法应该很容易看懂。

收获和总结

经过LockSupport的源码阅读,能够总结出一下几点

  1. native层的JavaThread经过Parker的_counter来表示通行证,>0表示能够通行,若是_counter=0,调用park线程会等待,直到被unpark唤醒,若是先调用unpack,再调用park会直接返回,并消费掉_counter(设置为0)。
  2. Linux平台,线程等待和唤醒,加锁用的pthread_mutex_lock,解锁是pthread_mutex_unlock,唤醒是pthread_cond_signal,了解到这些内心就有数了,知其然知其因此然,何其快哉!

最后,仍是想提一下Java层关于线程状态的小知识,可能有些同窗会不是特别清楚,因此仍是作个总结。 Java线程状态有如下6种。

  1. NEW (新建的线程,尚未调用start)
  2. RUNNABLE (调用了start,正在运行,或者在等待操做系统的调度,分配CPU时间片)
  3. BLOCKED (synchronied,等待monitorEnter)
  4. WAITING (wait,LockSupport.park 会进入该状态)
  5. TIMED_WAITING (带等待时间的wait, LockSupport.parkNanos, parkUtil)
  6. TERMINATED (线程执行结束)

关于并发,咱们软件工程师要作的,就是控制线程在这几个状态间正确转换,所谓“工欲善其事,必先利其器”,JDK提供的各类并发工具类,咱们只有深刻了解它们,才能灵活高效的运用,这也是我记录"啃透Java并发"系列文章的初心,与君共勉!

相关文章
相关标签/搜索