这一篇主要围绕线程状态控制相关的操做分析线程的原理,好比线程的中断,线程的通讯等,内容比较多,可能会分两篇文章java
阿里面试系列导读:关注个人技术公众号【架构师修炼宝典】一周出产1-2篇技术文章。linux
【阿里面试系列】搞懂并发编程,轻松应对80%的面试场景git
【阿里面试系列】Java线程的应用及挑战
github
前面咱们简单分析过了线程的使用,经过调用线程的启动方法来启动线程,线程启动后会调用运行方法执行业务逻辑,运行方法执行完毕后,线程的生命周期也就终止了。
不少同窗最先学习线程的时候会比较疑惑,启动一个线程为何是调用启动方法,而不是运行方法,这作一个简单的分析,先简单看一下启动方法的定义面试
public class Thread implements Runnable {
...
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0(); //注意这里
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();//注意这里
...
复制代码
咱们看到调用启动方法其实是调用一个本地方法START0()来启动一个线程,首先START0()这个方法是在线程的静态块中来注册的,代码以下编程
public class Thread implements Runnable {
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}
复制代码
这个registerNatives的做用是注册一些本地方法提供给Thread类来使用,好比start0(),isAlive(),currentThread(),sleep();这些都是你们很熟悉的方法.registerNatives
的本地方法的定义在文件Thread.c,
Thread.c定义了各个操做系统平台要用的关于线程的公共数据和操做,如下是Thread.c的所有内容设计模式
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
#undef THD
#undef OBJ
#undef STE
#undef STR
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
复制代码
从这段代码能够看出,start0(),实际会执行JVM_StartThread方法,这个方法是干吗的呢?从名字上来看,彷佛是在JVM层面去启动一个线程,若是真的是这样,那么在JVM层面,必定会调用Java中定义的运行方法。那接下来继续去找找答案。咱们找到jvm.cpp这个文件;这个文件须要下载hotspot的源码才能找到。安全
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
...
native_thread = new JavaThread(&thread_entry, sz);
...
复制代码
JVM_ENTRY是用来定义JVM_StartThread函数的,在这个函数里面建立了一个真正和平台有关的本地线程。本着打破砂锅查到底的原则,继续看看newJavaThread作了什么事情,继续寻找JavaThread的定义
在hotspot的源码中thread.cpp文件中1558行的位置能够找到以下代码性能优化
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread()
#if INCLUDE_ALL_GCS
, _satb_mark_queue(&_satb_mark_queue_set),
_dirty_card_queue(&_dirty_card_queue_set)
#endif // INCLUDE_ALL_GCS
{
if (TraceThreadEvents) {
tty->print_cr("creating thread %p", this);
}
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
// Create the native thread itself.
// %note runtime_23
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
os::java_thread;
os::create_thread(this, thr_type, stack_sz);
_safepoint_visible = false;
// The _osthread may be NULL here because we ran out of memory (too many threads active).
// We need to throw and OutOfMemoryError - however we cannot do this here because the caller
// may hold a lock and all locks must be unlocked before throwing the exception (throwing
// the exception consists of creating the exception object & initializing it, initialization
// will leave the VM via a JavaCall and then all locks must be unlocked).
//
// The thread is still suspended when we reach here. Thread must be explicit started
// by creator! Furthermore, the thread must also explicitly be added to the Threads list
// by calling Threads:add. The reason why this is not done here, is because the thread
// object must be fully initialized (take a look at JVM_Start)
}
复制代码
这个方法有两个参数,第一个是函数名称,线程建立成功以后会根据这个函数名称调用对应的函数;第二个是当前进程内已经有的线程数量。最后咱们重点关注与一下os :: create_thread,实际就是调用平台建立线程的方法来建立线程。
接下来就是线程的启动,会调用Thread.cpp文件中的Thread :: start(Thread * thread)方法,代码以下bash
void Thread::start(Thread* thread) {
trace("start", thread);
// Start is different from resume in that its safety is guaranteed by context or
// being called from a Java method synchronized on the Thread object.
if (!DisableStartThread) {
if (thread->is_Java_thread()) {
// Initialize the thread state to RUNNABLE before starting this thread.
// Can not set it after the thread started because we do not know the
// exact thread state at that time. It could be in MONITOR_WAIT or
// in SLEEPING or some other state.
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
java_lang_Thread::RUNNABLE);
}
os::start_thread(thread);
}
}
复制代码
启动方法中有一个函数调用:os :: start_thread(thread);,调用平台启动线程的方法,最终会调用Thread.cpp文件中的JavaThread :: run()方法
// The first routine called by a new Java thread
void JavaThread::run() {
// initialize thread-local alloc buffer related fields
this->initialize_tlab();
// used to test validitity of stack trace backs
this->record_base_of_stack_pointer();
// Record real stack base and size.
this->record_stack_base_and_size();
// Initialize thread local storage; set before calling MutexLocker
this->initialize_thread_local_storage();
this->create_stack_guard_pages();
this->cache_global_variables();
// Thread is now sufficient initialized to be handled by the safepoint code as being
// in the VM. Change thread state from _thread_new to _thread_in_vm
ThreadStateTransition::transition_and_fence(this, _thread_new, _thread_in_vm);
assert(JavaThread::current() == this, "sanity check");
assert(!Thread::current()->owns_locks(), "sanity check");
DTRACE_THREAD_PROBE(start, this);
// This operation might block. We call that after all safepoint checks for a new thread has
// been completed.
this->set_active_handles(JNIHandleBlock::allocate_block());
if (JvmtiExport::should_post_thread_life()) {
JvmtiExport::post_thread_start(this);
}
EventThreadStart event;
if (event.should_commit()) {
event.set_javalangthread(java_lang_Thread::thread_id(this->threadObj()));
event.commit();
}
// We call another function to do the rest so we are sure that the stack addresses used
// from there will be lower than the stack base just computed
thread_main_inner();
// Note, thread is no longer valid at this point!
}
复制代码
这个方法中主要是作一系列的初始化操做,最后有一个方法thread_main_inner,接下来看看这个方法的逻辑是什么样的
void JavaThread::thread_main_inner() {
assert(JavaThread::current() == this, "sanity check");
assert(this->threadObj() != NULL, "just checking");
// Execute thread entry point unless this thread has a pending exception
// or has been stopped before starting.
// Note: Due to JVM_StopThread we can have pending exceptions already!
if (!this->has_pending_exception() &&
!java_lang_Thread::is_stillborn(this->threadObj())) {
{
ResourceMark rm(this);
this->set_native_thread_name(this->get_thread_name());
}
HandleMark hm(this);
this->entry_point()(this, this);
}
DTRACE_THREAD_PROBE(stop, this);
this->exit(false);
delete this;
}
复制代码
和主流程无关的代码我们先不去看,直接找到最核心的代码块this-> entry_point()(this,this);,这个entrypoint应该比较熟悉了,由于咱们在前面提到了,在:: JavaThread这个方法中传递的第一个参数,表明函数名称,线程启动的时候会调用这个函数。
若是你们尚未晕车的话,应该记得咱们在jvm.cpp文件中看到的代码,在建立native_thread = newJavaThread( &thread_entry,SZ); 的时候传递了一个threadentry函数,因此咱们在jvm.cpp中找到这个函数的定义以下
static void thread_entry(JavaThread* thread, TRAPS) {
{
HandleMark hm(THREAD);
Handle obj(THREAD, thread->threadObj());
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
obj,
KlassHandle(THREAD, SystemDictionary::Thread_klass()),
vmSymbols::run_method_name(), //注意这里
vmSymbols::void_method_signature(),
THREAD);
}
复制代码
能够看到vmSymbols :: run_method_name()这个调用,其实就是经过回调方法调用Java线程中定义的运行方法,run_method_name是一个宏定义,在vmSymbols.hpp文件中能够找到以下代码
#define VM_SYMBOLS_DO(template, do_alias)
...
template(run_method_name, "run")
...
复制代码
因此结论就是,Java的里面建立线程以后必需要调用启动方法才能真正的建立一个线程,该方法会调用虚拟机启动一个本地线程,本地线程的建立会调用当前系统建立线程的方法进行建立,而且线程被执行的时候会回调跑方法进行业务逻辑的处理
线程的终止有主动和被动之分,被动表示线程出现异常退出或者运行方法执行完毕,线程会自动终止。主动的方式是Thread.stop()来实现线程的终止,可是中止()方法是一个过时的方法,官方是不建议使用,理由很简单,中止()方法在中介一个线程时不会保证线程的资源正常释放,也就是不会给线程完成资源释放工做的机会,至关于咱们在Linux的上经过kill -9强制结束一个进程。
那么如何安全的终止一个线程呢?
咱们先看一下下面的代码,代码演示了一个正确终止线程的方法,至于它的实现原理,稍后咱们再分析
public class InterruptedDemo implements Runnable{
@Override
public void run() {
long i=0l;
while(!Thread.currentThread().isInterrupted()){//notice here
i++;
}
System.out.println("result:"+i);
}
public static void main(String[] args) throws InterruptedException {
InterruptedDemo interruptedDemo=new InterruptedDemo();
Thread thread=new Thread(interruptedDemo);
thread.start();
Thread.sleep(1000);//睡眠一秒
thread.interrupt();//notice here
}
}
复制代码
代码中有两处须要注意,在主线程中,调用了线程的interrupt()方法,在运行方法中,而循环中经过Thread.currentThread()。isInterrupted()来判断线程中断的标识。因此咱们在这里猜测一下,应该是在线程中维护了一个中断标识,经过thread.interrupt()方法去改变了中断标识的值使得运行方法中而循环的判断不成立而跳出循环,所以运行方法执行完毕之后线程就终止了。
咱们来看一下thread.interrupt()方法作了什么事情
public class Thread implements Runnable {
...
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
...
复制代码
这个方法里面,调用了中断0(),这个方法在前面分析启动方法的时候见过,是一个本机方法,这里就再也不重复贴代码了,一样,咱们找到jvm.cpp文件,找到JVM_Interrupt的定义
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate oop java_thread = JNIHandles::resolve_non_null(jthread); MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock); // We need to re-resolve the java_thread, since a GC might have happened during the // acquire of the lock JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)); if (thr != NULL) { Thread::interrupt(thr); } JVM_END 复制代码
这个方法比较简单,直接调用了Thread :: interrupt(thr)这个方法,这个方法的定义在Thread.cpp文件中,代码以下
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}
复制代码
Thread :: interrupt方法调用了os :: interrupt方法,这个是调用平台的中断方法,这个方法的实现是在os _ * .cpp文件中,其中星号表明的是不一样平台,由于jvm是跨平台的,因此对于不一样的操做平台,线程的调度方式都是不同的。咱们以os_linux.cpp文件为例
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//获取本地线程对象
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {//判断本地线程对象是否为中断
osthread->set_interrupted(true);//设置中断状态为true
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
//这里是内存屏障,这块在后续的文章中会剖析;内存屏障的目的是使得interrupted状态对其余线程当即可见
OrderAccess::fence();
//_SleepEvent至关于Thread.sleep,表示若是线程调用了sleep方法,则经过unpark唤醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//_ParkEvent用于synchronized同步块和Object.wait(),这里至关于也是经过unpark进行唤醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
复制代码
经过上面的代码分析能够知道,了Thread.interrupt()方法实际就是设置一个中断状态标识为真,而且经过ParkEvent的取消驻留方法来唤醒线程。
这里给你们普及一个知识点,为何的Object.wait,的Thread.sleep和的Thread.join都会抛出InterruptedException的?首先,这个异常的意思是表示一个阻塞被其余线程中断了。而后,因为线程调用了中断()中断方法,那么的Object.wait,的Thread.sleep等被阻塞的线程被唤醒之后会经过is_interrupted方法判断中断标识的状态变化,若是发现中断标识为真,则先清除中断标识,而后抛出InterruptedException的
须要注意的是,InterruptedException的异常的抛出并不意味着线程必须终止,而是提醒当前线程有中断的操做发生,至于接下来怎么处理取决于线程自己,好比
为了让你们可以更好的理解上面这段话,咱们以了Thread.sleep为例直接从JDK的源码中找到中断标识的清除以及异常抛出的方法代码
找到is_interrupted()方法,linux平台中的实如今os_linux.cpp文件中,代码以下
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted(); //获取线程的中断标识
if (interrupted && clear_interrupted) {//若是中断标识为true
osthread->set_interrupted(false);//设置中断标识为false
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}
复制代码
找到了Thread.sleep这个操做在JDK中的源码体现,怎么找?相信若是前面你们有认真看的话,应该能很快找到,代码在jvm.cpp文件中
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//判断并清除线程中断状态,若是中断状态为true,抛出中断异常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
// Save current thread state and restore it at the end of this block.
// And set new thread state to SLEEPING.
JavaThreadSleepState jtss(thread);
...
复制代码
注意上面加了中文注释的地方的代码,先判断is_interrupted的状态,而后抛出一个InterruptedException的异常。到此为止,咱们就已经分析清楚了中断的整个流程。
了解了线程。中断方法的做用之后,再回过头来看Java中Thread.currentThread()。isInterrupted()这段代码,就很好理解了。因为前者先设置了一个中断标识为真,因此isInterrupted( )这个方法的返回值为真,故而不知足而循环的判断条件致使退出循环。这里有必要再提一句,就是这个线程中断标识有两种方式复位,第一种是前面提到过的InterruptedException的;另外一种是经过Thread.interrupted()对当前线程的中断标识进行复位。