那么对象如何与监视器关联呢,在 Java 中,对象包含三块:对象头、实例数据、填充数据,其中对象头中就包含 Mark Word,Mark Word 通常存储对象的 hashCode、GC分代年龄以及锁信息,锁信息就包含指向互斥量(重量级锁)的指针,指向了一个监视器;监视器是经过 ObjectMonitor 来实现的,代码以下:多线程
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
复制代码
release 操做以下:
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
复制代码
为了实现上述 acquire、release 操做,须要完成如下三个功能:
同步状态的原子性管理;
线程的阻塞与唤醒;
排队机制;
同步状态
AQS 类经过使用单个 int 类型来保存同步状态,并提供了 getState()、 setState(int)、compareAndSetState(int, int) 三个方法来读取和更新状态,而且此同步状态经过关键字 volatile 修饰,保证了多线程环境下的可见性,compareAndSetState(int, int) 是经过 CAS(Compare and swap,比较并交换) 实现的,当多个线程同时对某个资源进行 CAS 操做的时候,只能有一个线程操做成功,但并不会阻塞其余线程,其余线程会收到操做失败的信号,CAS 是一个轻量级的乐观锁。CAS 的底层经过 Unsafe 类实现的,利用处理器提供的 CMPXCHG 指令实现其原子性,使得仅当同步器状态为一个指望值的时候,才会被原子的更新成目标值,相比 synchronized 不会致使过多的上下文切换切换和挂起线程。在 java.util.concurrent 包中,大量地使用 CAS 来实现原子性。
AQS 同步队列是一个 FIFO 队列,在此同步队列中,一个节点表示一个线程,它保存着线程的引用、状态、前驱节点、后继节点。同步队列经过两个节点 tail 和 head 来存取,初始化时,tail、head 初始化为一个空节点,线程要加入到同步队列中,经过 CAS 原子地拼接为新的 tail 节点,线程要退出队列,只需设置 head 节点指向当前线程节点。
同步队列的优势在于其出队和入队的操做都是无锁的、快速的。为了将 CLH 锁队列用于阻塞同步器,该同步队列须要作些额外的修改以提供一种高效的方式定位某个节点的后继节点,在自旋锁中,一个节点只需改变其状态,下一次自旋中其后继节点就能注意到这个改变。可是在阻塞式同步器中,一个节点须要显示地唤醒其后继节点。同步队列包含一个 next 连接到它的后继节点。第二个对 CLH 锁队列主要的修改是将每一个节点都有的状态字段用于控制阻塞而非自旋。
基本的 acquire 操做的最终实现通常形式(互斥、非中断、无超时):
if(!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set)
park();
else
compareAndSet pred's signal bit to true; pred = node's effective predecessor;
}
head = node;
}
复制代码
release 操做以下:
if(tryRelease(arg) && head node's signal bit is set) { compareAndSet head's bit to false;
unpark head's successor, if one exist } 复制代码
若是同步状态的占用线程为当前线程,则为重入性,将当前 state 加 1,不然返回为 false。紧接着会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法,addWaiter(Node.EXCLUSIVE) 方法的源代码以下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
复制代码
上述方法在获取同步状态失败的基础上,建立独占模式的节点,若是尾节点不为空,则将新节点的 prev 指向当前最后一个节点,当前的最后一个节点的 next 指向当前节点,而后 tail 指向当前节点。若是调用失败,则调用 enq() 方法设置尾节点。
在队列中加入节点成功后,接下来会调用 acquireQueued(Node,int) 方法,自旋的获取同步状态,固然前提是当前节点的 prev 节点是 head 节点才能获取同步状态,调用 tryAcquire(int) 方法获取同步状态,若是成功获取到,并将当前节点设置为 head 节点,并将当前节点 prev 设置为 null(由于按照 FIFO 的同步队列原则,由于是非公平锁的模式,因此即便是第一个节点也有可能被新来的线程抢占到)。
若是获取同步状态失败,则调用 shouldParkAfterFailedAcquire(node,node) ,则根据条件判断是否应该阻塞,防止 CPU 处于忙碌状态,会浪费 CPU 资源,若是 prev 节点状态是 SIGNAL 则阻塞,若是是 CANCELLED 状态则向前遍历,移除前面全部为该状态的节点 ,此方法里面阻塞当前节点用的方法是 LockSupport.park()。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
returntrue;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
returntrue;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
复制代码
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
复制代码