synchronized是Java并发同步开发的基本技术,是Java语言层面提供的线程间同步手段。咱们编写以下一段代码:java
public class SyncTest { private static final Object lock = new Object(); public static void main(String[] args) { int a = 0; synchronized (lock) { a++; } System.out.println("Result: " + a); } }
针对其中同步部分咱们会看到以下字节码:node
monitorenter iinc 1 by 1 aload_2 monitorexit
这实际上是javac在编译时将synchronized同步块的先后插入montor进入和退出的字节码指令,所以,咱们想探索synchronized的实现机制,就须要探索monitorenter和monitorexit指令的执行过程。算法
咱们先看一下monitorenter的代码实现:数据结构
void TemplateTable::monitorenter() { ... // store object __ movptr(Address(rmon, BasicObjectLock::obj_offset_in_bytes()), rax); // 跳转执行 lock_object 函数 __ lock_object(rmon); ... }
这里咱们依然只给出重点代码部分,代码比较长,前面有不少指令时初始化执行环境的,最后重点会跳转lock_object函数,一样这个函数也是有不一样CPU平台实现的,咱们仍是看X86平台的:并发
// Lock object // // Args: // rdx, c_rarg1: BasicObjectLock to be used for locking // // Kills: // rax, rbx void InterpreterMacroAssembler::lock_object(Register lock_reg) { if (UseHeavyMonitors) { call_VM(noreg, CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter), lock_reg); } else { // 执行锁优化的逻辑部分,例如:锁粗化,锁消除等等 // 若是一切优化措施都执行了,仍是须要进入 monitor,就执行以下,其实和上面那个 if 分支是同样的 // Call the runtime routine for slow case call_VM(noreg, CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter), lock_reg); } }
这里咱们不管如何最终都是执行InterpreterRuntime::monitorenter函数,这个函数不只仅是模板执行器会调用,解释执行器也会执行这个,因此定义在InterpreterRuntime类下:函数
// Synchronization // // The interpreter's synchronization code is factored out so that it can // be shared by method invocation and synchronized blocks. //%note synchronization_3 //%note monitor_1 IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem)) Handle h_obj(thread, elem->obj()); if (UseBiasedLocking) { // Retry fast entry if bias is revoked to avoid unnecessary inflation ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK); } else { ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK); } IRT_END
上面的代码,在原始的代码基础上有删减,保留了核心关键逻辑。即根据UseBiasedLocking这个变量分别执行fast_enter或者slow_enter的逻辑。高并发
同步锁优化处理即fast_enter执行处理,下面是fast_enter函数的定义:工具
// Fast Monitor Enter/Exit // This the fast monitor enter. The interpreter and compiler use // some assembly copies of this code. Make sure update those code // if the following function is changed. The implementation is // extremely sensitive to race condition. Be careful. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) { if (UseBiasedLocking) { if (!SafepointSynchronize::is_at_safepoint()) { BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { return; } } else { assert(!attempt_rebias, "can not rebias toward VM thread"); BiasedLocking::revoke_at_safepoint(obj); } assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } slow_enter(obj, lock, THREAD); }
这里开始仍是要判断UseBiasedLocking,若是是true的话,就针对开始执行优化逻辑,不然仍是会fall back到slow_enter的,是否是感受判断UseBiasedLocking有点啰嗦?其实不是的,由于这个函数在不少地方都会调用,所以判断是必须的。为了方便接下来的代码分析,下面我要放出OpenJDK官方wiki中针对锁优化的原理图:oop
在解释原理图以前,须要介绍一下Java对象的内存布局,由于上面图中的实现原理就是充分利用java对象的头完成的。Java对象在内存的结构基本分为:对象头和对象体,其中对象头存储对象特征信息,对象体存放对象数据部分。
在OpenJDK工程中,有一个子工程叫jol,全名:java object layout,简单易懂,就是java对象布局的意思。这是一个工具库,经过这个库能够获取JVM中对象布局信息,下面咱们看一下一个简单的例子(这也是官方给的例子):布局
public class JOLTest { public static void main(String[] args) { System.out.println(VM.current().details()); System.out.println(ClassLayout.parseClass(A.class).toPrintable()); } public static class A { boolean f; } }
这里经过JOL的接口来获取类A的对象内存布局,执行以后输出以下内容:
# Running 64-bit HotSpot VM. # Using compressed oop with 3-bit shift. # Using compressed klass with 3-bit shift. # WARNING | Compressed references base/shifts are guessed by the experiment! # WARNING | Therefore, computed addresses are just guesses, and ARE NOT RELIABLE. # WARNING | Make sure to attach Serviceability Agent to get the reliable addresses. # Objects are 8 bytes aligned. # Field sizes by type: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] # Array element sizes: 4, 1, 1, 2, 2, 4, 4, 8, 8 [bytes] JOLTest$A object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 12 (object header) N/A 12 1 boolean A.f N/A 13 3 (loss due to the next object alignment) Instance size: 16 bytes Space losses: 0 bytes internal + 3 bytes external = 3 bytes total
这里咱们看到输出了不少信息,上面咱们类A的对象布局以下:12byte的对象头+1byte的对象体+3byte的填充部分。
从JVM的代码咱们能够看到一个对象的头部定义:
volatile markOop _mark; union _metadata { Klass* _klass; narrowKlass _compressed_klass; } _metadata;
能够看到分为两部分:第一部分就是mark部分,官方称之为mark word,第二个是klass的类型指针,指向这个对象的类对象。这里的mark word长度是一个系统字宽,在64bit系统上就是8个字节,从上面的日志咱们能够看到虚拟机默认使用了compressed klass,所以第二部分的union就是narrowKlass类型的,若是咱们继续看下narrowKlass的定义就知道这是个32bit的unsigned int类型,所以将占用4个字节,因此对象的头部长度总体为12字节。
Mark word用于存储对象自身运行时的数据,如hash code、GC分代年龄等等信息,他是实现偏向锁的关键。并且考虑到虚拟机的空间效率,mark word被设计成一个非固定数据结构以便在极小的空间内存存储尽可能多的信息,他会根据对象的状态复用本身的存储空间。所以,mark word内存布局定义在32bit和64bit系统中对象的布局不一样:
// 32 bits: // -------- // hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object) // JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object) // size:32 ------------------------------------------>| (CMS free block) // PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object) // // 64 bits: // -------- // unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object) // JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object) // PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object) // size:64 ----------------------------------------------------->| (CMS free block) // // unused:25 hash:31 -->| cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && normal object) // JavaThread*:54 epoch:2 cms_free:1 age:4 biased_lock:1 lock:2 (COOPs && biased object) // narrowOop:32 unused:24 cms_free:1 unused:4 promo_bits:3 ----->| (COOPs && CMS promoted object) // unused:21 size:35 -->| cms_free:1 unused:7 ------------------>| (COOPs && CMS free block)
咱们主要关注其中的normal object和biased object两部分的头定义
biased_lock | lock | 状态 |
---|---|---|
1 | 01 | 可偏置、但未锁且未偏置 |
0 | 01 | 已解锁、不可偏置 |
-- | 00 | 轻量级锁定 |
-- | 01 | 重量级锁定 |
偏置锁即这个锁首先假设本身被偏向的线程所持有。在单个线程连续持有锁时,偏向锁就起做用了。若是一个线程接二连三地获取锁,那么获取的过程当中若是没有发生竞态,那么能够跳过繁重的同步过程,直接就得到锁执行,这样能够大大提升性能。偏向锁是JDK1.6中引入的一项锁优化手段,它的目的就是消除数据在无争用的状况下的同步操做,进一步提升运行性能。这里也涉及了轻量级锁,轻量级锁也是JDK1.6引入的一个锁优化机制,所谓轻量级是相对于使用操做系统互斥机制来实现传统锁而言的,在这个角度上,传统的方式及时重量级锁,悲观锁,会致使线程的状态切换,而线程状态的切换是一个至关重量级的操做。
先看一下slow_enter函数:
// Interpreter/Compiler Slow Case // This routine is used to handle interpreter/compiler slow case // We don't need to use fast path here, because it must have been // failed in the interpreter/compiler code. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) { markOop mark = obj->mark(); assert(!mark->has_bias_pattern(), "should not see bias pattern here"); if (mark->is_neutral()) { // Anticipate successful CAS -- the ST of the displaced mark must // be visible <= the ST performed by the CAS. lock->set_displaced_header(mark); if (mark == obj()->cas_set_mark((markOop) lock, mark)) { TEVENT(slow_enter: release stacklock); return; } // Fall through to inflate() ... } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; } // The object header will never be displaced to this lock, // so it does not matter what the value is, except that it // must be non-zero to avoid looking like a re-entrant lock, // and must not look locked either. lock->set_displaced_header(markOopDesc::unused_mark()); ObjectSynchronizer::inflate(THREAD, obj(), inflate_cause_monitor_enter)->enter(THREAD); }
这里的执行逻辑比较简洁,主要执行上面OpenJDK wiki中的锁优化逻辑。首先会判断对象锁是否为中立的(neutral):
bool is_neutral() const { // 这里的 biased_lock_mask_in_place 是 7 // unlocked_value 值是 1 return (mask_bits(value(), biased_lock_mask_in_place) == unlocked_value); }
它的判断标准是将mark word中最后7个bit进行掩码运算,将获得的值和1进行比较,若是等于1就表示对象时中立的,也就是没有被任何线程锁定,不然就算失败。至于为何是最后7个bit,是由于不管是普通对象仍是可偏置的对象,最后7个bit的格式是固定的(其余几种模式的对象格式不一样)。
再回到上面的slow_enter函数,若是判断为中立的,也就是没有锁定的话,会将当前的mark word,存储到lock指针指向的对象中,这里的lock指针指向的就是上面提到的lock record。而后进行一个很是重要的操做,就是经过院子cas操做将这个lock指针安装到对象mark word中,若是安装成功就表示当前线程得到了这个对象锁,能够直接返回执行同步代码块,不然就会fall back到膨胀锁中。
上面是判断对象是否为中立的逻辑,若是当线程进来发现当前的对象锁已经被另外一个线程锁定了。这个时候就会执行到else逻辑中:
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; }
若是发现当前对象已经锁定,须要判断下是否是当前线程本身锁定了,由于在sysnchronized中可能再一次synchronized,这种状况下直接返回便可。
若是上面的两个判断都失败了,也就是对象被锁定,而且锁定线程不是当前线程,这个时候须要执行上面OpenJDK wiki中的inflate膨胀逻辑。所谓膨胀,就是根据当前锁对象,生成一个ObjectMonitor对象,这个对象中保存了sychronized阻塞的队列,以及实现了不一样的队列调度策略,下面咱们重点看一下ObjectMonitor中的enter逻辑
在enter函数中,有不少判断和优化执行的逻辑,可是核心和经过Enterl函数实际进入队列将当前线程阻塞:
void ObjectMonitor::EnterI(TRAPS) { ... // Try the lock - TATAS if (TryLock (Self) > 0) { assert(_succ != Self, "invariant"); assert(_owner == Self, "invariant"); assert(_Responsible != Self, "invariant"); return; } ... // We try one round of spinning *before* enqueueing Self. // // If the _owner is ready but OFFPROC we could use a YieldTo() // operation to donate the remainder of this thread's quantum // to the owner. This has subtle but beneficial affinity // effects. if (TrySpin (Self) > 0) { assert(_owner == Self, "invariant"); assert(_succ != Self, "invariant"); assert(_Responsible != Self, "invariant"); return; } ... ObjectWaiter node(Self); // Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt; for (;;) { node._next = nxt = _cxq; if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert(_succ != Self, "invariant"); assert(_owner == Self, "invariant"); assert(_Responsible != Self, "invariant"); return; } } ... for (;;) { if (TryLock(Self) > 0) break; ... if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::replace_if_null(Self, &_Responsible); } // park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT(Inflated enter - park TIMED); Self->_ParkEvent->park((jlong) recheckInterval); // Increase the recheckInterval, but clamp the value. recheckInterval *= 8; if (recheckInterval > MAX_RECHECK_INTERVAL) { recheckInterval = MAX_RECHECK_INTERVAL; } } else { TEVENT(Inflated enter - park UNTIMED); Self->_ParkEvent->park(); } if (TryLock(Self) > 0) break; ... } ... if (_Responsible == Self) { _Responsible = NULL; } // 善后处理,好比将当前线程从等待队列 CXQ 中移除 ... }
照例只保留了重要代码。咱们先看TryLock方法:
int ObjectMonitor::TryLock(Thread * Self) { void * own = _owner; if (own != NULL) return 0; if (Atomic::replace_if_null(Self, &_owner)) { // Either guarantee _recursions == 0 or set _recursions = 0. assert(_recursions == 0, "invariant"); assert(_owner == Self, "invariant"); return 1; } // The lock had been free momentarily, but we lost the race to the lock. // Interference -- the CAS failed. // We can either return -1 or retry. // Retry doesn't make as much sense because the lock was just acquired. return -1; }
这里逻辑很简单,主要是尝试经过cas操做将_owner字段设置为Self,其中_owner表示当前ObjectMonitor对象锁持有的线程指针,Self指向当前执行的线程。若是设置上了,表示当前线程得到了锁,不然没有得到。
在上面的Enterl函数中,咱们看到TryLock先后连续执行了两次,并且代码判断逻辑同样,为何要这样?这实际上是为了在入队阻塞线程以前的最后检查,防止线程无谓的进行状态切换。可是为何执行两次?其实第二次执行的注释已经说明了,这么作有一些微妙的亲和力影响,即若是在过去一段时间内,某个线程尝试获取某个资源一直失败,那么系统在后面会倾向于将资源分配给这个线程。
若是两次TryLock以后仍然失败,那么只能乖乖入队阻塞了,在入队以前须要建立一个ObjectWaiter对象,这个对象将当前线程的对象(注意是JavaThread对象)包裹起来,咱们看一下ObjectWaiter的定义:
class ObjectWaiter : public StackObj { public: enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ }; enum Sorted { PREPEND, APPEND, SORTED }; ObjectWaiter * volatile _next; ObjectWaiter * volatile _prev; Thread* _thread; jlong _notifier_tid; ParkEvent * _event; volatile int _notified; volatile TStates TState; Sorted _Sorted; // List placement disposition bool _active; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); };
_next和_prev表明这是一个双向队列实现等待队列(可是实际上,入队操做并无造成双向链表,真正造成双向链表是在exit的时候)。node节点建立完毕以后会执行以下入队操做
// Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt; for (;;) { node._next = nxt = _cxq; if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert(_succ != Self, "invariant"); assert(_owner == Self, "invariant"); assert(_Responsible != Self, "invariant"); return; } }
注释中说明,咱们是要将当前节点放到CXQ队列的头部,将节点的next指针经过cas操做指向_cxq指针就完成了入队操做。若是入队成功,则退出当前循环,不然再次尝试lock,由于在高并发状态下,cas锁定可能会出错失败。
若是上面的循环退出了,就表示当前线程的node节点已经顺利进入CXQ队列了,那么接下来须要进入另外一个循环:
for (;;) { if (TryLock(Self) > 0) break; ... if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::replace_if_null(Self, &_Responsible); } // park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT(Inflated enter - park TIMED); Self->_ParkEvent->park((jlong) recheckInterval); // Increase the recheckInterval, but clamp the value. recheckInterval *= 8; if (recheckInterval > MAX_RECHECK_INTERVAL) { recheckInterval = MAX_RECHECK_INTERVAL; } } else { TEVENT(Inflated enter - park UNTIMED); Self->_ParkEvent->park(); } if (TryLock(Self) > 0) break; ... }
这个循环的逻辑比较简单:
重点在于第2步,咱们知道synchronzed若是获取对象锁失败的话,会致使当前线程被阻塞,那么这个阻塞操做就是在这里完成的,这里须要注意的是,这里须要判断一下_Responible指针,若是这个指针为null,表示以前对象锁尚未等待线程,也就是说当前线程是第一个等待线程,这时候经过cas操做将_Responsible指向Self,表示当前线程是这个对象锁的等待线程。接下来,若是当前线程是等待线程,那么会执行一个简单的退避算法,进行一个短期的阻塞等待。这个算法很简单,第一次等待1ms,第二次等待8ms,第三次等待64ms,以此类推,知道等待时长的上限:MAX_RECHECK_INTERVAL,也就是说在synchronize在一个对象锁上的线程,若是他是第一个等待线程的话,那么他会不停的休眠,检查锁。反之,若是当前线程不是第一个等待线程,那么只能执行无限期的休眠,一直等待对象锁的exit函数执行唤醒才行。
当一个线程得到对象锁成功后,就能够执行自定义的同步代码块了。执行完成以后会执行到ObjectMonitor的exit函数中,释放当前对象锁,方便下一个线程来获取这个对象锁,下面咱们逐步分析exit的实现过程。
void ObjectMonitor::exit(bool not_suspended, TRAPS) { for (;;) { ... ObjectWaiter * w = NULL; int QMode = Knob_QMode; if (QMode == 2 && _cxq != NULL) { ... } if (QMode == 3 && _cxq != NULL) { ... } if (QMode == 4 && _cxq != NULL) { ... } ... ExitEpilog(Self, w); return; } }
exit函数的执行逻辑有两步:
在exit函数中首先是根据Knob_QMode的值执行不一样执行不一样逻辑,而Knob_QMode的默认值为0,它的做用主要用来指定在exit的时候EntryList和CXQ队列之间的唤醒关系,也就是说,当EntryList和CXQ中都有等待的线程时,由于exit以后只能有一个线程获得锁,这个时候选择唤醒哪一个队列中的线程是一个值得考虑的事。而这里的默认策略就是0。
出队策略0表明CXQ队列后进先出,即将cxq指针赋予_EntryList,而后经过一个循环将本来单项链表的CXQ链表变成双向链表,方便后面针对CXQ链表进行查询,这时候,_EntryList就是CXQ。而后交由ExitEpilog唤醒
void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) { assert(_owner == Self, "invariant"); // Exit protocol: // 1. ST _succ = wakee // 2. membar #loadstore|#storestore; // 2. ST _owner = NULL // 3. unpark(wakee) _succ = Knob_SuccEnabled ? Wakee->_thread : NULL; ParkEvent * Trigger = Wakee->_event; // Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again. // The thread associated with Wakee may have grabbed the lock and "Wakee" may be // out-of-scope (non-extant). Wakee = NULL; // Drop the lock OrderAccess::release_store(&_owner, (void*)NULL); OrderAccess::fence(); // ST _owner vs LD in unpark() if (SafepointMechanism::poll(Self)) { TEVENT(unpark before SAFEPOINT); } DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self); Trigger->unpark(); // Maintain stats and report events to JVMTI OM_PERFDATA_OP(Parks, inc()); }
即经过park event将等待的线程唤醒,而后执行unpark函数
void os::PlatformEvent::unpark() { if (Atomic::xchg(1, &_event) >= 0) return; int status = pthread_mutex_lock(_mutex); int anyWaiters = _nParked; status = pthread_mutex_unlock(_mutex); if (anyWaiters != 0) { status = pthread_cond_signal(_cond); assert_status(status == 0, status, "cond_signal"); } }
这里依然是经过pthread的condition signal唤醒线程,前面线程休眠是经过condition wait实现的。
出队策略1即Knob_QMnode的值修改成1,这种模式下是先进先出,即FIFO队列行为。这种模式下的处理是先将CXQ队列reverse一下,而后再讲新的队头也就是原来的队尾赋值给_EntryList。而后按_EntryList进行唤醒。
出队策略2跟出队策略0类似,可是他是优先执行CXQ队列的操做,再执行_EntryList队列的操做。即优先按CXQ进行唤醒。
出队策略3和出队策略4都是简单的连接。出队策略3是将CXQ放在_EntryList以后,而出队策略4是将_EntryList放在CXQ以前。而后按新~~~~_EntryList进行唤醒。