死磕 java同步系列之StampedLock源码解析

问题

(1)StampedLock是什么?java

(2)StampedLock具备什么特性?node

(3)StampedLock是否支持可重入?源码分析

(4)StampedLock与ReentrantReadWriteLock的对比?学习

简介

StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,并且它不是基于AQS来实现的,它的内部自成一片逻辑,让咱们一块儿来学习吧。ui

StampedLock具备三种模式:写模式、读模式、乐观读模式。this

ReentrantReadWriteLock中的读和写都是一种悲观锁的体现,StampedLock加入了一种新的模式——乐观读,它是指当乐观读时假定没有其它线程修改数据,读取完成后再检查下版本号有没有变化,没有变化就读取成功了,这种模式更适用于读多写少的场景。线程

使用方法

让咱们经过下面的例子了解一下StampedLock三种模式的使用方法:调试

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        // 获取写锁,返回一个版本号(戳)
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            // 释放写锁,须要传入上面获取的版本号
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        // 乐观读
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 验证版本号是否有变化
        if (!sl.validate(stamp)) {
            // 版本号变了,乐观读转悲观读
            stamp = sl.readLock();
            try {
                // 从新读取x、y的值
                currentX = x;
                currentY = y;
            } finally {
                // 释放读锁,须要传入上面获取的版本号
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) {
        // 获取悲观读锁
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // 转为写锁
                long ws = sl.tryConvertToWriteLock(stamp);
                // 转换成功
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                }
                else {
                    // 转换失败
                    sl.unlockRead(stamp);
                    // 获取写锁
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 释放锁
            sl.unlock(stamp);
        }
    }
}

从上面的例子咱们能够与ReentrantReadWriteLock进行对比:code

(1)写锁的使用方式基本一对待;队列

(2)读锁(悲观)的使用方式能够进行升级,经过tryConvertToWriteLock()方式能够升级为写锁;

(3)乐观读锁是一种全新的方式,它假定数据没有改变,乐观读以后处理完业务逻辑再判断版本号是否有改变,若是没改变则乐观读成功,若是有改变则转化为悲观读锁重试;

下面咱们一块儿来学习它的源码是怎么实现的。

源码分析

主要内部类

