接着上集继续,SynchronousQueue是一个不存储元素的阻塞队列。每个put操做必须等待一个take操做,不然不能继续添加元素,因此其peek()方法始终返回null,没有数据缓存空间。SynchronousQueue支持公平与非公平访问,默认采用非公平性策略访问队列。java
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
复制代码
相对于ArrayBlockingQueue利用ReentrantLock实现公平与非公平,而SynchronousQueue利用TransferQueue、TransferStack实现公平与非公平,从命名上来看前者队列,后者栈,SynchronousQueue的入队、出队操做都是基于transfer来实现,ctrl+alt+h查看方法调用node
TransferQueue内部定义以下编程
// 头节点
transient volatile QNode head;
// 尾节点
transient volatile QNode tail;
// 指向一个可能还未出队被取消的节点,由于它在被取消时是最后一个插入节点
transient volatile QNode cleanMe;
// 默认构造函数,建立一个假节点
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
static final class QNode {
// 后继节点
volatile QNode next;
// item数据
volatile Object item;
// 用来控制阻塞或唤醒
volatile Thread waiter; // to control park/unpark
// 是不是生产者
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
...
}
...
复制代码
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// 判断是不是生产者,true为生产者,false为消费者
boolean isData = (e != null);
// 死循环
for (;;) {
// 获取尾节点
QNode t = tail;
// 获取头节点
QNode h = head;
// 若尾节点或尾节点为空则跳出本次循序
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 若TransferQueue为空或当前节点与尾节点模式同样
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 若t不是尾节点代表已有其余线程操做过,跳出本次循环从新来
if (t != tail) // inconsistent read
continue;
// 若以前获取的尾节点后继不为空代表已有其余线程添加过节点
if (tn != null) { // lagging tail
// CAS将tn置为尾节点
advanceTail(t, tn);
continue;
}
// 若采用了时限模式且超时,直接返回null
if (timed && nanos <= 0) // can't wait
return null;
// 若s为null,构建一个新节点
if (s == null)
s = new QNode(e, isData);
// CAS将新节点加入队列中,若失败从新来
if (!t.casNext(null, s)) // failed to link in
continue;
// CAS将新节点s置为尾节点
advanceTail(t, s); // swing tail and wait
// 自旋获取匹配item
Object x = awaitFulfill(s, e, timed, nanos);
// 若x==s代表线程获取匹配项时,超时或者被中断,清除节点s
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 判断节点s是否已经出队
if (!s.isOffList()) { // not already unlinked
// CAS将节点s置为head,移出队列
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
// else分支下述
}
}
复制代码
咱们假定有线程A、B在put操做,线程C在take操做,当前TransferQueue初始化以下:数组
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 获取最后期限
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 获取当前线程
Thread w = Thread.currentThread();
// 获取自旋次数,若新节点s为头节点后继节点才能自旋
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 判断当前线程是否被中断
if (w.isInterrupted())
// 取消当前节点,cas将item置为this
s.tryCancel(e);
// 获取节点s的item
Object x = s.item;
// 若线程中断,节点s的item与x会不相等,直接返回x
if (x != e)
return x;
// 若采用了时限模式
if (timed) {
// 计算剩余时间
nanos = deadline - System.nanoTime();
// 若超时,取消节点
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 若还有自旋次数,自旋-1
if (spins > 0)
--spins;
// 若等待线程为null,将节点s的等待线程置为当前线程
else if (s.waiter == null)
s.waiter = w;
// 若没有采用时限模式则调用LockSupport.park()直接阻塞线程
else if (!timed)
LockSupport.park(this);
// 若剩余时间超过自旋时间阈值则指定时间阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
复制代码
从源码中能够看到只有头节点后继才能自旋,线程A自旋一段时间匹配节点,若自旋次数用光会一直阻塞,因此每个线程只有匹配到节点后或者因超时、中断被取消才能继续添加元素
缓存
线程A自旋,线程B接着put安全
那么何时才匹配到呢?在开头咱们提到每个put操做必须等待一个take操做,这时其余线程take(),e为null,isData为false,与尾节点的isData属性不一样,走进else分支,先获取头节点的后继节点数据,若没有其余线程抢先操做,且put操做未被取消,m.casItem(x, e)数据替换,将节点m的item属性置为null,若CAS替换成功代表匹配成功,在put自旋时会用item与e比较,take()将item置为null,不相等返回null
并发
else { // complementary-mode
// 获取头节点后继
QNode m = h.next; // node to fulfill
// 若t不是尾节点或者m为null或者h不是头节点,即已有其余线程抢先操做过
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // 节点已被操做过
x == m || // 节点被取消
!m.casItem(x, e)) { // lost CAS
// CAS将m置为头节点,重来
advanceHead(h, m); // dequeue and retry
continue;
}
// 若走这里,代表匹配成功
advanceHead(h, m); // successfully fulfilled
// 唤醒m的等待线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
复制代码
TransferStack内部定义以下app
// 未执行的消费者
static final int REQUEST = 0;
// 未执行的生产者
static final int DATA = 1;
// 线程正在匹配节点
static final int FULFILLING = 2;
volatile SNode head;
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
...
}
...
复制代码
TransferStack相对于TransferQueue中的节点,其数据项item与模式mode不须要用volatile修饰,由于它们老是写在前读在后。less
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
// REQUEST:消费者;DATA:生产者
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 若栈为空或者新增元素模式与首元素模式相同
if (h == null || h.mode == mode) { // empty or same-mode
// 超时
if (timed && nanos <= 0) { // can't wait
// 若节点被取消,将取消节点出队,从新来
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
//若不采用限时或者未超时,建立节点CAS将其置为头节点,s→h
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋匹配
SNode m = awaitFulfill(s, timed, nanos);
// 若m==s代表节点被取消
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 其他分支下述
}
}
复制代码
依然模拟场景,假定如今线程A、B在put,线程C、D在take。
线程A进行put新增元素A,CAS头插元素A,调用awaitFulfill()自旋匹配,注意只有头节点、空栈或者协助节点才能自旋,每次自旋都会进行条件判断,为了
dom
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 最后期限
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次数
// 若栈为空、节点为首结点或者该节点模式为FULFILLING才能自旋
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 若线程中断,取消该节点
if (w.isInterrupted())
s.tryCancel();
// 匹配节点
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
// 超时,取消节点
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 每次自旋需先判断是否知足自旋条件,知足次数-1
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
// 若没有采用时限模式则调用LockSupport.park()直接阻塞线程
else if (!timed)
LockSupport.park(this);
// 若剩余时间超过自旋时间阈值则指定时间阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
复制代码
线程B接着put元素B,头节点A的模式与put操做的模式一致,CAS头插成功后,也调用awaitFulfill()自旋,因为头节点变为线程B因此只有线程B才能自旋匹配,这也是不公平的体现
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
复制代码
这时线程C进行take操做,take的模式(REQUEST)明显与当前头节点B(DATA)不一致且头节点模式也不为FULFILLING,因此transfer走入else if分支。
// 若头节点的模式不为 FULFILLING
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 若头节点被取消,将头节点出队从新来
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
// 将节点s出队
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 获取节点m的后继节点
SNode mn = m.next;
// 尝试匹配
if (m.tryMatch(s)) {
// 匹配成功,将节点s、m出队
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 若匹配失败,将m出队
s.casNext(m, mn); // help unlink
}
}
复制代码
建立一个FULFILLING模式的节点并CAS将其置为头节点,与其后继匹配,匹配方法以下
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
复制代码
若节点没有被取消,其match为null,被取消则为其自身。成功匹配后将一对put、take操做的节点出队。咱们假定另外一种场景,若线程C的take节点入队后,未进行匹配线程D中途take
// 头节点模式为 FULFILLING
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
复制代码
LinkedTransferQueue是由链表结构组成的无界阻塞FIFO队列
// 判断是否多核处理器
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
// 自旋次数
private static final int FRONT_SPINS = 1 << 7;
// 前驱节点正在操做,当前节点自旋的次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
static final int SWEEP_THRESHOLD = 32;
// 头节点
transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 删除节点失败的次数
private transient volatile int sweepVotes;
/**
* xfer()方法中使用
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
复制代码
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter;
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
...
}
复制代码
是否是感受与SynchronousQueue中TransferQueue的QNode节点类定义很相似
LinkedTransferQueue的大多方法都是基于xfer()方法
/**
* @param e 入队数据
* @param haveData true:入队;flase:出队
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos 期限仅TIMED限时模式使用
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
// 如果入队操做,但无数据抛异常
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
// 从头节点遍历
for (Node h = head, p = h; p != null;) { // find & match first node
// 获取模式isData
boolean isData = p.isData;
// 获取数据项
Object item = p.item;
// 找到未匹配的节点
if (item != p && (item != null) == isData) { // unmatched
// 若操做模式同样,不匹配
if (isData == haveData) // can't match
break;
// 若匹配,CAS将替换item
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
// 更新 head
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒线程
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.cast(item);
}
}
// 后继
Node n = p.next;
// 若p的后继是其自身,代表p已经有其余线程操做过,从头节点重写开始
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 若没有找到匹配节点,
// NOW为untimed poll, tryTransfer,不会入队
if (how != NOW) { // No matches available
if (s == null)
// 建立节点
s = new Node(e, haveData);
// 尾插入队
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
// 若不是异步操做
if (how != ASYNC)
// 阻塞等待匹配值
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
复制代码
以put()方法为例,假定队列为空此时有线程put(其内部xfer(e, true, ASYNC, 0)),由于不等于now,调用tryAppend()方法尾插入队
private Node tryAppend(Node s, boolean haveData) {
// 从尾节点开始
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 若队列为空CAS将S置为头节点
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
// 若不是最后节点
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
// CAS设置将s置为p的后继
else if (!p.casNext(null, s))
// 若设置失败从新来
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
复制代码
从源码中能够得知,当第一次tryAppend()队列为空时只设置了头节点,第二次tryAppend()才会设置尾结点,入队后,若不是ASYNC还会调用awaitMatch()方法阻塞匹配
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 若限时获取最后期限
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 不相等代表已经匹配过,有其余线程已操做过
if (item != e) { // matched
// assert item != s;
// 取消节点
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.cast(item);
}
// 若线程中断或超时则取消节点
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
// 初始化自旋次数
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
// 自旋
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
// 若采用限时则限时阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
// 直接阻塞
else {
LockSupport.park(this);
}
}
}
复制代码
其整个队列只存在一个操做(入队或出队),若不一样操做会替换item唤醒相应另个线程,若相同操做则根据形参how判断判断
NOW:直接返回操做节点不入队
ASYNC:操做节点尾插入队,但不会阻塞等待直接返回,同一个线程随便可以接着操做
SYNC:操做节点尾插入队且会自旋匹配一段时间,自旋次数用完进入阻塞状态,像SynchronousQueue同样同一个线程操做完必须匹配到或被取消后才能继续操做
TIMED:限时模式,在指定时间内若没匹配到操做会被取消
相对于SynchronousQueue,LinkedTransferQueue能够存储元素且可支持不阻塞形式的操做,而相对于LinkedBlockingQueue维护了入队锁和出队锁,LinkedTransferQueue经过CAS保证线程安全更提升了效率
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,双向队列就意味着能够从对头、对尾两端插入和移除元素。LinkedBlockingDeque默认构造容量Integer.MAX_VALUE,也能够指定容量
// 头节点
transient Node first;
// 尾节点
transient Node last;
// 元素个数
private transient int count;
// 容量
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
复制代码
static final class Node {
// 数据项
E item;
// 前驱节点
Node prev;
// 后继节点
Node next;
Node(E x) {
item = x;
}
}
复制代码
public void putFirst(E e) throws InterruptedException {
// 判空
if (e == null) throw new NullPointerException();
// 建立节点
Node node = new Node(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
复制代码
判空处理而后获取锁,调用linkFirst()入队
private boolean linkFirst(Node node) {
// assert lock.isHeldByCurrentThread();
// 若当前元素个数超过指定容量,返回false
if (count >= capacity)
return false;
// 获取首节点
Node f = first;
// 新节点后继指向首节点
node.next = f;
// 新节点置为首节点
first = node;
// 若队列为空则新节点置为尾节点
if (last == null)
last = node;
// 若不为空,新节点置为首节点的前驱节点
else
f.prev = node;
// 元素个数+1
++count;
// 唤醒出队(消费者)等待队列中线程
notEmpty.signal();
return true;
}
复制代码
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node node = new Node(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
复制代码
判空处理而后获取锁,调用linkLast()入队
private boolean linkLast(Node node) {
// assert lock.isHeldByCurrentThread();
// 若当前元素个数超过指定容量,返回false
if (count >= capacity)
return false;
// 获取尾节点
Node l = last;
// 将新节点的前驱节点置为原尾节点
node.prev = l;
// 新节点置为尾节点
last = node;
// 若队列为空,首结点置为头节点
if (first == null)
first = node;
// 不然将新节点置为原未节点的后继节点
else
l.next = node;
// 元素个数+1
++count;
// 唤醒出队(消费者)等待队列中线程
notEmpty.signal();
return true;
}
复制代码
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
复制代码
unlinkFirst()方法
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
// 获取头节点
Node f = first;
// 若first为null即队列为空,返回null
if (f == null)
return null;
// 获取头节点的后继节点
Node n = f.next;
E item = f.item;
// 删除头节点
f.item = null;
f.next = f; // help GC
// 将原头节点的后继节点置为头节点
first = n;
// 若原队列仅一个节点,则尾节点置空
if (n == null)
last = null;
// 不然原头节点的后继节点的前驱置为null
else
n.prev = null;
// 元素个数-1
--count;
// 唤醒入队(生产者)等待队列中线程
notFull.signal();
return item;
}
复制代码
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
复制代码
unlinkLast
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
// 获取尾节点
Node l = last;
// 尾节点为null即队列为空,返回null
if (l == null)
return null;
// 获取原尾节点的前驱节点
Node p = l.prev;
E item = l.item;
// 删除尾节点
l.item = null;
l.prev = l; // help GC
// 将原尾节点的前驱节点置为尾节点
last = p;
// 若原队列仅一个节点,则头节点置空
if (p == null)
first = null;
// 不然原尾节点的前驱节点的后继置为null
else
p.next = null;
// 元素个数-1
--count;
notFull.signal();
return item;
}
复制代码
逻辑就很少说了,看过LinkedList源码的应该不会陌生,除了多了唤醒阻塞获取锁操做,基本逻辑相似
《java并发编程的艺术》