深刻理解Java中的底层阻塞原理及实现

谈到阻塞,相信你们都不会陌生了。阻塞的应用场景真的多得不要不要的,好比 生产-消费模式,限流统计等等。什么 ArrayBlockingQueue、 LinkedBlockingQueue、DelayQueue 等等,都是阻塞队列的实现啊,多简单!java

阻塞,通常有两个特性很亮眼:1. 不耗 CPU 等待;2. 线程安全;node

额,要这么说也 OK 的。毕竟,咱们遇到的问题,到这里就够解决了。可是有没有想过,这容器的阻塞又是如何实现的呢?linux

好吧,翻开源码,也很简单了:(好比 ArrayBlockingQueue 的 take、put….)安全

1架构

2app

3less

4分布式

5ide

6工具

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

// ArrayBlockingQueue

 

/**

 * Inserts the specified element at the tail of this queue, waiting

 * for space to become available if the queue is full.

 *

 * @throws InterruptedException {@inheritDoc}

 * @throws NullPointerException {@inheritDoc}

 */

public void put(E e) throws InterruptedException {

    checkNotNull(e);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == items.length)

            // 阻塞的点

            notFull.await();

        enqueue(e);

    } finally {

        lock.unlock();

    }

}

 

/**

 * Inserts the specified element at the tail of this queue, waiting

 * up to the specified wait time for space to become available if

 * the queue is full.

 *

 * @throws InterruptedException {@inheritDoc}

 * @throws NullPointerException {@inheritDoc}

 */

public boolean offer(E e, long timeout, TimeUnit unit)

    throws InterruptedException {

 

    checkNotNull(e);

    long nanos = unit.toNanos(timeout);

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == items.length) {

            if (nanos <= 0)

                return false;

            // 阻塞的点

            nanos = notFull.awaitNanos(nanos);

        }

        enqueue(e);

        return true;

    } finally {

        lock.unlock();

    }

}

 

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        while (count == 0)

            // 阻塞的点

            notEmpty.await();

        return dequeue();

    } finally {

        lock.unlock();

    }

}

看来,最终都是依赖了 AbstractQueuedSynchronizer 类(著名的AQS)的 await 方法,看起来像那么回事。那么这个同步器的阻塞又是如何实现的呢?

Java的代码老是好跟踪的:

// AbstractQueuedSynchronizer.await()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

/**

 * Implements interruptible condition wait.

 * <ol>

 * <li> If current thread is interrupted, throw InterruptedException.

 * <li> Save lock state returned by {@link #getState}.

 * <li> Invoke {@link #release} with saved state as argument,

 *      throwing IllegalMonitorStateException if it fails.

 * <li> Block until signalled or interrupted.

 * <li> Reacquire by invoking specialized version of

 *      {@link #acquire} with saved state as argument.

 * <li> If interrupted while blocked in step 4, throw InterruptedException.

 * </ol>

 */

public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

        // 此处进行真正的阻塞

        LockSupport.park(this);

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

    if (node.nextWaiter != null) // clean up if cancelled

        unlinkCancelledWaiters();

    if (interruptMode != 0)

        reportInterruptAfterWait(interruptMode);

}

如上,能够看到,真正的阻塞工做又转交给了另外一个工具类: LockSupport 的 park 方法了,这回跟锁扯上了关系,看起来已经愈来愈接近事实了:

// LockSupport.park()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

/**

 * Disables the current thread for thread scheduling purposes unless the

 * permit is available.

 *

 * <p>If the permit is available then it is consumed and the call returns

 * immediately; otherwise

 * the current thread becomes disabled for thread scheduling

 * purposes and lies dormant until one of three things happens:

 *

 * <ul>

 * <li>Some other thread invokes {@link #unpark unpark} with the

 * current thread as the target; or

 *

 * <li>Some other thread {@linkplain Thread#interrupt interrupts}

 * the current thread; or

 *

 * <li>The call spuriously (that is, for no reason) returns.

 * </ul>

 *

 * <p>This method does <em>not</em> report which of these caused the

 * method to return. Callers should re-check the conditions which caused

 * the thread to park in the first place. Callers may also determine,

 * for example, the interrupt status of the thread upon return.

 *

 * @param blocker the synchronization object responsible for this

 *        thread parking

 * @since 1.6

 */

public static void park(Object blocker) {

    Thread t = Thread.currentThread();

    setBlocker(t, blocker);

    UNSAFE.park(false, 0L);

    setBlocker(t, null);

}

看得出来,这里的实现就比较简洁了,先获取当前线程,设置阻塞对象,阻塞,而后解除阻塞。

好吧,到底什么是真正的阻塞,咱们仍是不得而知!

