背景(注释):java
一个并发的相似ConcurrentNavigableMap的实现。node
这个map经过实现Comparable或者提供一个Comparator来实现排列的,经过构造函数来提供。算法
这个实现是一个SkipLists的并发版本而且为containsKey/get/put/remove操做提供了log(n)的消耗。插入、删除、更新和读取能够在多个线程之间安全并发。数组
Iterators和spliterators是弱兼容的。从小到大的key排列中的视图比从大到小的要快。安全
全部从这些方法中返回的Map.Entry只是某时刻的一个快照。他们不提供setValue方法。(注意到你能够经过改变映射经过使用put、putIfAbsent、replace方法,依赖于你本身想要的效果)并发
注意不像其余大部分的集合那样,size方法不是一个常量时间运算。由于这个map的异步特性,决定了这个map的元素个数必须经过遍历获得,因此在遍历过程当中改变了map那么就会获得一个不精确的结果。而且,这些以大量数据位参数的方法像putAll、equals、toArray、containsValue以及clear不会保证以原子的方式执行。好比,一个遍历操做和putAll操做并发,那么只会看到部分添加的元素。app
这个类和它的视图还有迭代器实现了全部的可选的Map和Iterator接口的方法。像其余的并发结合,这个类不支持把null做为key或者value,由于null做为返回值没法区分是否缺乏元素。dom
算法(注释):异步
这个类实现了一个相似于树的二维跳表,标识段经过连接包含不一样数据的基本节点来展现。有两个缘由说明为什么使用这种方法代替类数组的结构:函数
为了使用删除标记,这个链表使用null的方法来指示删除,一种相似于延迟删除的模型。若是一个节点的value为null,那么它被认为是局部删除,就算它仍是可达的。这样须要组织合适的并发控制在代替vs删除操做--一个代替操做必须失败,若是删除操做首先nulling,以及一个删除操做必须返回删除以前的值。(这个删除是能够和其余方法并发的,若是其余方法返回null说明不存在这个元素)
这里有一个当节点删除时的事件序列(b:前驱,n:当前节点,f:后继),初始化:
+------+ +------+ +------+
... | b |------>| n |----->| f | ...
+------+ +------+ +------+
1 首先经过CAS操做让n的value从non-null变为null。如今没有公共操做会认为这个映射(n)会存在了。固然,其余的不间断的插入或者删除操做仍是可能改变n的指向下一个的引用的。
2 CAS操做n的next引用指向一个新的标记节点。如今没有其余节点可以被附加到n的后面。从而可以在基于CAS的链表中避免删除错误。
+------+ +------+ +------+ +------+
... | b |------>| n |----->|marker|------>| f | ...
+------+ +------+ +------+ +------+
3 CAS操做b的next引用从而忽略了n和他的标记节点。如今,没有新的遍历会遭遇n,最后n和marker会被垃圾回收。
+------+ +------+
... | b |----------------------------------->| f | ...
+------+ +------+
第一步的失败会致使简单的重试(由于另外一个操做而竞争失败)。2、三两步失败是由于其余线程在遍历的过程当中注意到一个节点有null值,经过协做的方式帮忙marking或者unlinking了。这种协做的方式保证了没有线程会由于执行删除的线程尚未进展而卡住等待。这种标记节点的用法稍微复杂化了协做代码,由于遍历过程必须确保一直地读取四个节点(b,n,marker,f),不是仅仅(b,n,f),当一个节点的next域指向了一个marker,它就不会改变。
跳表的模型中增长了段,因此基础遍历开始于接近目的地--常常只须要遍历不多的节点。不须要改变算法除了只要确保遍历开始于前驱(here,b),没有被删除(结构上),不然在处理这个删除以后重试。
段层级以链表的形式经过volatile的next来使用CAS操做。在段上的竞争会好比新增/删除节点会致使连接失败。就算这个发生时,段链表依然保持有序,从而能够做为划分。这个会影响性能,可是跳表自己就是依赖几率的,结果就是"p"值可能会小于虚值。这个竞争窗口会保持得足够小,从而在实际上失败是很是少的,甚至在大量竞争的状况下。
由于使用了一些重试逻辑从而使得base和index链表的重试是比较廉价的。遍历会在尝试了大多数”协做“CAS操做以后执行。这个不是很是必要,可是隐含的价值是能够帮助减小其余下游的CAS失败操做,从而好于从新开始的开销。这样恶化了坏状况,可是改进了高度竞争的状况。
区别于其余的跳表实现,段插入和删除须要一个分开的遍历过程在基本层面的动做以后,增长或者删除段节点。这样增长了一个线程的消耗,可是提高了多个线程的竞争性能,经过缩小干扰窗口,删除使得全部index节点不可达在删除操做返回后,从而也避免了垃圾回收。这个在这里是很是重要的,由于咱们不可以直接把拥有key的节点直接去除,由于他们仍然可能被读取。
段使用了保持良好性能的稀疏策略:初始的k=1,p=0.5意味着四分之一的节点有段中的下标。。。。。。这个期待的总共的空间比咱们的java.util.TreeMap稍微少点。
改变段的层级(这个相似树结构的高度)使用CAS操做。初始化的高度为1.当建立一个比当前层级高的段时会在头上增长一个层级。为了保持良好的性能,删除方法中会使用启发式的方法去下降层级,若是最高的层级上是空的。这样可能出如今没有层级的段上遭遇竞争。这样不会形成多大伤害,实际上相对于没有限制的提高层级,这是个更好的选择。
实现这些的代码比你想象的更详细。大多数运算会涉及定位元素(或者插入元素的位置)。这些代码不能被很是好的分块,由于子运算须要立刻获得前面运算的结果,不这样作的话会增长GC的负担。(这是又一个我但愿JAVA提供宏的地方)findPredecessor()操做搜仅仅索段节点,返回最底层节点的前驱。findNode()操做完成最底层节点的搜索。这种方法一样出现了一点代码的复制。
为了在线程之间参数随机的值,咱们使用了JDK中的随机支持(经过"secondary seed")。
实现:
让咱们来看源代码,首先是put方法:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); return doPut(key, value, false); }
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { if (n != null) { Object v; int c; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
首先put方法直接调用doPut方法,这里的关键加入了参数onlyIfAbsent,用于指明是否只在缺乏的状况下添加。
因为doPut方法比较长(超过100行),咱们把它分为两部分来看(详情以下):
第一部分是outer嵌套的双层循环:
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) { if (key == null) throw new NullPointerException(); // don't postpone errors for (;;) { for (Index<K,V> q = head, r = q.right, d;;) { if (r != null) { Node<K,V> n = r.node; K k = n.key; if (n.value == null) { if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; } if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } if ((d = q.down) == null) return q.node; q = d; r = d.right; } } }
private Node<K,V> findNode(Object key) { if (key == null) throw new NullPointerException(); // don't postpone errors Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) == 0) return n; if (c < 0) break outer; b = n; n = f; } } return null; }findNode的原理实际上也在doPut中出现过了,它会进行以下操做:
public V remove(Object key) { return doRemove(key, null); }
final V doRemove(Object key, Object value) { if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) < 0) break outer; if (c > 0) { b = n; n = f; continue; } if (value != null && !value.equals(v)) break outer; if (!n.casValue(v, null)) break; if (!n.appendMarker(f) || !b.casNext(n, f)) findNode(key); // retry via findNode else { findPredecessor(key, cmp); // clean index if (head.right == null) tryReduceLevel(); } @SuppressWarnings("unchecked") V vv = (V)v; return vv; } } return null; }
背景(注释):
一个并发的相似ConcurrentNavigableMap的实现。
这个map经过实现Comparable或者提供一个Comparator来实现排列的,经过构造函数来提供。
这个实现是一个SkipLists的并发版本而且为containsKey/get/put/remove操做提供了log(n)的消耗。插入、删除、更新和读取能够在多个线程之间安全并发。
Iterators和spliterators是弱兼容的。从小到大的key排列中的视图比从大到小的要快。
全部从这些方法中返回的Map.Entry只是某时刻的一个快照。他们不提供setValue方法。(注意到你能够经过改变映射经过使用put、putIfAbsent、replace方法,依赖于你本身想要的效果)
注意不像其余大部分的集合那样,size方法不是一个常量时间运算。由于这个map的异步特性,决定了这个map的元素个数必须经过遍历获得,因此在遍历过程当中改变了map那么就会获得一个不精确的结果。而且,这些以大量数据位参数的方法像putAll、equals、toArray、containsValue以及clear不会保证以原子的方式执行。好比,一个遍历操做和putAll操做并发,那么只会看到部分添加的元素。
这个类和它的视图还有迭代器实现了全部的可选的Map和Iterator接口的方法。像其余的并发结合,这个类不支持把null做为key或者value,由于null做为返回值没法区分是否缺乏元素。
算法(注释):
这个类实现了一个相似于树的二维跳表,标识段经过连接包含不一样数据的基本节点来展现。有两个缘由说明为什么使用这种方法代替类数组的结构:
为了使用删除标记,这个链表使用null的方法来指示删除,一种相似于延迟删除的模型。若是一个节点的value为null,那么它被认为是局部删除,就算它仍是可达的。这样须要组织合适的并发控制在代替vs删除操做--一个代替操做必须失败,若是删除操做首先nulling,以及一个删除操做必须返回删除以前的值。(这个删除是能够和其余方法并发的,若是其余方法返回null说明不存在这个元素)
这里有一个当节点删除时的事件序列(b:前驱,n:当前节点,f:后继),初始化:
+------+ +------+ +------+
... | b |------>| n |----->| f | ...
+------+ +------+ +------+
1 首先经过CAS操做让n的value从non-null变为null。如今没有公共操做会认为这个映射(n)会存在了。固然,其余的不间断的插入或者删除操做仍是可能改变n的指向下一个的引用的。
2 CAS操做n的next引用指向一个新的标记节点。如今没有其余节点可以被附加到n的后面。从而可以在基于CAS的链表中避免删除错误。
+------+ +------+ +------+ +------+
... | b |------>| n |----->|marker|------>| f | ...
+------+ +------+ +------+ +------+
3 CAS操做b的next引用从而忽略了n和他的标记节点。如今,没有新的遍历会遭遇n,最后n和marker会被垃圾回收。
+------+ +------+
... | b |----------------------------------->| f | ...
+------+ +------+
第一步的失败会致使简单的重试(由于另外一个操做而竞争失败)。2、三两步失败是由于其余线程在遍历的过程当中注意到一个节点有null值,经过协做的方式帮忙marking或者unlinking了。这种协做的方式保证了没有线程会由于执行删除的线程尚未进展而卡住等待。这种标记节点的用法稍微复杂化了协做代码,由于遍历过程必须确保一直地读取四个节点(b,n,marker,f),不是仅仅(b,n,f),当一个节点的next域指向了一个marker,它就不会改变。
跳表的模型中增长了段,因此基础遍历开始于接近目的地--常常只须要遍历不多的节点。不须要改变算法除了只要确保遍历开始于前驱(here,b),没有被删除(结构上),不然在处理这个删除以后重试。
段层级以链表的形式经过volatile的next来使用CAS操做。在段上的竞争会好比新增/删除节点会致使连接失败。就算这个发生时,段链表依然保持有序,从而能够做为划分。这个会影响性能,可是跳表自己就是依赖几率的,结果就是"p"值可能会小于虚值。这个竞争窗口会保持得足够小,从而在实际上失败是很是少的,甚至在大量竞争的状况下。
由于使用了一些重试逻辑从而使得base和index链表的重试是比较廉价的。遍历会在尝试了大多数”协做“CAS操做以后执行。这个不是很是必要,可是隐含的价值是能够帮助减小其余下游的CAS失败操做,从而好于从新开始的开销。这样恶化了坏状况,可是改进了高度竞争的状况。
区别于其余的跳表实现,段插入和删除须要一个分开的遍历过程在基本层面的动做以后,增长或者删除段节点。这样增长了一个线程的消耗,可是提高了多个线程的竞争性能,经过缩小干扰窗口,删除使得全部index节点不可达在删除操做返回后,从而也避免了垃圾回收。这个在这里是很是重要的,由于咱们不可以直接把拥有key的节点直接去除,由于他们仍然可能被读取。
段使用了保持良好性能的稀疏策略:初始的k=1,p=0.5意味着四分之一的节点有段中的下标。。。。。。这个期待的总共的空间比咱们的java.util.TreeMap稍微少点。
改变段的层级(这个相似树结构的高度)使用CAS操做。初始化的高度为1.当建立一个比当前层级高的段时会在头上增长一个层级。为了保持良好的性能,删除方法中会使用启发式的方法去下降层级,若是最高的层级上是空的。这样可能出如今没有层级的段上遭遇竞争。这样不会形成多大伤害,实际上相对于没有限制的提高层级,这是个更好的选择。
实现这些的代码比你想象的更详细。大多数运算会涉及定位元素(或者插入元素的位置)。这些代码不能被很是好的分块,由于子运算须要立刻获得前面运算的结果,不这样作的话会增长GC的负担。(这是又一个我但愿JAVA提供宏的地方)findPredecessor()操做搜仅仅索段节点,返回最底层节点的前驱。findNode()操做完成最底层节点的搜索。这种方法一样出现了一点代码的复制。
为了在线程之间参数随机的值,咱们使用了JDK中的随机支持(经过"secondary seed")。
实现:
让咱们来看源代码,首先是put方法:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); return doPut(key, value, false); }
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { if (n != null) { Object v; int c; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
首先put方法直接调用doPut方法,这里的关键加入了参数onlyIfAbsent,用于指明是否只在缺乏的状况下添加。
因为doPut方法比较长(超过100行),咱们把它分为两部分来看(详情以下):
第一部分是outer嵌套的双层循环:
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) { if (key == null) throw new NullPointerException(); // don't postpone errors for (;;) { for (Index<K,V> q = head, r = q.right, d;;) { if (r != null) { Node<K,V> n = r.node; K k = n.key; if (n.value == null) { if (!q.unlink(r)) break; // restart r = q.right; // reread r continue; } if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } if ((d = q.down) == null) return q.node; q = d; r = d.right; } } }
private Node<K,V> findNode(Object key) { if (key == null) throw new NullPointerException(); // don't postpone errors Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) == 0) return n; if (c < 0) break outer; b = n; n = f; } } return null; }findNode的原理实际上也在doPut中出现过了,它会进行以下操做:
public V remove(Object key) { return doRemove(key, null); }
final V doRemove(Object key, Object value) { if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; if (n == null) break outer; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) < 0) break outer; if (c > 0) { b = n; n = f; continue; } if (value != null && !value.equals(v)) break outer; if (!n.casValue(v, null)) break; if (!n.appendMarker(f) || !b.casNext(n, f)) findNode(key); // retry via findNode else { findPredecessor(key, cmp); // clean index if (head.right == null) tryReduceLevel(); } @SuppressWarnings("unchecked") V vv = (V)v; return vv; } } return null; }