Java并发——基石篇(上)

概要

并行是这个时代的主旋律,也是不少现代操做系统须要提供的必备功能,在过去摩尔定律催生下,单个CPU核心计算的速度愈来愈快。可是随着产业的发展,单个CPU核心的计算上限已经难以突破,传统的增强单核的思惟模式已经不能知足需求。在古代,人们须要强大的战马来驱动战车,为了可以使得战斗力愈来愈强,人们驯化了愈来愈强劲的战马,可是单匹马的力量始终是有限的,所以人们发明了多马并驾的战车结构。一样地,在现代计算机领域,人们在单个CPU核心能力有限的状况下,使用多个核心的CPU进行并行计算以驱动强大的算力。
可是,多CPU和多战马是远远不一样的,在现实世界中的计算任务大多须要相互协调,其根本缘由是人类的思惟方式是线性串行的,设计一个彻底并行的计算逻辑体系仍是有至关大难度的。java

如何设计一个高并发的程序,不只仅是工程界的难题,在计算机学术界也是一个须要不断突破的研究领域。从学术理论提出,到算法设计,再到工程实施,再到长夜验证调优,整个流程都须要比较长的时间来进行迭代,究其根本,并行计算自己救赎很是复杂,不肯定的,不可预测的逻辑系统。linux

多核系统中的一致性

Java号称一次编写,处处运行,其自己也是构建在不一样的系统之上的,以其运行时JVM来屏蔽系统底层的差别。所以,在介绍Java并发体系以前,有必要简要介绍依稀计算机系统层面上的并发,以及面对的问题。程序员

咱们的目的其实很简单,就是让计算机在同一时刻,可以运行更多的任务。而并行计算,提供了很是不错的解决方案。虽然这看起来很天然,但实际上面临着众多的问题,其中一个重大的问题就是绝大多数的计算不只仅是CPU一我的的事,而是须要不少计算机系统部件共同参与。可是咱们知道,计算机系统中运行速度最快的就是CPU,其余部件例如:内存、磁盘、网络等等都是及其缓慢的,同时这些操做在目前的计算机体系中是很难消除的,由于咱们不可能仅仅靠寄存器就完成全部的计算任务。面对高速CPU和低速存储之间的鸿沟,若是想要实现高效数据通信,一个良好的解决方案就是在他们之间台南佳一个cache层,这个cache层的速度和总体的速度关系以下:算法

CPU --> cache --> 存储

经过cache这个缓冲地带,实现CPU和存储之间的高效沟通,这是计算机和软件领域通用的一个问题解决问题:增长中间层,若是一个中间层解决不了,那就两层。在运算的时候,CPU将须要使用到的数据复制到cache中,之后每次获取数据都较为快速的从cache中获取,加快访问速度。缓存

所谓理想很丰满,现实很骨感。这种计算体系有一个重要的问题须要解决,那就是:缓存一致性(cache coherence)问题。在现代的计算机系统中,主要都是多核系统为主。在这些计算机系统中,每个CPU都拥有本身独立的高速缓存,可是由于主存只有一个,所以他们之间只能共享,这种系统也被称为:共享内存多核系统(Shared-Menory multiprocessors System)。服务器

同时为了保证CPU数据存储的一致性,须要定义一个统一的缓存一致性协议,这类协议有不少,例如:MSI、MESI、MOSI、Synapse、Firefly以及Dragon Protocol等等。因此,一般状况下,共享内存多核系统的架构以下:
image网络

除了使用高速cache来缓存CPU和存储设备之间的速度鸿沟,为了可以充分利用多核CPU的处理性能,处理在实际执行机器指令时并不必定会按照程序设定的指令顺序执行,可能存在代码乱序执行(Out-Of_Order Execution)优化。可是,仅仅只是在代码层面上乱序执行,系统会保证执行的结果逻辑正确,从宏观上看就好像是顺序执行同样。架构

Java内存模型

上面咱们探讨了共享内存多核系统的内存模型,咱们提到了高速缓存以及缓存一致性问题,同时还介绍了指令乱序执行的问题。其实,这些概念在Java中也是存在的。由于Java的目标是:一次编写,处处运行,所以必须在JVM层面上将系统之间的差别屏蔽掉。面对如此多的系统,最好的方式就是定义一套Java本身的内存访问模型,而后在不一样的硬件平台和操做系统上分别利用本地接口来实现。这里的思想其实和增长cache是同样的,经过增长中间层来解决系统差别带来的协做问题。并发

