简要理解锁、同步器之间的关系

自定义独占锁

package com.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by cxx on 2018/1/16.
 *
 * 独占锁示例
 */
public class Mutex implements Lock {

    //静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer{

        //是否处于占用状态
        protected boolean isHeldExclusively(){
            return getState() == 1;
        }

        //当状态为0的时候,获取锁
        public boolean tryAcquire(int acquires){
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //释放锁,将状态设置为0
        public boolean tryRelease(int releases){
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //返回一个condition,每一个condition包含了一个condition队列
        Condition newCondition(){
            return new ConditionObject();
        }
    }

    /***
     * 将操做代理到Sync上便可
     */
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);

    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

如代码所示,独占锁实现了在同一时刻只能用一个线程获取到锁,而其余获取锁的线程只能处于同步等待队列中等待,只有获取锁的线程释放了锁,后继的线程才可以获取锁。java

#锁、同步器、使用者node

  • 锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节。
  • 同步器面向的是锁的实现着,简化了锁的实现方式,屏蔽了同步状态管理、线程的队列、等待与唤醒等底层操做。
  • 锁和同步器很好的隔离了使用者和实现着所须要关注的领域。

如上的mutex锁同样,具体的实现代理到sync,mutex只须要向用户定义交互方式便可。安全

AQS的方法

公有方法:

独占锁的获取与释放(包括了对同步队列的操做)ide

  • acquire(int arg):独占式获取同步状态,若是当前线程获取同步状态成功,则由该方法返回,不然,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;工具

  • release(int arg):独占式释放同步状态,该方法会在释放同步状态以后,将同步队列中第一个节点包含的线程唤醒;ui

共享锁的获取与释放(包括了对同步队列的操做)this

  • acquireShared(int arg):共享式获取同步状态,若是当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻能够有多个线程获取到同步状态;
  • releaseShared(int arg):共享式释放同步状态;

须要由子类实现的保护方法

独占锁的获取与释放的具体实现(没有对同步队列的操做,功能单一)线程

  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其余线程须要等待该线程释放同步状态才能获取同步状态;
  • tryRelease(int arg):独占式释放同步状态;

共享锁的获取与释放(没有对同步队列的操做)设计

  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,不然获取失败;
  • tryReleaseShared(int arg):共享式释放同步状态;

操做队列同步器的状态代理

  • getState():返回同步状态的当前值;
  • setState(int newState):设置当前同步状态;
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法可以保证状态设置的原子性;

其余的方法

  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,通常该方法表示是否被当前线程所独占;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,可是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,若是当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,若是当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增长超时限制;

AQS的所有方法如图所示

输入图片说明

输入图片说明

输入图片说明

AQS小结

  • 在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而下降上下文切换的开销,提升了吞吐量。同时在设计AQS时充分考虑了可伸缩行,所以J.U.C中全部基于AQS构建的同步器都可以得到这个优点。

  • AQS的主要使用方式是继承,子类经过继承同步器并实现它的抽象方法来管理同步状态。

  • AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操做,固然AQS能够确保对state的操做是安全的。

  • AQS经过内置的FIFO同步队列来完成资源获取线程的排队工做,若是当前线程获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构形成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程

  • 当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

CLH 同步队列的实现

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义以下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();

    /** 独占 */
    static final Node EXCLUSIVE = null;

    /**
     * 由于超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其余状态;
     */
    static final int CANCELLED =  1;

    /**
     * 后继节点的线程处于等待状态,而当前节点的线程若是释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
     */
    static final int SIGNAL    = -1;

    /**
     * 节点在等待队列中,节点线程等待在Condition上,当其余线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
     */
    static final int CONDITION = -2;

    /**
     * 表示下一次共享式同步状态获取将会无条件地传播下去
     */
    static final int PROPAGATE = -3;

    /** 等待状态 */
    volatile int waitStatus;

    /** 前驱节点 */
    volatile Node prev;

    /** 后继节点 */
    volatile Node next;

    /** 获取同步状态的线程 */
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

状态(waitStatus)

  • CANCELLED :值为1,因为在同步队列中等待的线程等待超时或者被中断,须要从同步队列中取消等待,节点进入该状态将不会发生变化。
  • SIGNAL:值为 -1,后继节点的线程处于等待状态,而当前节点的线程若是释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
  • CONDITION: 值为-1,节点在等待队列中,节点线程等待在Condition上,当其余线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
  • Propagate:值为 -3,表示下一次共享式同步状态获取将会无条件地被传播下去。
  • Initial:值为0,初始状态。

入队

输入图片说明

  • compareAndSetTail(Node expect,Node update)方法来确保节点可以被线程安全添加。
  • enq(final Node node),同步器经过 “死循环”来保证节点的正确添加,在“死循环”中只有经过CAS将节点设置成为尾节点以后,当前线程才可以从该方法返回,不然,当前线程不断地尝试设置。
  • acquireQueued ,进入一个自旋的过程,每一个节点都在自省地观察,当条件知足,获取到同步状态,就能够从这个自旋过程当中退出。

第一,头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后须要检查本身的前驱节点是不是头结点。

第二,维护同步队列的FIFO原则。

AQS:阻塞和唤醒线程

在线程获取同步状态时若是获取失败,则加入CLH同步队列,经过经过自旋的方式不断获取同步状态,可是在自旋的过程当中则须要判断当前线程是否须要阻塞,其主要方法在acquireQueued():

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

经过这段代码咱们能够看到,在获取同步状态失败后,线程并非立马进行阻塞,须要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码以下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱节点
        int ws = pred.waitStatus;
        //状态为signal,表示当前线程处于等待状态,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驱节点状态 > 0 ,则为Cancelled,代表该节点已经超时或者被中断了,须要从同步队列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驱节点状态为Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

这段代码主要检查当前线程是否须要被阻塞,具体规则以下:

  • 若是当前线程的前驱节点状态为SINNAL,则代表当前线程须要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞

  • 若是当前线程的前驱节点状态为CANCELLED(ws > 0),则代表该线程的前驱节点已经等待超时或者被中断了,则须要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false

  • 若是前驱节点非SINNAL,非CANCELLED,则经过CAS的方式将其前驱节点设置为SINNAL,返回false

若是 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。

当线程释放同步状态后,则须要唤醒该线程的后继节点:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
				//唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

调用unparkSuccessor(Node node)唤醒后继节点:

private void unparkSuccessor(Node node) {
        //当前节点状态
        int ws = node.waitStatus;
        //当前状态 < 0 则设置为 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //当前节点的后继节点
        Node s = node.next;
        //后继节点为null或者其状态 > 0 (超时或者被中断了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从tail节点来找可用节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒后继节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可能会存在当前线程的后继节点为null,超时、被中断的状况,若是遇到这种状况了,则须要跳过该节点,可是为什么是从tail尾节点开始,而不是从node.next开始呢?缘由在于node.next仍然可能会存在null或者取消了,因此采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。

连接

AQS:阻塞和唤醒线程

相关文章
相关标签/搜索