JAVA-ConcurrentSkipListMap笔记

什么是跳表

跳表(skip list)是在1990年由William Pugh提出的一种数据结构.JAVA里也有对应的数据结构--ConcurrentSkipListMap.引用wiki中的一个gif来展现跳表的插入过程.node

ConcurrentSkipListMap的数据结构

在ConcurrentSkipListMap中有三种类型的节点:Node<K,V>、Index<K,V>和HeadIndex<K,V>.数组

  • Node<K,V>:用于构建最底层的数据链表.里面存储真实的数据,K、V表示键值对.
static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;
    
    // 建立一个普通节点
    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }
    
    // 建立一个新的marker节点,它的value字段指向它本身,而且key是null(base-level-header的key也是null)
    Node(Node<K,V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }
    
    // 经过cas的方式设置value字段
    boolean casValue(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
    }
    
    // 经过cas的方式设置next字段.
    boolean casNext(Node<K,V> cmp, Node<K,V> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
   
    boolean isMarker() {
        return value == this;
    }
    
    boolean isBaseHeader() {
        return value == BASE_HEADER;
    }
    
    // 该操做分两步.
    // 1.建立一个marker节点,并将该marker节点的next指向f.
    // 2.将当前节点的next字段由f替换为建立的marker节点.
    boolean appendMarker(Node<K,V> f) {
        return casNext(f, new Node<K,V>(f));
    }
    
    // 删除当前节点
    // 若是未进行标记,则在当前节点以后添加标记节点.
    // 若是已作过标记,那么直接调用cas操做,将b的next字段指向f.next(此时f是marker节点,f.next才是真正的后继节点)
    void helpDelete(Node<K,V> b, Node<K,V> f) {
         // 这里分两步操做,主要是减小协助删除的线程之间的CAS干扰.
        if (f == next && this == b.next) {
            if (f == null || f.value != f) // not already marked
                casNext(f, new Node<K,V>(f));
            else
                b.casNext(this, f.next);
        }
    }
    
    V getValidValue() {
        Object v = value;
        if (v == this || v == BASE_HEADER)
            return null;
        @SuppressWarnings("unchecked") V vv = (V)v;
        return vv;
    }
    
    AbstractMap.SimpleImmutableEntry<K,V> createSnapshot() {
        Object v = value;
        if (v == null || v == this || v == BASE_HEADER)
            return null;
        @SuppressWarnings("unchecked") V vv = (V)v;
        return new AbstractMap.SimpleImmutableEntry<K,V>(key, vv);
    }
    // UNSAFE mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    private static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            // 获取value字段的偏移量
            valueOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("value"));
            // 获取next字段的偏移量
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
复制代码
  • Index<K,V>:用于构建上层的链表层.
static class Index<K,V> {
    final Node<K,V> node; // 指向base-level Node节点
    final Index<K,V> down; // 指向下一层Index节点.
    volatile Index<K,V> right; // 指向右侧Index节点.
    /**
     * Creates index node with given values.
     */
    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }
    
    // cas right字段
    final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
        return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
    }
    
    final boolean indexesDeletedNode() {
        return node.value == null;
    }
    
    final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
        Node<K,V> n = node;
        newSucc.right = succ;
        // 当前Index节点的node节点未被删除,才经过cas将right字段指向newSucc节点.
        return n.value != null && casRight(succ, newSucc);
    }
    
    // 解绑,使当前Index节点的right字段跳过succ节点.
    final boolean unlink(Index<K,V> succ) {
        return node.value != null && casRight(succ, succ.right);
    }
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long rightOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Index.class;
            rightOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("right"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
复制代码
  • HeadIndex<K,V>:头结点.其属性level用于记录层数.bash

  • 结构数据结构

ConcurrentSkipListMap的结构相似于上图.

1.head头结点位于最左侧.HeadIndex节点的个数决定了层数.
2.上层的链表由Index节点组成.
3.最底层的链表结构由Node节点组成.
4.其中浅黄的N表明指向底层Node节点的指针.
5.D表明Down指针,指向下一层的Index节点.
6.浅绿色的N表明next指针,指向下个一个Node节点.
7.L1和L2后面的数字表明HeadIndex节点处于的层数.
复制代码

put操做

ConcurrentSkipListMap的put操做(doPut方法)能够分解为两个步骤.app

  • 底层Node链表的构造
Node<K,V> z;             // added node
if (key == null)
    throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 0.外层outer循环.
outer: for (;;) {
    // 1.每次循环查询key的前置Node节点(b),并获取前置节点的下一个节点(n)
    for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
        //2.判断b的后继节点n是否为null.为null跳转到8
        if (n != null) {
            Object v; int c;
            Node<K,V> f = n.next;
            // 3.若是n再也不是b的下一个节点,说明结构已被其余线程修改.跳到1继续循环
            if (n != b.next)               // inconsistent read
                break;
            //4. 节点n被删除的状况,删除节点n,跳到1继续循环
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }
            //5. 节点b被删除,跳到1继续执行
            if (b.value == null || v == n) // b is deleted
                break;
            
            //6. 比较传入的key和n的key,比较结果大于0.将b和n都后移,而后跳到2继续执行
            if ((c = cpr(cmp, key, n.key)) > 0) {
                b = n;
                n = f;
                continue;
            }
            //7. key相等
            if (c == 0) {
                // 若是onlyIfAbsent为true或者value替换成功,则返回老的value值,结束doPut方法.
                if (onlyIfAbsent || n.casValue(v, value)) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                // 跳转到1.
                break; // restart if lost race to replace value
            }
            // else c < 0; fall through
  
        //8. 若是n为null,直接新建Node节点z
        z = new Node<K,V>(key, value, n);
        //9. 经过cas操做将b的next指针指向z.
        if (!b.casNext(n, z))
            //10. cas操做失败,跳到1继续执行.
            break;         // restart if lost race to append to b
        //11. cas成功,跳出outer循环,进行Index节点的构建.
        break outer;
    }
}
复制代码

