Java多线程之自旋锁与队列锁

编写高效的并发程序,须要对互斥问题从新研究,设计出适用于多线程的互斥协议。那么问题来了,若是不能得到锁,应该怎么作?java

  • 旋转:继续进行尝试,如自旋锁,延迟较短;node

  • 阻塞:挂起本身,请求调度器切换到另外一个线程,代价较大。git

综合来看,先旋转一小段时间再阻塞,是种不错的选择。github

java.util.concurrent.locks.Lock接口提供了lock()unlock()两个重要的方法,用于解决实际互斥问题。算法

Lock mutex = new MyLock();
mutex.lock();
try {
    do something
} finally {
    mutex.unlock();
}

测试-设置锁

测试-设置来源于getAndSet()操做,经过一个原子布尔型状态变量的值判断当前锁的状态。若为true表示锁忙,若为false表示锁空闲。数组

public class TASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (state.getAndSet(true)) {
            ;
        }
    }

    public void unlock() {
        state.set(false);
    }
}

测试-测试-设置锁

TTASLock是升级版的TASLock算法,没有直接调用getAndSet()方法,而是在锁看起来空闲(state.get()返回false)时才调用。缓存

public class TTASLock {
    AtomicBoolean state = new AtomicBoolean(false);

    public void lock() {
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

这两个算法都能保证无死锁的互斥,可是TTASLock的性能会比TASLock高许多。多线程

能够从计算机系统结构的高速缓存和局部性来解释这个问题,每一个处理器都有一个cache,cache的访问速度比内存快好几个数量级。当cache命中时,会当即加载这个值;当cache缺失时,会在内存或两一个处理器的cache中寻找这个数据。寻找的过程比较漫长,处理器在总线上广播这个地址,其余处理器监听总线。若其余处理器在本身的cache中发现这个地址,则广播该地址及其值来作出响应。若全部处理器都没发现这个地址,则之内存地址及其所对应的值进行响应。并发

getAndSet()的直接调用让TASLock性能损失许多:dom

  • getAndSet()的调用实质是总线上的一个广播,这个调用将会延迟全部的线程,由于全部线程都要经过监听总线通讯。

  • getAndSet()的调用会更新state的值,即便值仍为true,可是其余处理器cache中的锁副本将会被丢弃,从而致使cache缺失。

  • 当持有锁的线程试图释放锁时可能被延迟,由于总线被正在自旋的线程独占。

与此相反,对于TTASLock算法采用的是本地旋转(线程反复地重读被缓存的值而不是反复地使用总线),线程A持有锁时,线程B尝试得到锁,但线程B只会在第一次读锁是cache缺失,以后每次cache命中不产生总线流量。

那么缺点来了,TTASLock释放锁时,会使各自旋线程处理器中的cache副本当即失效,这些线程会从新读取这个值,形成总线流量风暴。

指数后退锁

对于TTASLock算法,当锁看似空闲(state.get()返回false)时,存在高争用(多个线程试图同时获取一个锁)的可能。高争用意味着获取锁的可能性小,而且会形成总线流量增长。线程在重试以前回退一段时间是种不错的选择。

这里实现的指数回退算法的回退准则是,不成功尝试的次数越多,发生争用的可能性就越高,线程须要后退的时间就应越长。

public class BackoffLock {
    private AtomicBoolean state = new AtomicBoolean(false);
    private static final int MIN_DELAY = 10;
    private static final int MAX_DELAY = 100;

    public void lock() {
        Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
        while (true) {
            while (state.get()) {
                ;
            }
            if (! state.getAndSet(true)) {
                return;
            } else {
                backoff.backoff();
            }
        }
    }

    public void unlock() {
        state.set(false);
    }
}

class Backoff {
    private final int minDelay, maxDelay;
    int limit;
    final Random random;

    public Backoff(int min, int max) {
        minDelay = min;
        maxDelay = max;
        limit = minDelay;
        random = new Random();
    }

    public void backoff() {
        int delay = random.nextInt(limit);
        limit = Math.min(maxDelay, 2 * limit);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            ;
        }
    }
}

指数后退算法解决了TTASLock释放锁时的高争用问题,可是它的性能与minDelaymaxDelay的选取密切相关,而且很难找到一个通用兼容的值。

另外,BackoffLock算法还有两个问题:

  • cache一致性流量:全部线程都在同一个共享存储单元上旋转;

