到目前为止,咱们在Java世界里看到了两种实现key-value的数据结构:Hash、TreeMap,这两种数据结构各自都有着优缺点。java
什么是SkipList?Skip List ,称之为跳表,它是一种能够替代平衡树的数据结构,其数据元素默认按照key值升序,自然有序。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,经过“空间来换取时间”的一个算法,在每一个节点中增长了向前的指针,在插入、删除、查找时能够忽略一些不可能涉及到的结点,从而提升了效率。node
咱们先看一个简单的链表,以下:算法
若是咱们须要查询九、2一、30,则须要比较次数为3 + 6 + 8 = 17 次,那么有没有优化方案呢?有!咱们将该链表中的某些元素提炼出来做为一个比较“索引”,以下:数组
咱们先与这些索引进行比较来决定下一个元素是往右仍是下走,因为存在“索引”的缘故,致使在检索的时候会大大减小比较的次数。固然元素不是不少,很难体现出优点,当元素足够多的时候,这种索引结构就会大显身手。安全
SkipList具有以下特性:数据结构
咱们将上图再作一些扩展就能够变成一个典型的SkipList结构了并发
SkipListd的查找算法较为简单,对于上面咱们咱们要查找元素21,其过程以下:app
红色虚线表明路径。dom
SkipList的插入操做主要包括:函数
假定咱们要插入的元素为23,通过查找能够确认她是位于25前,九、1六、21后。固然须要考虑申请的层次K。
若是层次K > 3
须要申请新层次(Level 4)
若是层次 K = 2
直接在Level 2 层插入便可
这里会涉及到以个算法:经过丢硬币决定层次K,该算法咱们经过后面ConcurrentSkipListMap源码来分析。还有一个须要注意的地方就是,在K层插入元素后,须要确保全部小于K层的层次都应该出现新节点。
删除节点和插入节点思路基本一致:找到节点,删除节点,调整指针。
好比删除节点9,以下:
经过上面咱们知道SkipList采用空间换时间的算法,其插入和查找的效率O(logn),其效率不低于红黑树,可是其原理和实现的复杂度要比红黑树简单多了。通常来讲会操做链表List,就会对SkipList毫无压力。
ConcurrentSkipListMap其内部采用SkipLis数据结构实现。为了实现SkipList,ConcurrentSkipListMap提供了三个内部类来构建这样的链表结构:Node、Index、HeadIndex。其中Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,HeadIndex用来维护索引层次。到这里咱们能够这样说ConcurrentSkipListMap是经过HeadIndex维护索引层次,经过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只须要比较很小一部分数据了。在JDK中的关系以下图:
static final class Node<K,V> {
final K key;
volatile Object value;
volatile ConcurrentSkipListMap.Node<K, V> next;
/** 省略些许代码 */
}
复制代码
Node的结构和通常的单链表毫无区别,key-value和一个指向下一个节点的next。
static class Index<K,V> {
final ConcurrentSkipListMap.Node<K,V> node;
final ConcurrentSkipListMap.Index<K,V> down;
volatile ConcurrentSkipListMap.Index<K,V> right;
/** 省略些许代码 */
}
复制代码
Index提供了一个基于Node节点的索引Node,一个指向下一个Index的right,一个指向下层的down节点。
static final class HeadIndex<K,V> extends Index<K,V> {
final int level; //索引层,从1开始,Node单链表层为0
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
复制代码
HeadIndex内部就一个level来定义层级。
ConcurrentSkipListMap提供了四个构造函数,每一个构造函数都会调用initialize()方法进行初始化工做。
final void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
复制代码
注意,initialize()方法不只仅只在构造函数中被调用,如clone,clear、readObject时都会调用该方法进行初始化步骤。这里须要注意randomSeed的初始化。
private transient int randomSeed;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
复制代码
randomSeed一个简单的随机数生成器(在后面介绍)。
CoucurrentSkipListMap提供了put()方法用于将指定值与此映射中的指定键关联。源码以下:
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
复制代码
首先判断value若是为null,则抛出NullPointerException,不然调用doPut方法,其实若是各位看过JDK的源码的话,应该对这样的操做很熟悉了,JDK源码里面不少方法都是先作一些必要性的验证后,而后经过调用do**()方法进行真正的操做。
doPut()方法内容较多,咱们分步分析。
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
// 比较器
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {
/** 省略代码 */
复制代码
doPut()方法有三个参数,除了key,value外还有一个boolean类型的onlyIfAbsent,该参数做用与若是存在当前key时,该作何动做。当onlyIfAbsent为false时,替换value,为true时,则返回该value。用代码解释为:
if (!map.containsKey(key))
return map.put(key, value);
else
return map.get(key);
复制代码
首先判断key是否为null,若是为null,则抛出NullPointerException,从这里咱们能够确认ConcurrentSkipList是不支持key或者value为null的。而后调用findPredecessor()方法,传入key来确认位置。findPredecessor()方法其实就是确认key要插入的位置。
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
// 从head节点开始,head是level最高级别的headIndex
for (Index<K,V> q = head, r = q.right, d;;) {
// r != null,表示该节点右边还有节点,须要比较
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
// value == null,表示该节点已经被删除了
// 经过unlink()方法过滤掉该节点
if (n.value == null) {
//删掉r节点
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
// value != null,节点存在
// 若是key 大于r节点的key 则往前进一步
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 到达最右边,若是dowm == null,表示指针已经达到最下层了,直接返回该节点
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
复制代码
findPredecessor()方法意思很是明确:寻找前辈。从最高层的headIndex开始向右一步一步比较,直到right为null或者右边节点的Node的key大于当前key为止,而后再向下寻找,依次重复该过程,直到down为null为止,即找到了前辈,看返回的结果注意是Node,不是Item,因此插入的位置应该是最底层的Node链表。
在这个过程当中ConcurrentSkipListMap赋予了该方法一个其余的功能,就是经过判断节点的value是否为null,若是为null,表示该节点已经被删除了,经过调用unlink()方法删除该节点。
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
复制代码
删除节点过程很是简单,更改下right指针便可。
经过findPredecessor()找到前辈节点后,作什么呢?看下面:
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
// 前辈节点的next != null
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// 不一致读,主要缘由是并发,有节点捷足先登
if (n != b.next) // inconsistent read
break;
// n.value == null,该节点已经被删除了
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// 前辈节点b已经被删除
if (b.value == null || v == n) // b is deleted
break;
// 节点大于,往前移
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
// c == 0 表示,找到一个key相等的节点,根据onlyIfAbsent参数来作判断
// onlyIfAbsent ==false,则经过casValue,替换value
// onlyIfAbsent == true,返回该value
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 将key-value包装成一个node,插入
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
复制代码
找到合适的位置后,就是在该位置插入节点咯。插入节点的过程比较简单,就是将key-value包装成一个Node,而后经过casNext()方法加入到链表当中。固然是插入以前须要进行一系列的校验工做。
在最下层插入节点后,下一步工做是什么?新建索引。前面博主提过,在插入节点的时候,会根据采用抛硬币的方式来决定新节点所插入的层次,因为存在并发的可能,ConcurrentSkipListMap采用ThreadLocalRandom来生成随机数。以下:
int rnd = ThreadLocalRandom.nextSecondarySeed();
复制代码
抛硬币决定层次的思想很简单,就是经过抛硬币若是硬币为正面则层次level + 1 ,不然中止,以下:
// 抛硬币决定层次
while (((rnd >>>= 1) & 1) != 0)
++level;
复制代码
在阐述SkipList插入节点的时候说明了,决定的层次level会分为两种状况进行处理,一是若是层次level大于最大的层次话则须要新增一层,不然就在相应层次以及小于该level的层次进行节点新增处理。
// 若是决定的层次level比最高层次head.level小,直接生成最高层次的index
// 因为须要确认每一层次的down,因此须要从最下层依次往上生成
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
}
复制代码
从底层开始,小于level的每一层都初始化一个index,每次的node都指向新加入的node,down指向下一层的item,右侧next所有为null。整个处理过程很是简单:为小于level的每一层初始化一个index,而后加入到原来的index链条中去。
// leve > head.level 则新增一层
else { // try to grow by one level
// 新增一层
level = max + 1;
// 初始化 level个item节点
@SuppressWarnings("unchecked")
ConcurrentSkipListMap.Index<K,V>[] idxs =
(ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
//
for (;;) {
h = head;
int oldLevel = h.level;
// 层次扩大了,须要从新开始(有新线程节点加入)
if (level <= oldLevel) // lost race to add level
break;
// 新的头结点HeadIndex
ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
// 生成新的HeadIndex节点,该HeadIndex指向新增层次
for (int j = oldLevel+1; j <= level; ++j)
newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// HeadIndex CAS替换
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
复制代码
当抛硬币决定的level大于最大层次level时,须要新增一层进行处理。处理逻辑以下:
// 从插入的层次level开始
splice: for (int insertionLevel = level;;) {
int j = h.level;
// 从headIndex开始
for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
// r != null;这里是找到相应层次的插入节点位置,注意这里只横向找
if (r != null) {
ConcurrentSkipListMap.Node<K,V> n = r.node;
int c = cpr(cmp, key, n.key);
// n.value == null ,解除关系,r右移
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
// key > n.key 右移
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
// 上面找到节点要插入的位置,这里就插入
// 当前层是最顶层
if (j == insertionLevel) {
// 创建联系
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
// 标志的插入层 -- ,若是== 0 ,表示已经到底了,插入完毕,退出循环
if (--insertionLevel == 0)
break splice;
}
// 上面节点已经插入完毕了,插入下一个节点
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
复制代码
这段代码分为两部分看,一部分是找到相应层次的该节点插入的位置,第二部分在该位置插入,而后下移。
至此,ConcurrentSkipListMap的put操做到此就结束了。代码量有点儿多,这里总结下:
相比于put操做 ,get操做会简单不少,其过程其实就只至关于put操做的第一步:
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
ConcurrentSkipListMap.Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
复制代码
与put操做第一步类似,首先调用findPredecessor()方法找到前辈节点,而后顺着right一直往右找便可,同时在这个过程当中一样承担了一个删除value为null的节点的职责。
remove操做为删除指定key节点,以下:
public V remove(Object key) {
return doRemove(key, null);
}
复制代码
直接调用doRemove()方法,这里remove有两个参数,一个是key,另一个是value,因此doRemove方法即提供remove key,也提供同时知足key-value。
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
ConcurrentSkipListMap.Node<K,V> f = n.next;
// 不一致读,从新开始
if (n != b.next) // inconsistent read
break;
// n节点已删除
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// b节点已删除
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
// 右移
if (c > 0) {
b = n;
n = f;
continue;
}
/* * 找到节点 */
// value != null 表示须要同时校验key-value值
if (value != null && !value.equals(v))
break outer;
// CAS替换value
if (!n.casValue(v, null))
break;
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
// 清理节点
findPredecessor(key, cmp); // clean index
// head.right == null表示该层已经没有节点,删掉该层
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
复制代码
调用findPredecessor()方法找到前辈节点,而后经过右移,而后比较,找到后利用CAS把value替换为null,而后判断该节点是否是这层惟一的index,若是是的话,调用tryReduceLevel()方法把这层干掉,完成删除。
其实从这里能够看出,remove方法仅仅是把Node的value设置null,并无真正删除该节点Node,其实从上面的put操做、get操做咱们能够看出,他们在寻找节点的时候都会判断节点的value是否为null,若是为null,则调用unLink()方法取消关联关系,以下:
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
复制代码
ConcurrentSkipListMap的size()操做和ConcurrentHashMap不一样,它并无维护一个全局变量来统计元素的个数,因此每次调用该方法的时候都须要去遍历。
public int size() {
long count = 0;
for (Node<K,V> n = findFirst(); n != null; n = n.next) {
if (n.getValidValue() != null)
++count;
}
return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
复制代码
调用findFirst()方法找到第一个Node,而后利用node的next去统计。最后返回统计数据,最多能返回Integer.MAX_VALUE。注意这里在线程并发下是安全的。