static final class WNode {
    // 前一个节点
    volatile WNode prev;
    // 后一个节点
    volatile WNode next;
    // 读线程所用的链表(实际是一个栈结果)
    volatile WNode cowait;    // list of linked readers
    // 阻塞的线程
    volatile Thread thread;   // non-null while possibly parked
    // 状态
    volatile int status;      // 0, WAITING, or CANCELLED
    // 读模式仍是写模式
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

队列中的节点,相似于AQS队列中的节点,能够看到它组成了一个双向链表,内部维护着阻塞的线程。

主要属性

// 一堆常量
// 读线程的个数占有低7位
private static final int LG_READERS = 7;
// 读线程个数每次增长的单位
private static final long RUNIT = 1L;
// 写线程个数所在的位置
private static final long WBIT  = 1L << LG_READERS;  // 128 = 1000 0000
// 读线程个数所在的位置
private static final long RBITS = WBIT - 1L;  // 127 = 111 1111
// 最大读线程个数
private static final long RFULL = RBITS - 1L;  // 126 = 111 1110
// 读线程个数和写线程个数的掩码
private static final long ABITS = RBITS | WBIT;  // 255 = 1111 1111
// 读线程个数的反数,高25位所有为1
private static final long SBITS = ~RBITS;  // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000

// state的初始值
private static final long ORIGIN = WBIT << 1;  // 256 = 1 0000 0000
// 队列的头节点
private transient volatile WNode whead;
// 队列的尾节点
private transient volatile WNode wtail;
// 存储着当前的版本号,相似于AQS的状态变量state
private transient volatile long state;

经过属性能够看到,这是一个相似于AQS的结构,内部一样维护着一个状态变量state和一个CLH队列。

构造方法

public StampedLock() {
    state = ORIGIN;
}

state的初始值为ORIGIN(256),它的二进制是 1 0000 0000,也就是初始版本号。

writeLock()方法

获取写锁。

public long writeLock() {
    long s, next;
    // ABITS = 255 = 1111 1111
    // WBITS = 128 = 1000 0000
    // state与ABITS若是等于0,尝试原子更新state的值加WBITS
    // 若是成功则返回更新的值,若是失败调用acquireWrite()方法
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

咱们以state等于初始值为例,则state & ABITS的结果为:

StampedLock

此时state为初始状态,与ABITS与运算后的值为0,因此执行后面的CAS方法,s + WBITS的值为384 = 1 1000 0000。

到这里咱们大胆猜想:state的高24位存储的是版本号,低8位存储的是是否有加锁,第8位存储的是写锁,低7位存储的是读锁被获取的次数,并且若是只有第8位存储写锁的话,那么写锁只能被获取一次,也就不可能重入了。

到底咱们猜想的对不对呢,走着瞧^^

咱们接着来分析acquireWrite()方法:

(手机横屏看源码更方便)

private long acquireWrite(boolean interruptible, long deadline) {
    // node为新增节点,p为尾节点(即将成为node的前置节点)
    WNode node = null, p;
    
    // 第一次自旋——入队
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 再次尝试获取写锁
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            // 若是自旋次数小于0,则计算自旋的次数
            // 若是当前有写锁独占且队列无元素,说明快轮到本身了
            // 就自旋就好了,若是自旋完了还没轮到本身才入队
            // 则自旋次数为SPINS常量
            // 不然自旋次数为0
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 当自旋次数大于0时,当前此次自旋随机减一次自旋次数
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) {
            // 若是队列未初始化,新建一个空节点并初始化头节点和尾节点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 若是新增节点还未初始化,则新建之,并赋值其前置节点为尾节点
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            // 若是尾节点有变化,则更新新增节点的前置节点为新的尾节点
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            // 尝试更新新增节点为新的尾节点成功,则退出循环
            p.next = node;
            break;
        }
    }