UNSAFE.park(false, 0L); 是个什么东西? 看起来就是这一句起到了最关键的做用呢!但因为这里已是 native 代码,咱们已经没法再简单的查看源码了!那咋整呢?

那不行就看C/C++的源码呗,看一下 parker 的定义(park.hpp):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

class Parker : public os::PlatformParker {

private:

  volatile int _counter ;

  Parker * FreeNext ;

  JavaThread * AssociatedWith ; // Current association

 

public:

  Parker() : PlatformParker() {

    _counter       = 0 ;

    FreeNext       = NULL ;

    AssociatedWith = NULL ;

  }

protected:

  ~Parker() { ShouldNotReachHere(); }

public:

  // For simplicity of interface with Java, all forms of park (indefinite,

  // relative, and absolute) are multiplexed into one call.  c中暴露出两个方法给java调用

  void park(bool isAbsolute, jlong time);

  void unpark();

 

  // Lifecycle operators

  static Parker * Allocate (JavaThread * t) ;

  static void Release (Parker * e) ;

private:

  static Parker * volatile FreeList ;

  static volatile int ListLock ;

 

};

那 park() 方法究竟是如何实现的呢? 实际上是继承的 os::PlatformParker 的功能,也就是平台相关的私有实现,以 Linux 平台实现为例(os_linux.hpp):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

// Linux中的parker定义

class PlatformParker : public CHeapObj<mtInternal> {

  protected:

    enum {

        REL_INDEX = 0,

        ABS_INDEX = 1

    };

    int _cur_index;  // which cond is in use: -1, 0, 1

    pthread_mutex_t _mutex [1] ;

    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

 

  public:       // TODO-FIXME: make dtor private

    ~PlatformParker() { guarantee (0, "invariant") ; }

 

  public:

    PlatformParker() {

      int status;

      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());

      assert_status(status == 0, status, "cond_init rel");

      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);

      assert_status(status == 0, status, "cond_init abs");

      status = pthread_mutex_init (_mutex, NULL);

      assert_status(status == 0, status, "mutex_init");

      _cur_index = -1; // mark as unused

    }

};

看到 park.cpp 中没有重写 park() 和 unpark() 方法,也就是说阻塞实现彻底交由特定平台代码处理了(os_linux.cpp):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

// park方法的实现,依赖于 _counter, _mutex[1], _cond[2]

void Parker::park(bool isAbsolute, jlong time) {

  // Ideally we'd do something useful while spinning, such

  // as calling unpackTime().

 

  // Optional fast-path check:

  // Return immediately if a permit is available.

  // We depend on Atomic::xchg() having full barrier semantics

  // since we are doing a lock-free update to _counter.

  if (Atomic::xchg(0, &_counter) > 0) return;

 

  Thread* thread = Thread::current();

  assert(thread->is_Java_thread(), "Must be JavaThread");

  JavaThread *jt = (JavaThread *)thread;

 

  // Optional optimization -- avoid state transitions if there's an interrupt pending.

  // Check interrupt before trying to wait

  if (Thread::is_interrupted(thread, false)) {

    return;

  }

 

  // Next, demultiplex/decode time arguments

  timespec absTime;

  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all

    return;

  }

  if (time > 0) {

    unpackTime(&absTime, isAbsolute, time);

  }

 

  // Enter safepoint region

  // Beware of deadlocks such as 6317397.

  // The per-thread Parker:: mutex is a classic leaf-lock.

  // In particular a thread must never block on the Threads_lock while

  // holding the Parker:: mutex.  If safepoints are pending both the

  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.

  ThreadBlockInVM tbivm(jt);

 

  // Don't wait if cannot get lock since interference arises from

  // unblocking.  Also. check interrupt before trying wait

  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {

    return;

  }

 

  int status ;

  if (_counter > 0)  { // no wait needed

    _counter = 0;

    status = pthread_mutex_unlock(_mutex);

    assert (status == 0, "invariant") ;

    // Paranoia to ensure our locked and lock-free paths interact

    // correctly with each other and Java-level accesses.

    OrderAccess::fence();

    return;

  }

 

#ifdef ASSERT

  // Don't catch signals while blocked; let the running threads have the signals.

  // (This allows a debugger to break into the running thread.)

  sigset_t oldsigs;

  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();

  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

#endif

 

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);

  jt->set_suspend_equivalent();

  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

 

  assert(_cur_index == -1, "invariant");

  if (time == 0) {

    _cur_index = REL_INDEX; // arbitrary choice when not timed

    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;

  } else {

    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;

    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;

    if (status != 0 && WorkAroundNPTLTimedWaitHang) {

      pthread_cond_destroy (&_cond[_cur_index]) ;

      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());

    }

  }

  _cur_index = -1;

  assert_status(status == 0 || status == EINTR ||

                status == ETIME || status == ETIMEDOUT,

                status, "cond_timedwait");

 

