整理了阻塞队列LinkedBlockingQueue的学习笔记,但愿对你们有帮助。有哪里不正确,欢迎指出,感谢。html
咱们先来看看LinkedBlockingQueue的继承体系。使用IntelliJ IDEA查看类的继承关系图形 node
- 蓝色实线箭头是指类继承关系
- 绿色箭头实线箭头是指接口继承关系
- 绿色虚线箭头是指接口实现关系。
LinkedBlockingQueue实现了序列化接口 Serializable,所以它有序列化的特性。 LinkedBlockingQueue实现了BlockingQueue接口,BlockingQueue继承了Queue接口,所以它拥有了队列Queue相关方法的操做。编程
类图来自Java并发编程之美 安全
LinkedBlockingQueue主要特性:bash
//容量范围,默认值为 Integer.MAX_VALUE
private final int capacity;
//当前队列元素个数
private final AtomicInteger count = new AtomicInteger();
//头结点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//take, poll等方法的可重入锁
private final ReentrantLock takeLock = new ReentrantLock();
//当队列为空时,执行出队操做(好比take )的线程会被放入这个条件队列进行等待
private final Condition notEmpty = takeLock.newCondition();
//put, offer等方法的可重入锁
private final ReentrantLock putLock = new ReentrantLock();
//当队列满时, 执行进队操做( 好比put)的线程会被放入这个条件队列进行等待
private final Condition notFull = putLock.newCondition();
复制代码
LinkedBlockingQueue有三个构造函数:数据结构
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
复制代码
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
//设置队列大小
this.capacity = capacity;
//new一个null节点,head、tail节点指向该节点
last = head = new Node<E>(null);
}
复制代码
public LinkedBlockingQueue(Collection<? extends E> c) {
//调用指定容量的构造器
this(Integer.MAX_VALUE);
//获取put, offer的可重入锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
//循环向队列中添加集合中的元素
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//将队列的last节点指向该节点
enqueue(new Node<E>(e));
++n;
}
//更新容量值
count.set(n);
} finally {
//释放锁
putLock.unlock();
}
}
复制代码
static class Node<E> {
// 当前节点的元素值
E item;
// 下一个节点的索引
Node<E> next;
//节点构造器
Node(E x) {
item = x;
}
}
复制代码
LinkedBlockingQueue的节点符合单向链表的数据结构要求:并发
item表示当前节点的元素值,next表示指向下一节点的指针函数
入队方法,其实就是向队列的尾部插入一个元素。若是元素为空,抛出空指针异常。若是队列已满,则丢弃当前元素,返回false,它是非阻塞的。若是队列空闲则插入成功返回true。学习
offer方法源码以下:ui
public boolean offer(E e) {
//为空直接抛空指针
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//若是当前队列满了的话,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
//构造新节点
Node<E> node = new Node<E>(e);
获取put独占锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//判断队列是否已满
if (count.get() < capacity) {
//进队列
enqueue(node);
//递增元素计数
c = count.getAndIncrement();
//若是元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
//若是容量为0,则
if (c == 0)
//激活 notEmpty 的条件队列,唤醒被阻塞的线程
signalNotEmpty();
return c >= 0;
}
复制代码
enqueue方法源码以下:
private void enqueue(Node<E> node) {
//从尾节点加进去
last = last.next = node;
}
复制代码
为了形象生动,咱们用一张图来看看往队列里依次放入元素A和元素B。图片参考来源【细谈Java并发】谈谈LinkedBlockingQueue
signalNotEmpty方法源码以下
private void signalNotEmpty() {
//获取take独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒notEmpty条件队列里被阻塞的线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
}
复制代码
基本流程:
put方法也是向队列尾部插入一个元素。若是元素为null,抛出空指针异常。若是队列己满则阻塞当前线程,直到队列有空闲插入成功为止。若是队列空闲则插入成功,直接返回。若是在阻塞时被其余线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。
public void put(E e) throws InterruptedException {
////为空直接抛空指针异常
if (e == null) throw new NullPointerException();
int c = -1;
// 构造新节点
Node<E> node = new Node<E>(e);
//获取putLock独占锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取独占锁,它跟lock的区别,是能够被中断
putLock.lockInterruptibly();
try {
//队列已满线程挂起等待
while (count.get() == capacity) {
notFull.await();
}
//进队列
enqueue(node);
//递增元素计数
c = count.getAndIncrement();
//若是元素入队,还有空闲,则唤醒notFull条件队列里被阻塞的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//若是容量为0,则
if (c == 0)
//激活 notEmpty 的条件队列,唤醒被阻塞的线程
signalNotEmpty();
}
复制代码
基本流程:
从队列头部获取并移除一个元素, 若是队列为空则返回 null, 该方法是不阻塞的。
poll方法源代码
public E poll() {
final AtomicInteger count = this.count;
//若是队列为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//若是队列不为空,则出队,并递减计数
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
////容量大于1,则激活 notEmpty 的条件队列,唤醒被阻塞的线程
if (c > 1)
notEmpty.signal();
}
} finally {
//释放锁
takeLock.unlock();
}
if (c == capacity)
//唤醒notFull条件队列里被阻塞的线程
signalNotFull();
return x;
}
复制代码
dequeue方法源代码
//出队列
private E dequeue() {
//获取head节点
Node<E> h = head;
//获取到head节点指向的下一个节点
Node<E> first = h.next;
//head节点原来指向的节点的next指向本身,等待下次gc回收
h.next = h; // help GC
// head节点指向新的节点
head = first;
// 获取到新的head节点的item值
E x = first.item;
// 新head节点的item值设置为null
first.item = null;
return x;
}
复制代码
为了形象生动,咱们用一张图来描述出队过程。图片参考来源【细谈Java并发】谈谈LinkedBlockingQueue
signalNotFull方法源码
private void signalNotFull() {
//获取put独占锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
////唤醒notFull条件队列里被阻塞的线程
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
}
复制代码
基本流程:
获取队列头部元素可是不从队列里面移除它,若是队列为空则返回 null。 该方法是不 阻塞的。
public E peek() {
//队列容量为0,返回null
if (count.get() == 0)
return null;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
//判断first是否为null,若是是直接返回
if (first == null)
return null;
else
return first.item;
} finally {
//释放锁
takeLock.unlock();
}
}
复制代码
基本流程:
获取当前队列头部元素并从队列里面移除它。 若是队列为空则阻塞当前线程直到队列 不为空而后返回元素,若是在阻塞时被其余线程设置了中断标志, 则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
//获取独占锁,它跟lock的区别,是能够被中断
takeLock.lockInterruptibly();
try {
//当前队列为空,则阻塞挂起
while (count.get() == 0) {
notEmpty.await();
}
//)出队并递减计数
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//激活 notEmpty 的条件队列,唤醒被阻塞的线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
if (c == capacity)
//激活 notFull 的条件队列,唤醒被阻塞的线程
signalNotFull();
return x;
}
复制代码
基本流程:
删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。
public boolean remove(Object o) {
//为空直接返回false
if (o == null) return false;
//双重加锁
fullyLock();
try {
//边历队列,找到元素则删除并返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//执行unlink操做
unlink(p, trail);
return true;
}
}
return false;
} finally {
//解锁
fullyUnlock();
}
}
复制代码
双重加锁,fullyLock方法源代码
void fullyLock() {
//putLock独占锁加锁
putLock.lock();
//takeLock独占锁加锁
takeLock.lock();
}
复制代码
unlink方法源代码
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//若是当前队列满 ,则删除后,也不忘记唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
复制代码
fullyUnlock方法源代码
void fullyUnlock() {
//与双重加锁顺序相反,先解takeLock独占锁
takeLock.unlock();
putLock.unlock();
}
复制代码
基本流程
获取当前队列元素个数。
public int size() {
return count.get();
}
复制代码
因为进行出队、入队操做时的 count是加了锁的,因此结果相比ConcurrentLinkedQueue 的 size 方法比较准确。
Java并发编程之美中,有一张图唯妙唯肖描述了它,以下图: