JUC阻塞队列之BlockedQueue(七)

一.阻塞队列

      阻塞队列是一个队列,它最大的特色就是阻塞的线程知足条件就会被自动唤醒,不须要咱们人为的判断。java

  • 当队列为空时,从队列中获取元素的操做就会被阻塞;
  • 当队列为满时,从队列中添加元素的操做就会被阻塞。

 

 

 二.阻塞队列的好处

        以前总结的线程间通讯,须要判断对应的值,一个生产者与一个消费者,在判断状态的时候须要加一个标志类,还须要控制线程。而阻塞队列在某些状况会挂起<暂停>线程(阻塞),知足条件,就会被自动的唤起数组

        java中阻塞队列的方法以下:安全

       

 

 

 BlockQueue的源码:ide

public interface BlockingQueue<E> extends Queue<E> {
 
    //增长一个元索 若是队列已满,则抛出一个IIIegaISlabEepeplian异常
    boolean add(E e);
 
    //添加一个元素并返回true 若是队列已满,则返回false
    boolean offer(E e);
 
    //添加一个元素 若是队列满,则阻塞
    void put(E e) throws InterruptedException;
 
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
 
    //移除并返回队列头部的元素 若是队列为空,则阻塞
    E take() throws InterruptedException;
 
    //移除并返问队列头部的元素 若是队列为空,则返回null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
 
    //剩余容量
    int remainingCapacity();
 
    //移除并返回队列头部的元素 若是队列为空,则抛出一个NoSuchElementException异常
    boolean remove(Object o);
 
    public boolean contains(Object o);
 
    //一次性从BlockingQueue获取全部可用的数据对象并转移到参数集合中
    int drainTo(Collection<? super E> c);
 
    int drainTo(Collection<? super E> c, int maxElements);
}

 能够看到,BlockQueue提供了不少不一样于其余集合的方法。下面是它的子类:this

 

 

 咱们随便选一个ArrayBlockQueue来探索一下它是怎么作到阻塞的。先看看它的三个构造方法:spa

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
 
    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
            throw new IllegalArgumentException();
//初始化一个数组 this.items = new Object[capacity];
//重入锁 lock = new ReentrantLock(fair);
//下面初始化的是两个队列 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }

  

咱们关注的重点固然是第三个构造方法,此处用到了lock锁来把一个普通的集合转移到ArrayBlockQueue中。ArrayBlockQueue的初始化是在第二个构造方法中完成的。须要注意的是,ArrayBlockQueue内部存储对象的方式是经过Object数组实现的。线程

不难想象,构造方法就已经用lock锁来达到安全的目的了,那么,其余的阻塞相关方法也确定离不开lock锁的影子了。咱们带着这个flag继续往下走。先来看看offer()方法和put()方法,发现和咱们猜测的同样:对象

该方法在ArrayBlockQueue中有两个重载方法offer(E e, long timeout, TimeUnit unit)和offer(E e)。 
将指定的元素插入到此队列的尾部(若是当即可行且不会超过该队列的容量),在成功时返回 true,若是此队列已满,则返回 false。前者与后者的主要区别在于,若是队列中没有可用空间,能够设置必定的等待时间,等待可用空间。blog

 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try { 
//若是长度等于数组长度表示已经满了 if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }

 将指定的元素插入到队列的尾部,若是有可用空间直接插入,若是没有可用空间,调用condition.await()方法等待,直到被唤醒,而后插入元素。 队列

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
//这种锁能够中断 lock.lockInterruptibly(); try { while (count == items.length) notFull.await();
//能够跟进 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }

  

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
//此处putIndex能够当成游标 items[putIndex] = x;
//当数据满了,游标会恢复为0 if (++putIndex == items.length) putIndex = 0;
//队列中元素个数 count++;
//唤醒 notEmpty.signal(); }

 

 若是插入元素成功,返回true,若是插入失败抛出异常IllegalStateException(“Queue full”)。

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

出队列方法: 
该方法也有两个重载方法poll(long timeout, TimeUnit unit)和poll(),从队列头部移除一个元素,前者与后者的区别在于,若是队列中没有能够移除的元素,前者会等待必定时间,而后执行移除方法。

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();//若是没有能够移出元素,返回null,不然执行dequeue()方法
        } finally {
            lock.unlock();
        }
    }
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);//若是没有能够移出元素,调用condition的线程等待的方法,等待必定时间
            }
            return dequeue();
        } finally {
            lock.unlock();//最后释放锁lock
        }
    }
private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//最后唤醒其余等待的线程
        return x;
    }

  

获取并移除此队列的头部。take()和poll()的区别在于,若是队列中没有可移除元素,take()会一直等待,而poll()可设置直接返回null或者等待必定时间。

 
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();//若是队列中没有元素,该线程一直处于阻塞状态
            return dequeue();
        } finally {
            lock.unlock();
        }
      }

 分析完了上面的源码,咱们以一个小Demo来结束上面的话题,咱们以积分分发和消费为例来随便搞个例子

public class User {
    private String name;

    public User(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                '}';
    }
}

  

    

public class UserService {

    private final ExecutorService executorService= Executors.newSingleThreadExecutor();

    ArrayBlockingQueue<User> arrayBlockingQueue=new ArrayBlockingQueue(10);
    {
        init();
    }
    public void init(){ //不断消费队列的线程
        executorService.execute(()->{
            while(true){
                try {
                    User user=arrayBlockingQueue.take(); //阻塞式
                    System.out.println("发送优惠券给:"+user);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public boolean register(){
        User user=new User("用户A");
        addUser(user);
        //发送积分.
        try {
            arrayBlockingQueue.put(user);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }
    private void addUser(User user){
        System.out.println("添加用户:"+user);
    }

    public static void main(String[] args) {
        new UserService().register();
    }
}

  

二.CountDownLatch

相关文章
相关标签/搜索