Java Object类提供了一个基于native实现的wait和notify线程间通信的方式,这是除了synchronized以外的另一块独立的并发基础部分,有关wait和notify·的部份内容,咱们在上面分析monitor的exit的时候已经有一些涉及,可是并无过多的深刻,留下了很多的疑问,本小节会详细分析。java
ObjectMonitor类中的wait函数代码实现以下:node
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { ... if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { ... // 抛出异常,不会直接进入等待 THROW(vmSymbols::java_lang_InterruptedException()); ... } ... ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT; Self->_ParkEvent->reset(); OrderAccess::fence(); Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add"); AddWaiter(&node); Thread::SpinRelease(&_WaitSetLock); if ((SyncFlags & 4) == 0) { _Responsible = NULL; } ... // exit the monitor exit(true, Self); ... if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty } else if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park(); } else { ret = Self->_ParkEvent->park(millis); } } // 被 notify 唤醒以后的善后逻辑 ... }
照例只列出wait函数的核心功能部分,首先会判断一下当前线程是否为可中断而且是否已经被中断,若是是的话会直接抛出InterruptedException异常,而不会进入wait等待,不然的话,就须要执行下面的等待过程,首先会根据Self当前线程新建一个ObjectWaiter对象节点,这个对象咱们在前面分析monitor的enter的视乎就已经见过了。生成一个新的节点以后就是须要将这个节点放到等待队列中,经过调用AddWaiter函数实现node的入队操做,不过在入队操做以前须要得到互斥锁以保证并发安全:linux
void Thread::SpinAcquire(volatile int * adr, const char * LockName) { if (Atomic::cmpxchg (1, adr, 0) == 0) { return; // normal fast-path return } // Slow-path : We've encountered contention -- Spin/Yield/Block strategy. TEVENT(SpinAcquire - ctx); int ctr = 0; int Yields = 0; for (;;) { while (*adr != 0) { ++ctr; if ((ctr & 0xFFF) == 0 || !os::is_MP()) { if (Yields > 5) { os::naked_short_sleep(1); } else { os::naked_yield(); ++Yields; } } else { SpinPause(); } } if (Atomic::cmpxchg(1, adr, 0) == 0) return; } }
SpinAcquire是一个自旋锁实现,它经过一个死循环不断经过cas检查判断是否得到锁,这里开始会经过一个cas检查看下是否可以成功,若是成功的话就不用进行下面比较重量级的spin过程,若是获取失败,就须要进入下面的spin过程,这里的spin逻辑是一个比较有意思的算法。这里定义了一个ctr变量,其实就是counter计数器的意思,(ctr&0xFFF)==0|| !os::is_MP()这个条件比较有意思,意思是若是我尝试的次数大于)0xfff,或者当前系统是一个单核处理器系统,那么就执行下面的逻辑。能够看到这里的spin是有必定的限度的,首先开始的时候,若是是多核系统,那么会直接执行SpinPause,咱们看下SpinPause函数的实现,这个函数是实现CPU的忙等待,所以会有不一样系统和CPU架构的对应实现。SpinPause函数linux平台代码以下:算法
int SpinPause() { return 0; }
即SpinPause函数直接返回0,是SpinAcquire实现CPU忙等待的一种方式,此外,若是SpinAcquire里尝试的次数已经到了0xFFF次的话,就利用另外一种方式实现等待:安全
if (Yields > 5) { os::naked_short_sleep(1); } else { os::naked_yield(); ++Yields; }
首先会尝试经过yield函数来将当前线程的CPU执行时间让出来,若是让了5次仍是没有得到锁,那么就只能经过naked_short_sleep来实现等待了,这里的naked_short_sleep函数从名字就能够看出来是短暂休眠等待,经过每次休眠等待1ms实现。咱们如今看下naked_yield的实现方式,一样看linux平台的实现:架构
void os::naked_yield() { sched_yield(); }
能够看到这里的实现是直接调用pthread的sched_yield函数实现线程的时间片让出。接下来看linux平台naked_short_sleep的实现:并发
void os::naked_short_sleep(jlong ms) { struct timespec req; assert(ms < 1000, "Un-interruptable sleep, short time use only"); req.tv_sec = 0; if (ms > 0) { req.tv_nsec = (ms % 1000) * 1000000; } else { req.tv_nsec = 1; } nanosleep(&req, NULL); return; }
这里咱们经过nanosleep系统调用实现线程的timed waiting。app
到这里咱们分析一下SpinAcquire的实现逻辑:若是是单核处理器就经过yield或者sleep实现等待,若是是多核处理器的话就经过调用空实现函数来忙等待。由于若是是单核CPU的话,你经过调用空实现函数实现忙等待是不科学的,由于只有一个核,若是经过这个核来实现忙等待,那么本来须要释放锁的线程得不到执行,那就可能形成饥饿等待,咱们的CPU一直在转动,可是没有解决任何问题。因此若是是单核CPU系统的话,咱们不能经过调用空函数来实现等待。相反,若是是多核的话,那就能够在另外一个空闲的CPU上实现忙等待增长系统的吞吐量,能够看到在JVM中为了增长系统的算力和保证系统的兼容性,作了多少努力和实现。框架
上面的SpinAcquire函数返回以后,就表示咱们得到了锁,如今能够将咱们的node放到等待队列中了:函数
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not add NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }
这里的实现比较简单,就是讲node插入双向链表_WaitSet的尾部。插入链表完毕知乎,须要经过SpinRelease将锁释放。
将新建的node节点加入到WaitSet队列中了,咱们接着看wait函数接下来的逻辑,如今咱们就要执行以下内容:
// exit the monitor exit(true, Self);
wait操做释放monitor锁就是在这里实现的。而后接着的是wait函数的park等待。
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) { // Intentionally empty } else if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park(); } else { ret = Self->_ParkEvent->park(millis); } }
在正式park以前,还会再一次看下是否有interruptd,若是有的话就会跳过park操做,不然就会进行park阻塞,park阻塞的时间就是wait函数调用时传入的时间参数。
wait函数接下来的操做是park阻塞唤醒以后的善后逻辑,对于咱们的分析不是很重要,这里就跳过。
notify函数的实现代码以下:
void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT(Empty-Notify); return; } DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); INotify(THREAD); OM_PERFDATA_OP(Notifications, inc(1)); }
这里主要经过判断WaitSet队列中是否还有线程执行了wait,若是没有就直接返回,若是有就对线程进行唤醒,唤醒经过调用INotify函数实现:
void ObjectMonitor::INotify(Thread * Self) { const int policy = Knob_MoveNotifyee; Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify"); ObjectWaiter * iterator = DequeueWaiter(); if (iterator != NULL) { ObjectWaiter * list = _EntryList; if (policy == 0) { // prepend to EntryList if (list == NULL) { ... } else { ... } } else if (policy == 1) { // append to EntryList if (list == NULL) { ... } else { ... } } else if (policy == 2) { // prepend to cxq if (list == NULL) { ... } else { ... } } else if (policy == 3) { // append to cxq ... } else { ... } ... } Thread::SpinRelease(&_WaitSetLock); }
能够看到,这里的操做都是在_WaitSetLock保护下的,首先会从WaitSet队列中出队一个节点,而后针对这个节点根据Knob_MoveNotifyee来决定执行不一样的策略逻辑,而且策略中的逻辑框架就是同样的,根据_EntryList是否为空执行不一样操做。Knob_MoveNottifyee默认值为2。
notify的唤醒策略主要有如下几种:
在分析不一样策略的逻辑以前,咱们先看下WaitSet的出队逻辑实现,这是INotify函数开始会执行的事:
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; }
从注释中能够看出,这里将WaitSet队列中的第一个node出队,下面直接返回WaitSet队列指针也就是队头,而后删除出队节点:
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev != NULL, "node already removed from list"); assert(node->_next != NULL, "node already removed from list"); // when the waiter has woken up because of interrupt, // timeout or other spurious wake-up, dequeue the // waiter from waiting list ObjectWaiter* next = node->_next; if (next == node) { assert(node->_prev == node, "invariant check"); _WaitSet = NULL; } else { ObjectWaiter* prev = node->_prev; assert(prev->_next == node, "invariant check"); assert(next->_prev == node, "invariant check"); next->_prev = prev; prev->_next = next; if (_WaitSet == node) { _WaitSet = next; } } node->_next = NULL; node->_prev = NULL; }
这样咱们就完成了从WaitSet双向链表队列中的队头出队逻辑。
if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { list->_prev = iterator; iterator->_next = list; iterator->_prev = NULL; _EntryList = iterator; }
若是EntryList为空的话,表示以前没有线程被notify唤醒,已经直接将当前节点放到EntryList中便可,不然的话,就将当前节点放到EntryList的头部。
策略1和策略0逻辑很类似,这里只是将节点放到尾部:
if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { // CONSIDER: finding the tail currently requires a linear-time walk of // the EntryList. We can make tail access constant-time by converting to // a CDLL instead of using our current DLL. ObjectWaiter * tail; for (tail = list; tail->_next != NULL; tail = tail->_next) {} assert(tail != NULL && tail->_next == NULL, "invariant"); tail->_next = iterator; iterator->_prev = tail; iterator->_next = NULL; }
if (list == NULL) { iterator->_next = iterator->_prev = NULL; _EntryList = iterator; } else { iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { ObjectWaiter * front = _cxq; iterator->_next = front; if (Atomic::cmpxchg(iterator, &_cxq, front) == front) { break; } } }
首先若是发现 EntryList 为空的话,也就是第一个被 notify 唤醒的线程会进入到 EntryList,而 WaitSet 中剩下的节点会依次插入到 cxq 的头部,而后更新 cxq 指针指向新的头节点。
策略3的逻辑和策略2比较类似,只是策略3会将节点放到cxq尾部:
iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { ObjectWaiter * tail = _cxq; if (tail == NULL) { iterator->_next = NULL; if (Atomic::replace_if_null(iterator, &_cxq)) { break; } } else { while (tail->_next != NULL) tail = tail->_next; tail->_next = iterator; iterator->_prev = tail; iterator->_next = NULL; break; } }
这里不会判断 EntryList 是否为空,而是直接将节点放到 cxq 的尾部,这一点和前面几个策略不同,须要注意下。
notifyAll 的实现其实和 notify 实现大同小异:
void ObjectMonitor::notifyAll(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT(Empty-NotifyAll); return; } DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD); int tally = 0; while (_WaitSet != NULL) { tally++; INotify(THREAD); } OM_PERFDATA_OP(Notifications, inc(tally)); }
能够看到,其实就是根据WaitSet长度,反复调用INotify函数,至关于屡次调用 notify。