Java并发——基石篇(下)

Object wait和notify的实现机制

Java Object类提供了一个基于native实现的wait和notify线程间通信的方式,这是除了synchronized以外的另一块独立的并发基础部分,有关wait和notify·的部份内容,咱们在上面分析monitor的exit的时候已经有一些涉及,可是并无过多的深刻,留下了很多的疑问,本小节会详细分析。java

wait实现

ObjectMonitor类中的wait函数代码实现以下:node

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  ...
  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
    ...
    // 抛出异常,不会直接进入等待     THROW(vmSymbols::java_lang_InterruptedException());
    ...
  }
  ...
  ObjectWaiter node(Self);
  node.TState = ObjectWaiter::TS_WAIT;
  Self->_ParkEvent->reset();
  OrderAccess::fence();

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");
  AddWaiter(&node);
  Thread::SpinRelease(&_WaitSetLock);

  if ((SyncFlags & 4) == 0) {
    _Responsible = NULL;
  }

  ...
  // exit the monitor   exit(true, Self); 
  ...
  if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty       } else if (node._notified == 0) {
        if (millis <= 0) {
          Self->_ParkEvent->park();
        } else {
          ret = Self->_ParkEvent->park(millis);
        }
  }
  // 被 notify 唤醒以后的善后逻辑   ...
}

照例只列出wait函数的核心功能部分,首先会判断一下当前线程是否为可中断而且是否已经被中断,若是是的话会直接抛出InterruptedException异常,而不会进入wait等待,不然的话,就须要执行下面的等待过程,首先会根据Self当前线程新建一个ObjectWaiter对象节点,这个对象咱们在前面分析monitor的enter的视乎就已经见过了。生成一个新的节点以后就是须要将这个节点放到等待队列中,经过调用AddWaiter函数实现node的入队操做,不过在入队操做以前须要得到互斥锁以保证并发安全:linux

void Thread::SpinAcquire(volatile int * adr, const char * LockName) {
  if (Atomic::cmpxchg (1, adr, 0) == 0) {
    return;   // normal fast-path return   }

  // Slow-path : We've encountered contention -- Spin/Yield/Block strategy.   TEVENT(SpinAcquire - ctx);
  int ctr = 0;
  int Yields = 0;
  for (;;) {
    while (*adr != 0) {
      ++ctr;
      if ((ctr & 0xFFF) == 0 || !os::is_MP()) {
        if (Yields > 5) {
          os::naked_short_sleep(1);
        } else {
          os::naked_yield();
          ++Yields;
        }
      } else {
        SpinPause();
      }
    }
    if (Atomic::cmpxchg(1, adr, 0) == 0) return;
  }
}

SpinAcquire是一个自旋锁实现,它经过一个死循环不断经过cas检查判断是否得到锁,这里开始会经过一个cas检查看下是否可以成功,若是成功的话就不用进行下面比较重量级的spin过程,若是获取失败,就须要进入下面的spin过程,这里的spin逻辑是一个比较有意思的算法。这里定义了一个ctr变量,其实就是counter计数器的意思,(ctr&0xFFF)==0|| !os::is_MP()这个条件比较有意思,意思是若是我尝试的次数大于)0xfff,或者当前系统是一个单核处理器系统,那么就执行下面的逻辑。能够看到这里的spin是有必定的限度的,首先开始的时候,若是是多核系统,那么会直接执行SpinPause,咱们看下SpinPause函数的实现,这个函数是实现CPU的忙等待,所以会有不一样系统和CPU架构的对应实现。SpinPause函数linux平台代码以下:算法

int SpinPause() {
    return 0;
}

即SpinPause函数直接返回0,是SpinAcquire实现CPU忙等待的一种方式,此外,若是SpinAcquire里尝试的次数已经到了0xFFF次的话,就利用另外一种方式实现等待:安全

if (Yields > 5) {
   os::naked_short_sleep(1);
} else {
   os::naked_yield();
   ++Yields;
}

