图片来自pexelsjava
最近在阅读《多处理器编程艺术》一书,学习了不少Java多线程的底层知识,如今就作一下书中链表-锁的做用一章的总结。 node
粗粒度同步
所谓粗粒度同步其实很简单,就是在List的 add
, remove
, contains
函数的开始就直接使用Lock加锁,而后在函数结尾释放。 git
add
函数的代码以下所示,函数的主体就是链表的遍历添加逻辑,只不过在开始和结束进行了锁的获取和释放。程序员
private Node head;private Lock lock = new ReentrantLock();public boolean add(T item) { Node pred, curr; int key = item.hashCode(); lock.lock(); try { pred = head; curr = pred.next; while(curr.key < key) { pred = curr; curr = pred.next; } if (key == curr.key) { return false; } else { Node node = new Node(item); node.next = curr; pred.next = node; return true; } } finally { lock.unlock(); }}
github
算法
编程
微信
多线程
并发
你们看到这里就会想到,这不就是相似于 Hashtable
的实现方式吗?把可能出现多线程问题的函数都用一个重入锁锁住。
可是这个方法的缺点很明显,若是竞争激烈的话,对链表的操做效率会很低,由于 add
, remove
, contains
三个函数都须要获取锁,也都须要等待锁的释放。至于如何优化,咱们能够一步一步往下看
细粒度同步
咱们能够经过锁定单个节点而不是整个链表来提升并发。给每一个节点增长一个Lock变量以及相关的lock()和unlock()函数,当线程遍历链表的时候,若它是第一个访问节点的线程,则锁住被访问的节点,在随后的某个时刻释放锁。这种细粒度的锁机制容许并发线程以流水线的方式遍历链表。
使用这种方式来遍历链表,必须同时获取两个相邻节点的锁,经过“交叉手”的方式来获取锁:除了初始的head哨兵节点外,只有在已经获取pred的锁时,才能获取curr的锁。
//每一个Node对象中都有一个Lock对象,能够进行lock()和unlock()操做public boolean add(T item) { int key = item.hashCode(); head.lock(); Node pred = head; try { Node curr = pred.next; curr.lock(); try { while (curr.key < key) { // 释放前一个节点的锁 pred.unlock(); pred = curr; curr = pred.next; // 获取当前节点的锁 curr.lock(); } if (curr.key == key) { return false; } Node newNode = new Node(item); newNode.next = curr; pred.next = newNode; return true; } finally { curr.unlock(); } } finally { pred.unlock(); }}
乐观同步
虽然细粒度锁是对单一粒度锁的一种改进,但它仍然出现很长的获取锁和释放锁的序列。并且,访问链表中不一样部分的线程仍然可能相互阻塞。例如,一个正在删除链表中第二个元素的线程将会阻塞全部试图查找后继节点的线程。
减小同步代价的一种方式就是乐观:不须要获取锁就能够查找,对找到的节点进行加锁,而后确认锁住的节点是正确的;若是一个同步冲突致使节点被错误的锁定,则释放这些锁从新开始。
public boolean add(T item) { int key = item.hashCode(); while (true) { //若是不成功,就进行重试 Node pred = head; Node curr = pred.next; while (curr.key < key) { pred = curr; curr = pred.next; } //找到目标相关的pred和curr以后再将两者锁住 pred.lock(); curr.lock(); try { //锁住两者以后再进行判断,是否存在并发冲突 if (validate(pred, curr)) { //若是不存在,那么就直接进行正常操做 if (curr.key == key) { return false; } else { Node node = new Node(item); node.next = curr; pred.next = node; } } } finally { pred.unlock(); curr.unlock(); } }}public boolean validate(Node pred, Node curr) { //从队列头开始查找pred和curr,判断是否存在并发冲突 Node node = head; while (node.key <= pred.key) { if (node == pred) { return pred.next == curr; } node = node.next; } return false;}
因为再也不使用能保护并发修改的锁,因此每一个方法调用均可能遍历那些已经被删除的节点,因此在进行添加,删除获取判断是否存在的以前必须再次进行验证。
惰性同步
当不用锁遍历两次链表的代价比使用锁遍历一次链表的代价小不少时,乐观同步的实现效果很是好。可是这种算法的缺点之一就是contains()方法在遍历时须要锁,这一点并不使人满意,其缘由在于对contains()的调用要比其余方法的调用频繁得多。
使用惰性同步的方法,使得contains()调用是无等待的,同时add()和remove()方法即便在被阻塞的状况下也只须要遍历一次链表。
对每一个节点增长一个布尔类型的marked域,用于说明该节点是否在节点集合中。如今,遍历再也不须要锁定目标结点,也没有必须经过从新遍历整个链表来验证结点是否可达。全部未被标记的节点必然是可达的。
//add方法和乐观同步的方法一致,只有检验方法作了修改。//只须要检测节点的marked变量就能够,而且查看pred的next是否仍是指向curr,须要注意的是marked变量必定是voliate的。private boolean validate(Node pred, Node curr) { return !pred.marked && !curr.marked && pred.next == curr;}
惰性同步的优势之一就是可以将相似于设置一个flag这样的逻辑操做与相似于删除结点的连接这种对结构的物理改变分开。
一般状况下,延迟操做能够是批量处理方式进行,且在某个方便的时候再懒惰地进行处理,从而下降了对结构进行物理修改的总体破裂性。惰性同步的主要缺点是add()和remove()调用是阻塞的,若是一个线程延迟,那么其余线程也将延迟。
非阻塞同步
使用惰性同步的思惟是很是有益处的。咱们能够进一步将add(),remove()和contains()这三个方法都变成非阻塞的。
前两个方法是无锁的,最后一个方法是无等待的。咱们没法直接使用compareAndSet方法来改变next域来实现,由于这样会出现问题。可是咱们能够将结点的next域和marked域看做是单个的原子单位,当marked域为true时,对next域的任何修改都将失败。
咱们可使用AtomicMarkableReference
public Window find(Node head, int key) { Node pred = null, curr = null, succ = null; boolean[] marked = {false}; boolean snip; retry: while(true) { pred = head; curr = curr.next.get(marked); while(true) { succ = curr.next.get(marked); //获取succ,而且查看是被被标记 while (marked[0]) {//若是被标记了,说明curr被逻辑删除了,须要继续物理删除 snip = pred.next.compareAndSet(curr, succ, false, false);// if (!snip) continue retry; curr = succ; succ = curr.next.get(marked); } //当不须要删除后,才继续遍历 if (curr.key >= key) { return new Window(pred, curr); } pred = curr; curr = succ; } }}public boolean add(T item) { int key = item.hashCode(); while(true) { Window window = find(head, key); Node pred = window.pred, curr = window.curr; if (curr.key == key) { return false; } else { Node node = new Node(item); node.next = new AtomicMarkableReference<>(curr, false); if (pred.next.compareAndSet(curr, node, false, false)) { return true; } } }}public boolean remove(T item) { int key = item.hashCode(); boolean sinp; while(true) { Window window = find(head, key); Node pred = window.pred, curr = window.curr; if (curr.key != key) { return false; } else { Node succ = curr.next.getReference(); //要进行删除了,那么就直接将curr.next设置为false,而后在进行真正的物理删除。 sinp = curr.next.compareAndSet(curr, succ, false, true); if (!sinp) { continue; } pred.next.compareAndSet(curr, succ, false, false); return true; } }}class Node { AtomicMarkableReference<Node> next;}
后记
文中的代码在个人github的这个repo中均可以找到。
本文分享自微信公众号 - 程序员历小冰(gh_a1d0b50d8f0a)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。