    // 第二次自旋——阻塞并等待唤醒
    for (int spins = -1;;) {
        // h为头节点,np为新增节点的前置节点,pp为前前置节点,ps为前置节点的状态
        WNode h, np, pp; int ps;
        // 若是头节点等于前置节点,说明快轮到本身了
        if ((h = whead) == p) {
            if (spins < 0)
                // 初始化自旋次数
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                // 增长自旋次数
                spins <<= 1;
            
            // 第三次自旋,不断尝试获取写锁
            for (int k = spins;;) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 尝试获取写锁成功,将node设置为新头节点并清除其前置节点(gc)
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                // 随机立减自旋次数,当自旋次数减为0时跳出循环再重试
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        }
        else if (h != null) { // help release stale waiters
            // 这段代码很难进来,是用于协助唤醒读节点的
            // 我是这么调试进来的:
            // 起三个写线程,两个读线程
            // 写线程1获取锁不要释放
            // 读线程1获取锁,读线程2获取锁(会阻塞)
            // 写线程2获取锁(会阻塞)
            // 写线程1释放锁,此时会唤醒读线程1
            // 在读线程1里面先不要唤醒读线程2
            // 写线程3获取锁,此时就会走到这里来了
            WNode c; Thread w;
            // 若是头节点的cowait链表(栈)不为空,唤醒里面的全部节点
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 若是头节点没有变化
        if (whead == h) {
            // 若是尾节点有变化,则更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                // 若是尾节点状态为0,则更新成WAITING
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // 若是尾节点状态为取消,则把它从链表中删除
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 有超时时间的处理
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 已超时,剔除当前节点
                    return cancelWaiter(node, node, false);
                // 当前线程
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                // 把node的线程指向当前线程
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    // 阻塞当前线程
                    U.park(false, time);  // 等同于LockSupport.park()
                    
                // 当前节点被唤醒后,清除线程
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                // 若是中断了,取消当前节点
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

这里对acquireWrite()方法作一个总结,这个方法里面有三段自旋逻辑:

第一段自旋——入队:

(1)若是头节点等于尾节点,说明没有其它线程排队,那就多自旋一会,看能不能尝试获取到写锁;

(2)不然,自旋次数为0,直接让其入队;

第二段自旋——阻塞并等待被唤醒 + 第三段自旋——不断尝试获取写锁:

(1)第三段自旋在第二段自旋内部;

(2)若是头节点等于前置节点,那就进入第三段自旋,不断尝试获取写锁;

(3)不然,尝试唤醒头节点中等待着的读线程;

(4)最后,若是当前线程一直都没有获取到写锁,就阻塞当前线程并等待被唤醒;

这么一大段逻辑看着比较闹心,其实真正分解下来仍是比较简单的,无非就是自旋,把不少状态的处理都糅合到一个for循环里面处理了。

unlockWrite()方法

释放写锁。

public void unlockWrite(long stamp) {
    WNode h;
    // 检查版本号对不对
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 这行代码实际有两个做用:
    // 1. 更新版本号加1
    // 2. 释放写锁
    // stamp + WBIT实际会把state的第8位置为0,也就至关于释放了写锁
    // 同时会进1,也就是高24位总体加1了
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 若是头节点不为空,而且状态不为0,调用release方法唤醒它的下一个节点
    if ((h = whead) != null && h.status != 0)
        release(h);
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 将其状态改成0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 若是头节点的下一个节点为空或者其状态为已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 从尾节点向前遍历找到一个可用的节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 唤醒q节点所在的线程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

写锁的释放过程比较简单:

(1)更改state的值,释放写锁;

(2)版本号加1;

(3)唤醒下一个等待着的节点;

readLock()方法

获取读锁。

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    // 没有写锁占用,而且读锁被获取的次数未达到最大值
    // 尝试原子更新读锁被获取的次数加1
    // 若是成功直接返回,若是失败调用acquireRead()方法
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

获取读锁的时候先看看如今有没有其它线程占用着写锁,若是没有的话再检测读锁被获取的次数有没有达到最大,若是没有的话直接尝试获取一次读锁,若是成功了直接返回版本号,若是没成功就调用acquireRead()排队。

下面咱们一块儿来看看acquireRead()方法,这又是一个巨长无比的方法,请保持耐心,咱们一步步来分解:

(手机横屏看源码更方便)

private long acquireRead(boolean interruptible, long deadline) {
    // node为新增节点,p为尾节点
    WNode node = null, p;
    // 第一段自旋——入队
    for (int spins = -1;;) {
        // 头节点
        WNode h;
        // 若是头节点等于尾节点
        // 说明没有排队的线程了,快轮到本身了,直接自旋不断尝试获取读锁
        if ((h = whead) == (p = wtail)) {
            // 第二段自旋——不断尝试获取读锁
            for (long m, s, ns;;) {
                // 尝试获取读锁,若是成功了直接返回版本号
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    // 若是读线程个数达到了最大值,会溢出,返回的是0
                    return ns;
                else if (m >= WBIT) {
                    // m >= WBIT表示有其它线程先一步获取了写锁
                    if (spins > 0) {
                        // 随机立减自旋次数
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        // 若是自旋次数为0了,看看是否要跳出循环
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 设置自旋次数
                        spins = SPINS;
                    }
                }
            }
        }
        // 若是尾节点为空,初始化头节点和尾节点
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 若是新增节点为空,初始化之
            node = new WNode(RMODE, p);
        else if (h == p || p.mode != RMODE) {
            // 若是头节点等于尾节点或者尾节点不是读模式
            // 当前节点入队
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            // 接着上一个elseif,这里确定是尾节点为读模式了
            // 将当前节点加入到尾节点的cowait中,这是一个栈
            // 上面的CAS成功了是不会进入到这里来的
            node.cowait = null;
        else {
            // 第三段自旋——阻塞当前线程并等待被唤醒
            for (;;) {
                WNode pp, c; Thread w;
                // 若是头节点不为空且其cowait不为空,协助唤醒其中等待的读线程
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                // 若是头节点等待前前置节点或者等于前置节点或者前前置节点为空
                // 这一样说明快轮到本身了
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    // 第四段自旋——又是不断尝试获取锁
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT); // 只有当前时刻没有其它线程占有写锁就不断尝试
                }
                // 若是头节点不曾改变且前前置节点也不曾改
                // 阻塞当前线程
                if (whead == h && p.prev == pp) {
                    long time;
                    // 若是前前置节点为空,或者头节点等于前置节点,或者前置节点已取消
                    // 从第一个for自旋开始重试
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 超时检测
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 若是超时了,取消当前节点
                        return cancelWaiter(node, p, false);
                    
                    // 当前线程
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    // 设置进node中
                    node.thread = wt;
                    // 检测以前的条件不曾改变
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        // 阻塞当前线程并等待被唤醒
                        U.park(false, time);
                    
                    // 唤醒以后清除线程
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    // 若是中断了,取消当前节点
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }
    
    // 只有第一个读线程会走到下面的for循环处,参考上面第一段自旋中有一个break,当第一个读线程入队的时候break出来的
    
    // 第五段自旋——跟上面的逻辑差很少,只不过这里单独搞一个自旋针对第一个读线程
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 若是头节点等于尾节点,说明快轮到本身了
        // 不断尝试获取读锁
        if ((h = whead) == p) {
            // 设置自旋次数
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
                
            // 第六段自旋——不断尝试获取读锁
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                // 不断尝试获取读锁
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    // 获取到了读锁
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    // 唤醒当前节点中全部等待着的读线程
                    // 由于当前节点是第一个读节点,因此它是在队列中的,其它读节点都是挂这个节点的cowait栈中的
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    // 返回版本号
                    return ns;
                }
                // 若是当前有其它线程占有着写锁,而且没有自旋次数了,跳出当前循环
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        else if (h != null) {
            // 若是头节点不等待尾节点且不为空且其为读模式,协助唤醒里面的读线程
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 若是头节点不曾变化
        if (whead == h) {
            // 更新前置节点及其状态等
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 第一个读节点即将进入阻塞
                long time;
                // 超时设置
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 若是超时了取消当前节点
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    // 阻塞第一个读节点并等待被唤醒
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

读锁的获取过程比较艰辛,一共有六段自旋,Oh my god,让咱们来大体地分解一下:

(1)读节点进来都是先判断是头节点若是等于尾节点,说明快轮到本身了,就不断地尝试获取读锁,若是成功了就返回;

(2)若是头节点不等于尾节点,这里就会让当前节点入队,这里入队又分红了两种;

(3)一种是首个读节点入队,它是会排队到整个队列的尾部,而后跳出第一段自旋;

(4)另外一种是非第一个读节点入队,它是进入到首个读节点的cowait栈中,因此更确切地说应该是入栈;

(5)不论是入队还入栈后,都会再次检测头节点是否是等于尾节点了,若是相等,则会再次不断尝试获取读锁;

(6)若是头节点不等于尾节点,那么才会真正地阻塞当前线程并等待被唤醒;

(7)上面说的首个读节点实际上是连续的读线程中的首个,若是是两个读线程中间夹了一个写线程,仍是老老实实的排队。

自旋,自旋,自旋,旋转的木马,让我忘了伤^^

unlockRead()方法

释放读锁。

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 检查版本号
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 读线程个数正常
        if (m < RFULL) {
            // 释放一次读锁
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 若是读锁所有都释放了,且头节点不为空且状态不为0,唤醒它的下一个节点
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            // 读线程个数溢出检测
            break;
    }
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 将其状态改成0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 若是头节点的下一个节点为空或者其状态为已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 从尾节点向前遍历找到一个可用的节点
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 唤醒q节点所在的线程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

读锁释放的过程就比较简单了,将state的低7位减1,当减为0的时候说明彻底释放了读锁,就唤醒下一个排队的线程。

tryOptimisticRead()方法

乐观读。

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

若是没有写锁,就返回state的高25位,这里把写所在位置一块儿返回了,是为了后面检测数据有没有被写过。

validate()方法

检测乐观读版本号是否变化。

public boolean validate(long stamp) {
    // 强制加入内存屏障,刷新数据
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}

检测二者的版本号是否一致,与SBITS与操做保证不受读操做的影响。

变异的CLH队列

StampedLock中的队列是一种变异的CLH队列,图解以下:

StampedLock

总结

StampedLock的源码解析到这里就差很少了,让咱们来总结一下:

(1)StampedLock也是一种读写锁,它不是基于AQS实现的;

(2)StampedLock相较于ReentrantReadWriteLock多了一种乐观读的模式,以及读锁转化为写锁的方法;

(3)StampedLock的state存储的是版本号,确切地说是高24位存储的是版本号,写锁的释放会增长其版本号,读锁不会;

(4)StampedLock的低7位存储的读锁被获取的次数,第8位存储的是写锁被获取的次数;

(5)StampedLock不是可重入锁,由于只有第8位标识写锁被获取了,并不能重复获取;

(6)StampedLock中获取锁的过程使用了大量的自旋操做,对于短任务的执行会比较高效,长任务的执行会浪费大量CPU;

(7)StampedLock不能实现条件锁;

彩蛋

StampedLock与ReentrantReadWriteLock的对比?

答:StampedLock与ReentrantReadWriteLock做为两种不一样的读写锁方式,彤哥大体概括了它们的异同点:

(1)二者都有获取读锁、获取写锁、释放读锁、释放写锁的方法,这是相同点;

(2)二者的结构基本相似,都是使用state + CLH队列;

(3)前者的state分红三段,高24位存储版本号、低7位存储读锁被获取的次数、第8位存储写锁被获取的次数;

(4)后者的state分红两段,高16位存储读锁被获取的次数,低16位存储写锁被获取的次数;

(5)前者的CLH队列能够当作是变异的CLH队列,连续的读线程只有首个节点存储在队列中,其它的节点存储的首个节点的cowait栈中;

(6)后者的CLH队列是正常的CLH队列,全部的节点都在这个队列中;

(7)前者获取锁的过程当中有判断首尾节点是否相同,也就是是否是快轮到本身了,若是是则不断自旋,因此适合执行短任务;

(8)后者获取锁的过程当中非公平模式下会作有限次尝试;

(9)前者只有非公平模式,一上来就尝试获取锁;

(10)前者唤醒读锁是一次性唤醒连续的读锁的,并且其它线程还会协助唤醒;

(11)后者是一个接着一个地唤醒的;

(12)前者有乐观读的模式,乐观读的实现是经过判断state的高25位是否有变化来实现的;

(13)前者各类模式能够互转,相似tryConvertToXxx()方法;

(14)前者写锁不可重入,后者写锁可重入;

(15)前者没法实现条件锁,后者能够实现条件锁;

差很少就这么多吧,若是你还能想到,也欢迎补充哦^^

推荐阅读

一、死磕 java同步系列之开篇

二、死磕 java魔法类之Unsafe解析

三、死磕 java同步系列之JMM(Java Memory Model)

四、死磕 java同步系列之volatile解析

五、死磕 java同步系列之synchronized解析

六、死磕 java同步系列之本身动手写一个锁Lock

七、死磕 java同步系列之AQS起篇

八、死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁

九、死磕 java同步系列之ReentrantLock源码解析(二)——条件锁

十、死磕 java同步系列之ReentrantLock VS synchronized

十一、死磕 java同步系列之ReentrantReadWriteLock源码解析

十二、死磕 java同步系列之Semaphore源码解析

1三、死磕 java同步系列之CountDownLatch源码解析

1四、死磕 java同步系列之AQS终篇


欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。

qrcode

相关文章
相关标签/搜索