首先会尝试经过yield函数来将当前线程的CPU执行时间让出来,若是让了5次仍是没有得到锁,那么就只能经过naked_short_sleep来实现等待了,这里的naked_short_sleep函数从名字就能够看出来是短暂休眠等待,经过每次休眠等待1ms实现。咱们如今看下naked_yield的实现方式,一样看linux平台的实现:架构

void os::naked_yield() {
  sched_yield();
}

能够看到这里的实现是直接调用pthread的sched_yield函数实现线程的时间片让出。接下来看linux平台naked_short_sleep的实现:并发

void os::naked_short_sleep(jlong ms) {
  struct timespec req;

  assert(ms < 1000, "Un-interruptable sleep, short time use only");
  req.tv_sec = 0;
  if (ms > 0) {
    req.tv_nsec = (ms % 1000) * 1000000;
  } else {
    req.tv_nsec = 1;
  }

  nanosleep(&req, NULL);

  return;
}

这里咱们经过nanosleep系统调用实现线程的timed waiting。app

到这里咱们分析一下SpinAcquire的实现逻辑:若是是单核处理器就经过yield或者sleep实现等待,若是是多核处理器的话就经过调用空实现函数来忙等待。由于若是是单核CPU的话,你经过调用空实现函数实现忙等待是不科学的,由于只有一个核,若是经过这个核来实现忙等待,那么本来须要释放锁的线程得不到执行,那就可能形成饥饿等待,咱们的CPU一直在转动,可是没有解决任何问题。因此若是是单核CPU系统的话,咱们不能经过调用空函数来实现等待。相反,若是是多核的话,那就能够在另外一个空闲的CPU上实现忙等待增长系统的吞吐量,能够看到在JVM中为了增长系统的算力和保证系统的兼容性,作了多少努力和实现。框架

上面的SpinAcquire函数返回以后,就表示咱们得到了锁,如今能够将咱们的node放到等待队列中了:函数

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not add NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)   if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

这里的实现比较简单,就是讲node插入双向链表_WaitSet的尾部。插入链表完毕知乎,须要经过SpinRelease将锁释放。

将新建的node节点加入到WaitSet队列中了,咱们接着看wait函数接下来的逻辑,如今咱们就要执行以下内容:

// exit the monitor exit(true, Self);

wait操做释放monitor锁就是在这里实现的。而后接着的是wait函数的park等待。

if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty } else if (node._notified == 0) {
    if (millis <= 0) {
        Self->_ParkEvent->park();
    } else {
        ret = Self->_ParkEvent->park(millis);
    }
}

在正式park以前,还会再一次看下是否有interruptd,若是有的话就会跳过park操做,不然就会进行park阻塞,park阻塞的时间就是wait函数调用时传入的时间参数。
wait函数接下来的操做是park阻塞唤醒以后的善后逻辑,对于咱们的分析不是很重要,这里就跳过。

notify实现

notify函数的实现代码以下:

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
    TEVENT(Empty-Notify);
    return;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
  INotify(THREAD);
  OM_PERFDATA_OP(Notifications, inc(1));
}

这里主要经过判断WaitSet队列中是否还有线程执行了wait,若是没有就直接返回,若是有就对线程进行唤醒,唤醒经过调用INotify函数实现:

void ObjectMonitor::INotify(Thread * Self) {
  const int policy = Knob_MoveNotifyee;

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");
  ObjectWaiter * iterator = DequeueWaiter();
  if (iterator != NULL) {
    ObjectWaiter * list = _EntryList;
    if (policy == 0) {
      // prepend to EntryList       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 1) {
      // append to EntryList       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 2) {
      // prepend to cxq       if (list == NULL) {
        ...
      } else {
        ...
      }
    } else if (policy == 3) {
      // append to cxq       ...
    } else {
      ...
    }
    ...
  }
  Thread::SpinRelease(&_WaitSetLock);
}