#ifdef ASSERT

  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

#endif

 

  _counter = 0 ;

  status = pthread_mutex_unlock(_mutex) ;

  assert_status(status == 0, status, "invariant") ;

  // Paranoia to ensure our locked and lock-free paths interact

  // correctly with each other and Java-level accesses.

  OrderAccess::fence();

 

  // If externally suspended while waiting, re-suspend

  if (jt->handle_special_suspend_equivalent_condition()) {

    jt->java_suspend_self();

  }

}

 

// unpark 实现,相对简单些

void Parker::unpark() {

  int s, status ;

  status = pthread_mutex_lock(_mutex);

  assert (status == 0, "invariant") ;

  s = _counter;

  _counter = 1;

  if (s < 1) {

    // thread might be parked

    if (_cur_index != -1) {

      // thread is definitely parked

      if (WorkAroundNPTLTimedWaitHang) {

        status = pthread_cond_signal (&_cond[_cur_index]);

        assert (status == 0, "invariant");

        status = pthread_mutex_unlock(_mutex);

        assert (status == 0, "invariant");

      } else {

        // must capture correct index before unlocking

        int index = _cur_index;

        status = pthread_mutex_unlock(_mutex);

        assert (status == 0, "invariant");

        status = pthread_cond_signal (&_cond[index]);

        assert (status == 0, "invariant");

      }

    } else {

      pthread_mutex_unlock(_mutex);

      assert (status == 0, "invariant") ;

    }

  } else {

    pthread_mutex_unlock(_mutex);

    assert (status == 0, "invariant") ;

  }

}

从上面代码能够看出,阻塞主要借助于三个变量,_cond、_mutex、_counter, 调用 Linux 系统的 pthread_cond_wait、pthread_mutex_lock、pthread_mutex_unlock (一组 POSIX 标准的阻塞接口)等平台相关的方法进行阻塞了!

而 park.cpp 中,则只有  Allocate、Release 等的一些常规操做!

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

// 6399321 As a temporary measure we copied & modified the ParkEvent::

// allocate() and release() code for use by Parkers.  The Parker:: forms

// will eventually be removed as we consolide and shift over to ParkEvents

// for both builtin synchronization and JSR166 operations.

 

volatile int Parker::ListLock = 0 ;

Parker * volatile Parker::FreeList = NULL ;

 

Parker * Parker::Allocate (JavaThread * t) {

  guarantee (t != NULL, "invariant") ;

  Parker * p ;

 

  // Start by trying to recycle an existing but unassociated

  // Parker from the global free list.

  // 8028280: using concurrent free list without memory management can leak

  // pretty badly it turns out.

  Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate");

  {

    p = FreeList;

    if (p != NULL) {

      FreeList = p->FreeNext;

    }

  }

  Thread::SpinRelease(&ListLock);

 

  if (p != NULL) {

    guarantee (p->AssociatedWith == NULL, "invariant") ;

  } else {

    // Do this the hard way -- materialize a new Parker..

    p = new Parker() ;

  }

  p->AssociatedWith = t ;          // Associate p with t

  p->FreeNext       = NULL ;

  return p ;

}

 

void Parker::Release (Parker * p) {

  if (p == NULL) return ;

  guarantee (p->AssociatedWith != NULL, "invariant") ;

  guarantee (p->FreeNext == NULL      , "invariant") ;

  p->AssociatedWith = NULL ;

 

  Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease");

  {

    p->FreeNext = FreeList;

    FreeList = p;

  }

  Thread::SpinRelease(&ListLock);

}

综上源码,在进行阻塞的时候,底层并无(并不必定)要用 while 死循环来阻塞,更多的是借助于操做系统的实现来进行阻塞的。固然,这也更符合你们的猜测!

从上的代码咱们也发现一点,底层在作许多事的时候,都不忘考虑线程中断,也就是说,即便在阻塞状态也是能够接收中断信号的,这为上层语言打开了方便之门。

若是要细说阻塞,其实还远没完,不过再往操做系统层面如何实现,就得再下点功夫,去翻翻资料了,把底线压在操做系统层面,大多数状况下也够用了!

欢迎学Java和大数据的朋友们加入java架构交流: 855835163

加群连接:https://jq.qq.com/?_wv=1027&amp;k=5dPqXGI

群内提供免费的架构资料还有:Java工程化、高性能及分布式、高性能、深刻浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点高级进阶干货的免费直播讲解  能够进来一块儿学习交流哦

相关文章
相关标签/搜索