Java中的锁---队列同步器

Lock接口

锁是用来控制多个线程访问共享资源的方式,通常来讲,一个锁可以防止多个线程同时访问共享资源。Lock接口在使用时须要显式地获取和释放锁,拥有了锁获取与释放的可操做性、可中断的获取以及超时获取锁等多种Synchronized关键字所不具有的同步特性。java

Lock lock = new ReentrantLock();
lock.lock();
try{}finally{
    lock.unlock();
}

在finally块中释放锁,目的是保证在获取锁以后,最终可以被释放。 不要将获取锁的过程写在try块中,由于若是早获取锁时发生了了异常,异常抛出的同时,也会致使锁无端释放。node

队列同步器

队列同步器是用来构建锁或者其余同步组件的基础框架,使用了一个int成员变量表示同步状态,经过内置的FIFO队列来完成资源获取线程的排队工做。安全

同步器的主要使用方式是继承,子类经过集成同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程当中免不了要对同步状态进行更改,用到同步器提供的三个方法(getState(),setState()和compartAndSetState(int expect,int update))来进行操做,由于他们可以保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既能够支持独占式地获取同步状态,也能够支持共享式地获取同步状态。 锁与同步器之间的关系:锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操做。并发

同步器提供的模版方法分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态 和 查询同步队列中的等待线程状况。框架

package com.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by cxx on 2018/1/16.
 * 
 * 独占锁示例
 */
public class Mutex implements Lock {

    //静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer{

        //是否处于占用状态
        protected boolean isHeldExclusively(){
            return getState() == 1;
        }

        //当状态为0的时候,获取锁
        public boolean tryAcquire(int acquires){
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //释放锁,将状态设置为0
        public boolean tryRelease(int releases){
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //返回一个condition,每一个condition包含了一个condition队列
        Condition newCondition(){
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);

    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

独占锁Mutex 是一个自定义同步组件,它在同一时刻只容许一个线程占用锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。ide

队列同步器的实现分析

同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待新状态等信息构形成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。工具

同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部。当一个线程状态成功地获取了同步状态(或锁),其余线程将没法获取到同步状态,转而被构形成为节点并加入到同步队列中,而这个加入队列的过程必须保证线程安全,所以同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect。Node update),它须要传递当前线程“认为的尾节点”和当前节点,只有设置成功后,当前节点才正式与以前的尾节点创建关联。oop

2.独占式同步状态获取与释放ui

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

主要逻辑是:首先调用自定义同步器实现的 tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,若是同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能用一个线程成功获取同步状态),并经过 addWaiter方法将该节点加入到同步队列的尾部,最后调用 acquireQueued方法,使得该节点以”死循环“的方式获取同步状态。若是获取不到阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。this

//添加到同步队列中
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
        节点插入到尾节点:将并发添加节点的请求经过CAS变得”串行化“了。
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在 acquireQueued 方法中,当前线程在”死循环“中尝试获取同步状态,而只有前驱节点是头结点才可以尝试获取同步状态。缘由以下:

  1. 头节点是成功获取到同步状态的节点,而头结点的线程线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程在被唤醒后须要检查本身的前驱节点是不是头结点。
  2. 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态。

自旋的实现:p == head && tryAcquire(arg)

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或中止自旋)的条件是前驱节点为头结点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg)方法释放同步状态,而后唤醒头结点的后继结点。

共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻可否有多个线程同时进入同步状态。经过调用同步器的acquireShared方法能够共享式地获取同步状态。

/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在 acquireShared 方法中,同步器调用 tryAcquireShared 尝试获取同步状态。doAcquireShared 方法的自旋过程当中,若是当前节点的前驱为头结点时,尝试获取同步状态,若是返回值大于等于0,表示该次获取同步状态成功并从自旋过程当中退出。

/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

releaseShared 在释放同步状态以后,将会唤醒后续处于等待状态的节点。对于可以支持多个线程同时访问的并发组件,和独占式主要区别在于 tryReleaseShared 方法必须确保同步状态线程安全释放,通常是经过循环和CAS来保证的,由于释放同步状态的操做会同时来自多个线程。

独占式超时获取同步状态

经过调用同步器的 doAcquireNanos 方法能够超时获取同步状态,既在指定时间段内获取同步状态,若是获取同步状态则返回 true,不然返回false。

/**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

若是当前线程获取同步状态失败,则判断是否超时,若是没有超时,从新计算超时间隔,而后使当线程等待。若是已经超时,则中断当前线程,而后获取同步状态。

自定义同步组件--TwinsLock

设计一个同步工具:

第一步:肯定访问模式,共享仍是独占。

第二步:定义资源数,

第三部:组合自定义同步器。

TwinsLock 实现了Lock接口,提供了面向使用者的接口,使用者调用Lock() 方法获取锁,随后调用unlock方法释放锁。TwinsLock 同时包含了一个自定义同步器 Sync,而该同步器面向线程访问和同步状态控制。

相关文章
相关标签/搜索