Java工做内存和主存之间的一致性保证主要经过如下4种操做完成:app

  1. read:Java执行引擎访问引擎访问本地工做内存中的变量副本,若是变量副本无效(变量副本不存在也是无效的一种),那就去主存中获取,同时在本地工做内存中缓存一份
  2. write:Java执行引擎将最新的变量值赋值给工做内存中的变量副本,同时须要判断是否须要将这个新的值当即同步到主内存,若是须要同步的话,还须要配合lock操做
  3. lock:Java执行引擎将主内存中的变量锁定,锁定的含义有:其余的线程在此以后不能访问这个变量直到本线程unlock;一旦锁定,其余线程对这个变量的操做必须等待
  4. unlock:Java执行引擎将主内存中的变量解锁,解锁以后各个线程才能从新并发访问这个变量,直到变量被某个线程再次锁定

Java Thread建立

在Java中,咱们都知道,一个线程直接对应了一个Thread对象。建立和启动一个线程是比较容易的,咱们只须要建立一个Thread对象,而后调用对象的start方法便可。可是在建立一个Thread对象和启动线程JVM中究竟发生了什么?本节咱们就来看下。

在建立一个Thread对象的时候,除了一些初始化设置以外就没有其余实质性的操做,真正的工做实际上是在start方法调用中产生的。

Java经过registerNatives方法将Thread类中的java方法和一个本地的C/C++函数进行对应,同时registerNatives方法是类加载的时候调用的,所以在类首次加载的时候(Bootstarp类加载)就会注册这些native方法。

/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
    registerNatives();
}
static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

再看看对应JNI的结构体

/*
 * used in RegisterNatives to describe native method name, signature, * and function pointer. */
typedef struct {
    char *name;
    char *signature;
    void *fnPtr;
} JNINativeMethod;

即第一列是Java中定义的native方法名称,第二列是Java方法签名,第三列是本地方法对应函数。所以,Java中的start方法就是对应native的JVM——StartThread函数:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  // We cannot hold the Threads_lock when we throw an exception,   // due to rank ordering issues. Example:  we might need to grab the   // Heap_lock while we construct the exception.   bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event   // in Thread::start.   {
    // Ensure that the C++ Thread and OSThread structures aren't freed before     // we operate.     MutexLocker mu(Threads_lock);

    // Since JDK 5 the java.lang.Thread threadStatus is used to prevent     // re-starting an already started thread, so we should usually find     // that the JavaThread is null. However for a JNI attached thread     // there is a small window between the Thread object being created     // (with its JavaThread set) and the update to its threadStatus, so we     // have to check for this     if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      // We could also check the stillborn flag to see if this thread was already stopped, but       // for historical reasons we let the thread detect that itself when it starts running 
      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      // Allocate the C++ Thread structure and create the native thread.  The       // stack size retrieved from java is 64-bit signed, but the constructor takes       // size_t (an unsigned type), which may be 32 or 64-bit depending on the platform.       //  - Avoid truncating on 32-bit platforms if size is greater than UINT_MAX.       //  - Avoid passing negative values which would result in really large stacks.       NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
      size_t sz = size > 0 ? (size_t) size : 0;
      // 重点看这里!!!       native_thread = new JavaThread(&thread_entry, sz);

      // At this point it may be possible that no osthread was created for the       // JavaThread due to lack of memory. Check for this situation and throw       // an exception if necessary. Eventually we may want to change this so       // that we only grab the lock if the thread was created successfully -       // then we can also do this check and throw the exception in the       // JavaThread constructor.       if (native_thread->osthread() != NULL) {
        // Note: the current thread is not being used within "prepare".         native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {
    // No one should hold a reference to the 'native_thread'.     native_thread->smr_delete();
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        os::native_thread_creation_failed_msg());
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              os::native_thread_creation_failed_msg());
  }

  Thread::start(native_thread);

JVM_END