能够看到,这里的操做都是在_WaitSetLock保护下的,首先会从WaitSet队列中出队一个节点,而后针对这个节点根据Knob_MoveNotifyee来决定执行不一样的策略逻辑,而且策略中的逻辑框架就是同样的,根据_EntryList是否为空执行不一样操做。Knob_MoveNottifyee默认值为2。
notify的唤醒策略主要有如下几种:

  1. 策略 0:将须要唤醒的 node 放到 EntryList 的头部
  2. 策略 1:将须要唤醒的 node 放到 EntryList 的尾部
  3. 策略 2:将须要唤醒的 node 放到 CXQ 的头部
  4. 策略 3:将须要唤醒的 node 放到 CXQ 的尾部

在分析不一样策略的逻辑以前,咱们先看下WaitSet的出队逻辑实现,这是INotify函数开始会执行的事:

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter   ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

从注释中能够看出,这里将WaitSet队列中的第一个node出队,下面直接返回WaitSet队列指针也就是队头,而后删除出队节点:

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,   // timeout or other spurious wake-up, dequeue the   // waiter from waiting list   ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}

这样咱们就完成了从WaitSet双向链表队列中的队头出队逻辑。

唤醒策略0

if (list == NULL) {
    iterator->_next = iterator->_prev = NULL;
    _EntryList = iterator;
} else {
    list->_prev = iterator;
    iterator->_next = list;
    iterator->_prev = NULL;
    _EntryList = iterator;
}

若是EntryList为空的话,表示以前没有线程被notify唤醒,已经直接将当前节点放到EntryList中便可,不然的话,就将当前节点放到EntryList的头部。

唤醒策略1

策略1和策略0逻辑很类似,这里只是将节点放到尾部:

if (list == NULL) {
        iterator->_next = iterator->_prev = NULL;
        _EntryList = iterator;
} else {
        // CONSIDER:  finding the tail currently requires a linear-time walk of         // the EntryList.  We can make tail access constant-time by converting to         // a CDLL instead of using our current DLL.         ObjectWaiter * tail;
        for (tail = list; tail->_next != NULL; tail = tail->_next) {}
        assert(tail != NULL && tail->_next == NULL, "invariant");
        tail->_next = iterator;
        iterator->_prev = tail;
        iterator->_next = NULL;
}

唤醒策略2

if (list == NULL) {
        iterator->_next = iterator->_prev = NULL;
        _EntryList = iterator;
} else {
        iterator->TState = ObjectWaiter::TS_CXQ;
        for (;;) {
          ObjectWaiter * front = _cxq;
          iterator->_next = front;
          if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
            break;
          }
        }
}

首先若是发现 EntryList 为空的话,也就是第一个被 notify 唤醒的线程会进入到 EntryList,而 WaitSet 中剩下的节点会依次插入到 cxq 的头部,而后更新 cxq 指针指向新的头节点。

唤醒策略 3

策略3的逻辑和策略2比较类似,只是策略3会将节点放到cxq尾部:

iterator->TState = ObjectWaiter::TS_CXQ;
      for (;;) {
        ObjectWaiter * tail = _cxq;
        if (tail == NULL) {
          iterator->_next = NULL;
          if (Atomic::replace_if_null(iterator, &_cxq)) {
            break;
          }
        } else {
          while (tail->_next != NULL) tail = tail->_next;
          tail->_next = iterator;
          iterator->_prev = tail;
          iterator->_next = NULL;
          break;
        }
}

这里不会判断 EntryList 是否为空,而是直接将节点放到 cxq 的尾部,这一点和前面几个策略不同,须要注意下。

notifyAll 实现

notifyAll 的实现其实和 notify 实现大同小异:

void ObjectMonitor::notifyAll(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
    TEVENT(Empty-NotifyAll);
    return;
  }

  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
  int tally = 0;
  while (_WaitSet != NULL) {
    tally++;
    INotify(THREAD);
  }

  OM_PERFDATA_OP(Notifications, inc(tally));
}

能够看到,其实就是根据WaitSet长度,反复调用INotify函数,至关于屡次调用 notify。

相关文章
相关标签/搜索