要实现对队列的安全访问,有两种方式:阻塞算法和非阻塞算法。阻塞算法的实现是使用一把锁(出队和入队同一把锁ArrayBlockingQueue)和两把锁(出队和入队各一把锁LinkedBlockingQueue)来实现;非阻塞算法使用自旋+CAS实现。html
今天来探究下使用非阻塞算法来实现的线程安全队列ConcurrentLinkedQueue,它是一个基于连接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,当咱们添加一个元素的时候,它会添加到队列的尾部,当咱们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现。java
ConcurrentLinkedQueue的类图结构:node
从类图中能够看到,ConcurrentLinkedQueue由head和tail节点组成,每一个节点Node由节点元素item和指向下一个节点的引用next组成,节点与节点之间经过next关联起来组成一张链表结构的队列。算法
private static class Node<E> { volatile E item;//元素 volatile Node<E> next;//下一节点 Node(E item) {//添加元素 UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) {//cas修改元素 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) {//添加节点 UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) {//cas修改节点 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; //得到元素的偏移位置 itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); //得到下一节点的偏移位置 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } //头节点 private transient volatile Node<E> head; //尾节点 private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { //默认状况下head节点存储的元素为空,tail节点等于head节点。 head = tail = new Node<E>(null); } public ConcurrentLinkedQueue(Collection<? extends E> c) { Node<E> h = null, t = null; //遍历集合 for (E e : c) { checkNotNull(e);//检查是否为空,若是为空抛出空指针异常 //建立节点和将元素存储到节点中 Node<E> newNode = new Node<E>(e); if (h == null)//头节点为空 h = t = newNode;//头和尾节点是建立的节点 else { t.lazySetNext(newNode);//添加节点 t = newNode;//修改尾节点的标识 } } //若是集合没有元素,设置队列的头尾节点为空 if (h == null) h = t = new Node<E>(null); head = h;//更新队列的头节点标识 tail = t;//更新队列的尾节点标识 } private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
入队操做主要作两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点;第二是更新tail节点,若是tail节点的next节点不为空,则将入队节点设置成tail节点,若是tail节点的next节点为空,则将入队节点设置成tail的next节点,因此tail节点不老是尾节点;安全
上面的分析让咱们从单线程入队的角度来理解入队过程,可是多个线程同时进行入队状况就变得更加复杂,由于可能会出现其余线程插队的状况。若是有一个线程正在入队,那么它必须先获取尾节点,而后设置尾节点的下一个节点为入队节点,但这时可能有另一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操做,而后从新获取尾节点。多线程
public boolean add(E e) { return offer(e); } public boolean offer(E e) { checkNotNull(e);//检查是否为空 //建立入队节点,将元素添加到节点中 final Node<E> newNode = new Node<E>(e); //自旋队列CAS直到入队成功 // 一、根据tail节点定位出尾节点(last node);二、将新节点置为尾节点的下一个节点;三、casTail更新尾节点 for (Node<E> t = tail, p = t;;) { //p是尾节点,q获得尾节点的next Node<E> q = p.next; //若是q为空 if (q == null) { //p是last node,将尾节点的next修改成建立的节点 if (p.casNext(null, newNode)) { //p在遍历后会变化,所以须要判断,若是不相等即p != t = tail,表示t(= tail)不是尾节点,则将入队节点设置成tail节点,更新失败了也不要紧,由于失败了表示有其余线程成功更新了tail节点 if (p != t) casTail(t, newNode);//入队节点更新为尾节点,容许失败,所以t= tail并不老是尾节点 return true;//结束 } } //从新获取head节点:多线程操做时,轮询后p有可能等于q,此时,就须要对p从新赋值 //(多线程自引用的状况,只有offer()和poll()交替执行时会出现) else if (p == q) //由于并发下可能tail被改了,若是被改了,则使用新的t,不然跳转到head,从链表头从新轮询,由于从head开始全部的节点均可达 p = (t != (t = tail)) ? t : head;//运行到这里再继续自旋遍历 else /** * 寻找尾节点,一样,当t不等于p时,说明p在上面被从新赋值了,而且tail也被别的线程改了,则使用新的tail,不然循环检查p的下个节点 * (多offer()状况下会出现) * p=condition?result1:result2 * 知足result1的场景为 : * 获取尾节点tail的快照已通过时了(其余线程更新了新的尾节点tail),直接跳转到当前得到的最新尾节点的地方 * 知足result2的场景为: * 多线程同时操做offer(),执行p.casNext(null, newNode)CAS成功后,未更新尾节点(未执行casTail(t, newNode)方法:两种缘由 1是未知足前置条件if判断 2是CAS更新失败),直接找next节点 */ p = (p != t && t != (t = tail)) ? t : q;//运行到这里再继续自旋遍历 } }
public static void main(String[] args) throws IndexOutOfBoundsException { ConcurrentLinkedQueue c = new ConcurrentLinkedQueue(); new Thread(()->{ int i; for(i=0;i<10;){ c.offer(i++); Object poll = c.poll();//注释或取消进行测试 System.out.println(Thread.currentThread().getName()+":"+poll); } }).start(); new Thread(()->{ int i; for(i=200;i<210;){ c.offer(i++); Object poll = c.poll();//注释或取消进行测试 System.out.println(Thread.currentThread().getName()+":"+poll); } }).start(); }
public E poll() { restartFromHead: //自旋 for (;;) { //得到头节点 for (Node<E> h = head, p = h, q;;) { E item = p.item;//得到头节点元素 //若是头节点元素不为null而且cas删除头节点元素成功 if (item != null && p.casItem(item, null)) { //p被修改了 if (p != h) // hop two nodes at a time // 若是p 的next 属性不是null ,将 p 做为头节点,而 q 将会消失 updateHead(h, ((q = p.next) != null) ? q : p); return item; } //若是头节点的元素为空或头节点发生了变化,这说明头节点已经被另一个线程修改了。 // 那么获取p节点的下一个节点,若是p节点的下一节点为null,则代表队列已经空了 // 若是 p(head) 的 next 节点 q 也是null,则表示没有数据了,返回null,则将 head 设置为null // 注意:updateHead 方法最后还会将原有的 head 做为本身 next 节点,方便offer 链接。 else if ((q = p.next) == null) { updateHead(h, p); return null; } //若是 p == q,说明别的线程取出了 head,并将 head 更新了。就须要从新开始获取head节点 else if (p == q) continue restartFromHead; // 若是下一个元素不为空,则将头节点的下一个节点设置成头节点 else p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) // 将旧的头结点h的next域指向为h h.lazySetNext(h); }
若是这时,再有一个线程来添加元素,经过tail获取的next节点则仍然是它自己,这就出现了p == q的状况,出现该种状况以后,则会触发执行head的更新,将p节点从新指向为head,全部“活着”的节点(指未删除节点),都能从head经过遍历可达,这样就能经过head成功获取到尾节点,而后添加元素了。并发
// 获取链表的首部元素(只读取而不移除) public E peek() { restartFromHead: //自旋 for (;;) { for (Node<E> h = head, p = h, q;;) { //得到头节点元素 E item = p.item; //头节点元素不为空或头节点下一节点为空(表示链表只有一个节点) if (item != null || (q = p.next) == null) { updateHead(h, p);//更新头节点标识 return item; } /若是 p == q,说明别的线程取出了 head,并将 head 更新了。就须要从新开始获取head节点 else if (p == q) continue restartFromHead; // 若是下一个元素不为空,则将头节点的下一个节点设置成头节点 else p = q; } } }
public boolean isEmpty() { return first() == null; } Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { //头节点是否有元素 boolean hasItem = (p.item != null); //头节点有元素或当前链表只有一个节点 if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null;//头节点有值返回节点,不然返回null } else if (p == q) continue restartFromHead; else p = q; } } }
public int size() { int count = 0; // first()获取第一个具备非空元素的节点,若不存在,返回null // succ(p)方法获取p的后继节点,若p == p的后继节点,则返回head for (Node<E> p = first(); p != null; p = succ(p)) //节点有元素数量+1 if (p.item != null) if (++count == Integer.MAX_VALUE) break; return count; } //取下一节点 final Node<E> succ(Node<E> p) { Node<E> next = p.next; //若p == p的后继节点(自引用状况下会出现),则返回head return (p == next) ? head : next; }
public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; // 若找到匹配节点,则返回true if (item != null && o.equals(item)) return true; } return false; }
public boolean remove(Object o) { //删除的元素不能为null, if (o != null) { Node<E> next, pred = null; //遍历,开始得到头节点, for (Node<E> p = first(); p != null; pred = p, p = next) { boolean removed = false;//删除的标识 E item = p.item;//节点元素 if (item != null) { //节点的元素不等于要删除的元素,获取下一节点进行遍历循环操做 if (!o.equals(item)) { next = succ(p);//将当前遍历的节点移到下一节点 continue; } //节点元素等于删除元素,CAS将节点元素置为null removed = p.casItem(item, null); } next = succ(p);//获取删除节点的下一节点, //有前节点和后置节点 if (pred != null && next != null) // unlink pred.casNext(p, next);//删除当前节点,即当前节点移除出队列 if (removed)//元素删除了返回true return true; } } return false; }