  • 临界区利用率低:后退时间没法肯定,线程延迟可能过长。

基于数组的锁

下面的这些是队列锁,名字看上去奇形怪状的,实际上是发明者名字的首字母。队列锁就是将线程组织成一个队列,让每一个线程在不一样的存储单元上旋转,从而下降cache一致性流量。

基于循环数组实现队列锁ALock,每一个线程检测本身的slot对应的flag[]域来判断是否轮到本身。

一个线程想得到锁,就要调用lock()方法,得到自增tail得到分配的slot号,而后等待这个slot空闲;当释放锁时,就要阻塞当前slot,而后让下一个slot可运行。
flag[i]true时,那么这个线程就有权得到锁。任意时刻的flag[]数组中,应该只有一个slot的值为true

public class ALock {
    ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>();
    AtomicInteger tail;
    volatile boolean [] flag;
    int size;

    public ALock() {
        size = 100;
        tail = new AtomicInteger(0);
        flag = new boolean[size];
        flag[0] = true;
    }

    public void lock() {
        int slot = tail.getAndIncrement() % size;
        mySlotIndex.set(slot);
        while (! flag[slot]) {
            ;
        }
    }

    public void unlock() {
        int slot = mySlotIndex.get();
        flag[slot] = false;
        flag[(slot + 1) % size] = true;
    }
}
  • mySlotIndex是线程的局部变量,只能被一个线程访问,每一个线程都有本身独立初始化的副本。不须要保存在共享存储器,不须要同步,不会产生一致性流量。使用get()set()方法来访问局部变量的值。

  • tail是常规变量,域被全部的线程共享,支持原子操做。

  • 数组flag[]也是被多个线程共享的,可是每一个线程都是在一个数组元素的本地cache副本上旋转。

ALock对BackoffLock的改进:在多个共享存储单元上旋转,将cache无效性降到最低;把一个线程释放锁和另外一个线程得到该锁之间的时间间隔最小化;先来先服务的公平性。可是,数组的大小至少与最大的并发线程数相同,并非空间有效的,当并发线程最大个数为n时,同步L个不一样对象就须要O(Ln)大小的空间。

CLH队列锁

CLH队列锁表示为QNode对象的链表,每一个线程经过一个线程局部变量pred指向其前驱。每一个线程经过检测前驱结点的locked域来判断是否轮到本身。若是该域为true,则前驱线程要么已经得到锁要么正在等待锁;若是该域为false,则前驱进程已释放锁,轮到本身了。正常状况下,队列链中只有一个结点的locked域为false

当一个线程调用lock()方法想得到锁时,将本身的locked域置为true,表示该线程不许备释放锁,而后并将本身的结点加入到队列链尾部。最后就是在前驱的locked域上旋转,等待前驱释放锁。当这个线程调用unlock()方法要释放锁时,线程要将本身的locked域置为false,表示已经释放锁,而后将前驱结点做为本身的新结点以便往后访问。

那么问题来了,为何要在释放锁时作myNode.set(myPred.get())这个处理呢?假设线程A释放锁,A的后继结点为B,若是不使用这种处理方式,A在释放锁后立刻申请锁将本身的locked域置为true,整个动做在B检测到前驱A释放锁以前,那么将发生死锁。

public class CLHLock {
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myPred;
    ThreadLocal<QNode> myNode;

    public CLHLock() {
        tail = new AtomicReference<QNode>(new QNode());
        myPred = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return null;
            }
        };
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
    }

    public void lock() {
        QNode qnode = myNode.get();
        qnode.locked = true;
        QNode pred = tail.getAndSet(qnode);
        myPred.set(pred);
        while (pred.locked) {
            ;
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        qnode.locked = false;
        myNode.set(myPred.get());
    }

    class QNode {
        boolean locked = false;
    }
}

若是最大线程数为n,有L个不一样对象,那么CLHLock只须要O(L+n)空间。比ALock所需空间少,而且不须要知道可能使用锁的最大线程数量。可是,在无cache的系统上性能较差,由于一次要访问两个结点,若这两个结点内存位置较远,性能损失会很大。

MCS队列锁

MCS队列锁经过检测本身所在结点的locked的值来判断是否轮到本身,等待这个域被前驱释放锁时改变。

