阻塞队列

在java集合中还有一个重要的分支是concurrent包中的阻塞队列,这些队列都是线程安全的,有基于数组建立的队列,也有基于单链表或者双链表建立的队列,比较常见的一些队列有:java

ArrayBlockingQueue:基于数组的阻塞队列(添加与获取均可阻塞)数组

LinkedBlockingQueue:基于链表的阻塞队列(添加与获取均可阻塞)安全

ArrayBlockingQueue与LinkedBlockingQueue放一块儿说,由于可能常常使用到这两个阻塞队列,主区别以下:ide

1.锁的方式不一样测试

    ArrayBlockingQueue读写只有一个锁this

/** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    notEmpty和notFull都是从lock获取的,不适用于多生产者和消费者的场景,锁的争用致使效率下降线程

 

    LinkedBlockingQueue读写锁是分离的code

private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    notEmpty由takeLock获取,notFull由putLock获取对象

2.添加的对象不一样排序

    ArrayBlockingQueue直接将对象添加到数组中

    LinkedBlockingQueue须要将对象从新构形成NODE(单链表节点)对象,再链接到链表中

3.使用场景

    虽然ArrayBlockingQueue锁的争用会下降效率,可是LinkedBlockingQueue在建立Node也会增长开销,在本身电脑上实际测试中,ArrayBlockingQueue速度更优于LinkedBlockingQueue

    总结:在实际开发中我认为任意选择他们中的一个都是能够的(但愿有朋友来指正)

    测试代码以下:

package com.jv.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.jv.queue.TestLinkedBlockingQueue.Consumer;
import com.jv.queue.TestLinkedBlockingQueue.Productor;

public class TestArrayBlockingQueue {
	public static void main(String[] args) {
		
		ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(100000000);
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		
	}
	
	static class Productor implements Runnable{
		ArrayBlockingQueue<Integer> q ;

		public Productor(ArrayBlockingQueue<Integer> q) {
			this.q = q;
		}
		
		@Override
		public void run() {
			long start = System.currentTimeMillis();
			for(int i = 0 ; i < 10000000 ; i++) {
				try {
					q.offer(i, 2000, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			long end = System.currentTimeMillis();
			System.out.println(end-start);
		}
		
	}
	static class Consumer implements Runnable{
		ArrayBlockingQueue<Integer> q ;

		public Consumer(ArrayBlockingQueue<Integer> q) {
			this.q = q;
		}
		
		@Override
		public void run() {
			while(true) {
				try {
					q.poll(2000, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}
package com.jv.queue;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TestLinkedBlockingQueue {
	public static void main(String[] args) {
		
		LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(100000000);
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		new Thread(new Productor(q)).start();
		
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		new Thread(new Consumer(q)).start();
		
	}
	
	static class Productor implements Runnable{
		LinkedBlockingQueue<Integer> q ;

		public Productor(LinkedBlockingQueue<Integer> q) {
			this.q = q;
		}
		
		@Override
		public void run() {
			long start = System.currentTimeMillis();
			for(int i = 0 ; i < 10000000 ; i++) {
				try {
					q.offer(i, 2000, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			long end = System.currentTimeMillis();
			System.out.println(end-start);
		}
		
	}
	static class Consumer implements Runnable{
		LinkedBlockingQueue<Integer> q ;

		public Consumer(LinkedBlockingQueue<Integer> q) {
			this.q = q;
		}
		
		@Override
		public void run() {
			while(true) {
				try {
					q.poll(2000, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

PriorityBlockingQueue:可排序的阻塞队列(添加不阻塞,获取可阻塞)

  能够输出排序后的队列元素,PriorityBlockingQueue是无界队列(刚开始设定一个队列的大小,当队列不够长时会自动增加), 该队列的元素必须实现comparable接口,使用迭代器queue.iterator是不能保证顺序的,应该使用for或者while循环

LinkedBlockingDeque:基于双向链表的阻塞队列(添加与获取均可阻塞)

    能够双向添加、双向获取

DelayQueue:延迟到期队列(添加不阻塞,获取可阻塞)

    核心是一个基于数组的PriorityQueue(并非concurrent包中的类),它包含的对象必须实现Delayed接口,重写getDelay(TimeUnit unit)方法---获得到期时间,DelayQueue是无界队列(刚开始设定一个队列的大小,当队列不够长时会自动增加)。这种队列能够用来实现任务计划,不过JDK都有本身的任务计划API了,可能会不多使用

 

 

API使用举例:

package com.jv.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class TestBlockingQueue {
	/*
	 * ArrayBlockQueue 是基于数组实现的有界队列,支持阻塞、非阻塞添加和获取数据
	 *
	 */
	@Test
	public void test1() throws InterruptedException {
		ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

		queue.add("1");
		queue.add("2");
		queue.add("3");
		queue.add("4");
		queue.add("5");
		
		//queue.add("6");//不阻塞
		queue.offer("6");//不阻塞
		queue.offer("6", 3000, TimeUnit.MILLISECONDS);//阻塞指定时间
		//queue.put("6");//一直阻塞
		
		
		System.out.println(queue.take());//会阻塞
		System.out.println(queue.poll());//不阻塞
		System.out.println(queue.poll());
		System.out.println(queue.poll());
		System.out.println(queue.peek());
		System.out.println(queue.poll());
		//System.out.println(queue.take());
		System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间

		System.out.println(queue.peek());//获取队列当前index对应的元素,不会将元素从队列中移除,本质上是调用itemAt方法,使用数组下标访问
	}
	/*
	 * LinkedBlockQueue 是基于链表实现的有界队列,支持阻塞、非阻塞添加和获取数据
	 *
	 */
	@Test
	public void test2() throws InterruptedException {
		LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(5);

		queue.add("1");
		queue.add("2");
		queue.add("3");
		queue.add("4");
		queue.add("5");
		
		//queue.add("6");//不阻塞
		queue.offer("6");//不阻塞
		queue.offer("6", 3000, TimeUnit.MILLISECONDS);//阻塞指定时间
		//queue.put("6");//一直阻塞
		
		
		System.out.println(queue.take());//会阻塞
		System.out.println(queue.poll());//不阻塞
		System.out.println(queue.poll());
		System.out.println(queue.peek());
		System.out.println(queue.poll());
		System.out.println(queue.poll());
		//System.out.println(queue.take());
		System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间

		System.out.println(queue.peek());//获取队列index的元素,不会讲元素从队列中移除,本质上是调用itemAt方法,使用数组下标访问
	}
	/*
	 * PriorityBlockingQueue 是基于数组实现的有界队列,支持非阻塞添加数据,支持阻塞和非阻塞获取数据
	 * 从类名能够看出该队列是支持根据优先级排序访问的,那么元素必须实现comparable接口
	 * 若是使用迭代器queue.iterator是不能保证顺序的,应该使用for或者while循环
	 */
	@Test
	public void test3() throws InterruptedException {
		PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(5);

		queue.add("1");//不阻塞,调用的offer()
		queue.add("2");
		queue.add("3");
		queue.add("4");
		queue.add("5");
		
		queue.offer("6");//不阻塞
		queue.offer("7", 3000, TimeUnit.MILLISECONDS);//不阻塞,调用的offer(),超时设置无效
		queue.put("8");//不阻塞,调用的offer()
		
		System.out.println(queue.take());//会阻塞
		System.out.println(queue.poll());
		System.out.println(queue.peek());
		System.out.println(queue.poll());
		System.out.println(queue.poll());
		System.out.println(queue.take());
		System.out.println(queue.take());
		System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间
		System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间
		System.out.println(queue.poll(3000,TimeUnit.MILLISECONDS));//阻塞特定时间
		System.out.println(queue.take());
		System.out.println(queue.peek());//获取队列index的元素,不会将元素从队列中移除,本质上是数组下标访问
	}
	
}
相关文章
相关标签/搜索