流程dom

  • 上层Index链表的构造(可能有多层)
// 获取随机数,用于计算层数
int rnd = ThreadLocalRandom.nextSecondarySeed();
//0. 判断最高位和最低位是否都为0,rnd的最高位和最低位都为0的状况才构建Index
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
    int level = 1, max;
    //1. 对rnd的每一位进行判断,当bit位为1时将level+1
    while (((rnd >>>= 1) & 1) != 0)
        ++level;
    Index<K,V> idx = null;
    HeadIndex<K,V> h = head;
    //2. 判断level是否小于等于当前的层数.true,执行3;false,执行4.
    if (level <= (max = h.level)) {
        //3. 建立level个Index节点,每一个Index节点的down指针指向上一个Index节点.第一个Index节点的down指针指向null.
        for (int i = 1; i <= level; ++i)
            // z为须要插入的Node节点,让全部建立的index节点都指向z.
            idx = new Index<K,V>(z, idx, null);
    }
    else { // try to grow by one level
        //4. 将level设置为max + 1.若是当前跳表层数是4,那么level就是5.
        level = max + 1; // hold in array and later pick the one to use
        //5. 建立Index数组,这里的数组大小是level+1,由于下标0不存储数据.
        @SuppressWarnings("unchecked")Index<K,V>[] idxs =
            (Index<K,V>[])new Index<?,?>[level+1];
        //6. 建立Index节点,并放入idxs数组中.
        for (int i = 1; i <= level; ++i)
            idxs[i] = idx = new Index<K,V>(z, idx, null);
        //7. 循环.
        for (;;) {
            //8. 从新获取头节点
            h = head;
            //9. 从新获取跳表的层数
            int oldLevel = h.level;
            //10. 比较level和跳表层数.true,执行14;false,执行11.
            if (level <= oldLevel) // lost race to add level
                break;
            HeadIndex<K,V> newh = h;
            Node<K,V> oldbase = h.node;
            //11. 构造新的Head节点.此时level是大于oldLevel的.这里举个例子(E1):好比原来的层数是3,而level是4,那么就须要新增1个HeadIndex节点,那么HeadIndex也就变成了4层.最后获得的newh也就是最上层的HeadIndex节点.
            for (int j = oldLevel+1; j <= level; ++j)
                newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                
            //12. 经过cas操做将head节点替换为新的最上层的headIndex节点.操做成功,执行14;操做失败,跳到7继续循环.
            if (casHead(h, newh)) {
                h = newh;
                //*13*. 这里将level设置为了oldLevel.并将idx指向idxs[level],参照E1,那么这里idx就是指向idxs[3].之因此要这样作,是由于以前跳表只有3层,如今变成了4层,而且在step11的时候建立了新的HeadIndex,而且直接将右指针指向了idxs[4],所以后面的操做只须要将idxs[1]、idxs[2]、idxs[3]插入到对应层的Index链表中便可.
                idx = idxs[level = oldLevel];
                break;
            }
        }
    }
    // find insertion points and splice in
    splice: for (int insertionLevel = level;;) {
        //14.
        int j = h.level;
        //15. 初始化数据.
        for (Index<K,V> q = h, r = q.right, t = idx;;) {
            //16. q(当前层的headIndex节点)为null或t(要插入的Index节点)为null.跳出splice循环,结束方法.
            if (q == null || t == null)
                break splice;
            //17. 判断r是否为null.true,执行18;false,执行25.
            if (r != null) {
                //18. n为r所指向的Node节点.
                Node<K,V> n = r.node;
                // compare before deletion check avoids needing recheck
                //19. 获取key值比较的结果.
                int c = cpr(cmp, key, n.key);
                //20. 判断n是否被删除.
                if (n.value == null) {
                    //21.断开q与r的链接,也就是经过cas操做使q的right指针指向r的右继Index节点.
                    if (!q.unlink(r))
                        //22. 操做失败,有其余线程在修改,跳到14继续执行.
                        break;
                    //23. 操做成功,让r从新指向q的右继节点,跳到16继续执行.
                    r = q.right;
                    continue;
                }
                if (c > 0) {
                    //24. c大于0.将q和r右移,跳到16继续执行.
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            //25. 判断j和insertionLevel是否相等.相等才插入Index节点.
            // 这里j为当前头结点h的层数(有可能执行到这里时,其余线程又增长了HeadIndex节点的个数,所以此时的h指向的并不必定是最高层的HeadIndex节点,可是不要紧,咱们此次的插入操做只须要关心h所指向的HeadIndex节点及其下层的HeadIndex节点所在的Index链表).
            // insertionLevel指的是须要插入t的层数.
            // 仍是拿E1来举例,若是操做都成功,那么j就是4,而insertionLevel为3,因此这时不能插入,由于此时t指向的Index节点是属于第3层的,须要将q = q.down,直到q也是第3层时才能对t进行插入操做.
            if (j == insertionLevel) {
                //26. 插入t
                if (!q.link(r, t))
                    // 不成功则跳到14继续执行.
                    break; // restart
                // 若是新插入的节点被删除
                if (t.node.value == null) {
                    // 这里的findNode方式实际上是为了辅助删除,在遍历的链路上的被标记删除的节点都会被完全删除掉.并跳出splice循环,结束put操做.
                    findNode(key);
                    break splice;
                }
                //27. insertionLevel自减,等于0说明操做完成,跳出splice循环,结束方法.
                if (--insertionLevel == 0)
                    break splice;
            }

            //*28*. j自减.
            // 正常状况,当执行完27以后,--j应该是等于insertionLevel,而且j也是小于level的,这个时候将t下移没有问题.
            // 可是有一种异常状况,仍是拿E1举例.当j为4,insertionLevel为3时,因为j != insertionLevel,所以j会一直自减直到j也为3的时候,会执行25.
            // 若是j为3的时候执行插入成功,那么27执行后insertionLevel会变为2,j也会变为2,此时t将下移.
            // 而后继续循环,好比当insertionLevel = 1的时候,若是走到26的时候插入失败,那么就会跳到14继续执行,此时j和t都会被初始化.也就是说此时j又变成4了,而且t也再也不指向第1层的Index节点,而是指向第3层的Index节点(而此时第3层节点其实已经插入成功了),可是insertionLevel仍是1,因此在这种状况下,--j > insertionLevel && j < level的状况下,也须要将t下移,使其指向最后将要插入的Index节点.
            // 经过上面两种状况,所以if表达式中须要写成 --j >= insertionLevel && j < level,而不是--j == insertionLevel && j < level
            if (--j >= insertionLevel && j < level)
                t = t.down;
            
            // q下移,以进行下一层的插入操做.
            q = q.down;
            r = q.right;
        }
    }
}
复制代码

流程ui

Remove操做

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        // 0. 获取前置Node节点b以及b的next节点n.
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            // 1.
            Object v; int c;
            if (n == null)
                // n为null,由于b.key < key <= n.key,所以若是n为null,那么key所对应的节点应该也被删除,所以直接跳到20.
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                    // inconsistent read
                // 2. 说明在以前有线程对跳表的数据结构进行了修改,致使n != b.next,因此跳到0继续执行.
                break;
            if ((v = n.value) == null) {        // n is deleted
                //3.value为null,说明节点n被删除了,因此此时协助删除.参考数据结构中的Node类的方法解析.
                // 跳转到0继续执行.
                n.helpDelete(b, f);
                break;
            }
            
            if (b.value == null || v == n)      // b is deleted
                //4. b的value为null或者v == n(说明n是一个marker节点)都说明节点b已被删除.
                // 跳转到0继续执行.
                break;
            if ((c = cpr(cmp, key, n.key)) < 0)
                //5. key < n.key说明没有须要删除的节点,直接跳转到20返回.
                break outer;
            if (c > 0) {
                //6. key > n.key,将b、n右移,而后跳转到1继续执行.
                b = n;
                n = f;
                continue;
            }
            // 参考remove方法
            if (value != null && !value.equals(v))
                break outer;
            //7. 经过cas操做将n的value设置为null
            if (!n.casValue(v, null))
                //8. 失败,跳转到0继续执行.
                break;
            
            if (!n.appendMarker(f) || !b.casNext(n, f))
                //9. 添加marker节点失败或直接删除节点n失败,调用findNode方法
                findNode(key);                  // retry via findNode
            else {
                //10. 调用findPredecessor
                findPredecessor(key, cmp);      // clean index
                if (head.right == null)
                    //11. head的右边节点为null,则尝试减少level.
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    // 20.
    return null;
}
复制代码
相关文章
相关标签/搜索