线程若要得到锁,需把本身结点添加到链表的尾部。若队列链表原先为空,则得到锁。不然,将前驱结点的next域指向本身,在本身的locked域上自旋等待,直到前驱将该域置为false。线程若要释放锁,判断是否在队尾,若是是只需将tail置为null,若是不是需将后继的locked域置为false且将本身结点的next域置为默认的null。注意在队尾的状况,可能有个线程正在得到锁,要等一下变为后一种状况。

public class MCSLock {
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public MCSLock() {
        tail = new AtomicReference<QNode>(null);
        myNode = new ThreadLocal<QNode>() {
            protected QNode initialValue() {
                return new QNode();
            }
        };
    }

    public void lock() {
        QNode qnode = myNode.get();
        QNode pred = tail.getAndSet(qnode);
        if (pred != null) {
            qnode.locked = true;
            pred.next = qnode;
            while (qnode.locked) {
                ;
            }
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (qnode.next == null) {
            if (tail.compareAndSet(qnode, null)) {
                return;
            }
            while (qnode.next == null) {
                ;
            }
        }
        qnode.next.locked = false;
        qnode.next = null;
    }

    class QNode {
        boolean locked = false;
        QNode next = null;
    }
}

结点能被重复使用,该锁的空间复杂度为O(L+n)。MCSLock算法适合无cache的系统结构,由于每一个线程只需控制本身自旋的存储单元。可是,释放锁也须要旋转,另外读写比较次数比CLHLock多。

时限队列锁

Lock接口中有个一个tryLock()方法,能够指定一个时限(得到锁可等待的最大时间),若得到锁超时则放弃尝试。最后返回一个布尔值说明锁是否申请成功。

时限队列锁TOLock是基于CLHLock类的,锁是一个结点的虚拟队列,每一个结点在它的前驱结点上自旋等待锁释放。可是当这个线程超时时,不能简单的抛弃它的队列结点,而是将这个结点标记为已废弃,这样它的后继们(若是有)就会注意到这个结点已放弃,转而在放弃结点的前驱上自旋。为了保证队列链表的连续性,每次申请锁都会new一个QNode

时限队列锁结点的域pred会特殊一点,它有3种类型的取值:

  • null:初始状态,未得到锁或已释放锁;

  • AVAILABLE:一个静态结点,表示对应结点已释放锁,申请锁成功;

  • QNodepred域为null的前驱结点,表示对应结点因超时放弃锁请求,在放弃请求时才会设置这个值。

申请锁时,建立一个pred域为null的新结点并加入到链表尾部,若原先链表为空或前驱结点已释放锁,则得到锁。不然,在尝试时间内,找到pred域为null的前驱结点,等待它释放锁。若在等待前驱结点释放锁的过程当中超时,就尝试从链表中删除这个结点,要分这个结点是否有后继两种状况。
释放锁时,检查该结点是否有后继,若无后继可直接把tail设置为null,不然将该结点的pred域指向AVAILABLE

public class TOLock {
    static QNode AVAILABLE = new QNode();
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public TOLock() {
        tail = new AtomicReference<QNode>(null);
        myNode = new ThreadLocal<QNode>();
    }

    public boolean tryLock(long time, TimeUnit unit) {
        long startTime = System.currentTimeMillis();
        long patience = TimeUnit.MILLISECONDS.convert(time, unit);
        QNode qnode = new QNode();
        myNode.set(qnode);
        qnode.pred = null;
        QNode myPred = tail.getAndSet(qnode);
        if (myPred == null || myPred.pred == AVAILABLE) {
            return true;
        }
        while (System.currentTimeMillis() - startTime < patience) {
            QNode predPred = myPred.pred;
            if (predPred == AVAILABLE) {
                return true;
            } else {
                if (predPred != null) {
                    myPred = predPred;
                }
            }
        }
        if (! tail.compareAndSet(qnode, myPred)) {
            qnode.pred = myPred;
        }
        return false;
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (! tail.compareAndSet(qnode, null)) {
            qnode.pred = AVAILABLE;
        }
    }

    static class QNode {
        public QNode pred = null;
    }
}

优势:同CLHLock。
缺点:每次申请锁都要分配一个新结点,在锁上旋转的线程可能要回溯一个超时结点链。

上面实现的这些锁算法不支持重入。咱们可使用银行转帐的例子来测试一下锁的效果,任意帐户间能够随意转帐,锁生效时全部帐户的总金额是不变的。完整的算法实现和测试代码在这里

相关文章
相关标签/搜索