这段代码的主要做用是建立一个JavaThread对象并启动。咱们进入建立JavaThread构造函数

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                       Thread() {
  initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.   // %note runtime_23   os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  // 经过 os 类的 create_thread 函数来建立一个线程   os::create_thread(this, thr_type, stack_sz);
  // The _osthread may be NULL here because we ran out of memory (too many threads active).   // We need to throw and OutOfMemoryError - however we cannot do this here because the caller   // may hold a lock and all locks must be unlocked before throwing the exception (throwing   // the exception consists of creating the exception object & initializing it, initialization   // will leave the VM via a JavaCall and then all locks must be unlocked).   //   // The thread is still suspended when we reach here. Thread must be explicit started   // by creator! Furthermore, the thread must also explicitly be added to the Threads list   // by calling Threads:add. The reason why this is not done here, is because the thread   // object must be fully initialized (take a look at JVM_Start) }

能够看到,重点是经过os类的create_thread函数来建立一个线程,由于JVM是跨平台的,而且不一样操做系统上的线程实现机制可能不太同样,所以这里的create_thread确定会有多个针对不一样平台的实现,咱们查看这个函数的实现就知道了:
image
能够看到,HotSpot提供了主要的操做系统上的实现,由于在服务器上,linux的占比是很高的,所以咱们这里就看下linux上的实现:

bool os::create_thread(Thread* thread, ThreadType thr_type,
                       size_t req_stack_size) {
  ...
  // init thread attributes   pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  // Calculate stack size if it's not specified by caller.   size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
  // In the Linux NPTL pthread implementation the guard size mechanism   // is not implemented properly. The posix standard requires adding   // the size of the guard pages to the stack size, instead Linux   // takes the space out of 'stacksize'. Thus we adapt the requested   // stack_size by the size of the guard pages to mimick proper   // behaviour. However, be careful not to end up with a size   // of zero due to overflow. Don't add the guard page in that case.   size_t guard_size = os::Linux::default_guard_size(thr_type);
  if (stack_size <= SIZE_MAX - guard_size) {
    stack_size += guard_size;
  }
  assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");

  int status = pthread_attr_setstacksize(&attr, stack_size);
  assert_status(status == 0, status, "pthread_attr_setstacksize");

  // Configure glibc guard page.   pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));
  ...
  pthread_t tid;
  // 建立并启动线程   int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
  ...
}

这个函数比较长,这里就省略部分,只保留和线程建立启动相关的部分,能够看到,在linux平台上,JVM的线程是经过大名鼎鼎的pthread库来建立启动线程的,这里须要注意的是,在指定线程栈大小的时候,并非程序员指定多少就是多少,而是要根据系统平台的限制来综合决定的。咱们也能够得出结论,Java Thread在底层对应一个pthread线程。咱们看下pthread建立并启动线程的接口:

int thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

第一个是pthread_t结构体数据指针,存放线程信息,第二个是线程的属性,第三个是线程体,也就是线程实际执行的函数,第四个是线程体的参数列表。
上面调用这个接口的地方,咱们指定了线程体函数是thread_native_entry,参数是thread指针。咱们先看下thread_native_entry这个函数的定义:

// Thread start routine for all newly created threads static void *thread_native_entry(Thread *thread) {
  ...
  // call one more level start routine   thread->run();
  ...
}

一样,这里只保留了重点代码,经过注释咱们能够知道,thread->run()这一行是最可能执行咱们run方法的地方。咱们看一下代码:

// The first routine called by a new Java thread void JavaThread::run() {
  ...
  // We call another function to do the rest so we are sure that the stack addresses used   // from there will be lower than the stack base just computed   thread_main_inner();
}

这里重点是调用了thread_main_inner函数:

void JavaThread::thread_main_inner() {
  assert(JavaThread::current() == this, "sanity check");
  assert(this->threadObj() != NULL, "just checking");

  // Execute thread entry point unless this thread has a pending exception   // or has been stopped before starting.   // Note: Due to JVM_StopThread we can have pending exceptions already!   if (!this->has_pending_exception() &&
      !java_lang_Thread::is_stillborn(this->threadObj())) {
    {
      ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    // 这里开始调用 java thread 的 run 方法啦~~~     this->entry_point()(this, this);
  }

  DTRACE_THREAD_PROBE(stop, this);

  // java 中的 run 方法执行完毕了,这里须要退出线程并清理资源   this->exit(false);
  // delete cpp 的对象   this->smr_delete();
}

能够看到,Java Thread中的run方法就是在this->entry_point()(this,this);这里调用的。看这里的调用方式就知道,entry_point()返回的是一个函数指针,而后直接调用,entry_point函数实现以下:

ThreadFunction entry_point() const             { return _entry_point; }

那么_entry_point是哪里来的?咱们再看上面JavaThread的构造函数,咱们发现了一个方法set_entry_point(entry_point),_entry_point就是咱们建立JavaThread对象时传入的函数指针。

相关文章
相关标签/搜索