LinkedBlockingQueue的put,add和offer这三个方法功能很类似,都是往队列尾部添加一个元素。既然都是一样的功能,为啥要有有三个方法呢?java
这三个方法的区别在于:node
索引这三种不一样的方法在队列满时,插入失败会有不一样的表现形式,咱们能够在不一样的应用场景中选择合适的方法。app
先看看add
方法,less
public class LinkedBlockingQueueTest { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2); fruitQueue.add("apple"); fruitQueue.add("orange"); fruitQueue.add("berry"); }
当咱们执行这个方法的时候,会报下面的异常,async
Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at com.pony.app.LinkedBlockingQueueTest.testAdd(LinkedBlockingQueueTest.java:23) at com.pony.app.LinkedBlockingQueueTest.main(LinkedBlockingQueueTest.java:16)
而后再来看看put
用法,ide
public class LinkedBlockingQueueTest implements Runnable { static LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2); public static void main(String[] args) throws InterruptedException { new Thread(new LinkedBlockingQueueTest()).start(); fruitQueue.put("apple"); fruitQueue.put("orange"); fruitQueue.put("berry"); System.out.println(fruitQueue.toString()); } @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } fruitQueue.poll(); } }
运行这段代码,你会发现首先程序会卡住(队列阻塞)3秒左右,而后打印队列的orange
和berry
两个元素。源码分析
由于我在程序的启动的时候顺便启动了一个线程,这个线程会在3秒后从队列头部移除一个元素。ui
最后看看offer
的用法,this
public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2); System.out.println(fruitQueue.offer("apple")); System.out.println(fruitQueue.offer("orange")); System.out.println(fruitQueue.offer("berry")); }
运行结果:线程
true true false
先来看看add
方法的实现,
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
因此add实际上是包装了一下offer,没什么能够说的。
而后来看看put
和offer
的实现,两个放在一块儿说。
put方法源码,
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
offer方法源码,
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
咱们重点关注他们的区别,offer方法在插入的时候会等一个超时时间timeout
,若是时间到了队列仍是满的(count.get() == capacity
),就会返回false。
而put方法是无限期等待,
while (count.get() == capacity) { notFull.await(); }
因此咱们在应用层使用的时候,若是队列满再插入会阻塞。
在早期版本的kafka中,生产者端发送消息使用了阻塞队列,代码以下:
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { val added = config.queueEnqueueTimeoutMs match { case 0 => queue.offer(message) case _ => try { if (config.queueEnqueueTimeoutMs < 0) { queue.put(message) true } else { queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS) } } catch { case _: InterruptedException => false } } if(!added) { producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark() producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark() throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString) }else { trace("Added to send queue an event: " + message.toString) trace("Remaining queue size: " + queue.remainingCapacity) } } }
能够看到,config.queueEnqueueTimeoutMs
是0的时候,使用的是offer
方法,小于0的时候则使用put
方法。
咱们在使用kafka的时候,能够经过queue.enqueue.timeout.ms
来决定使用哪一种方式。好比某些应用场景下,好比监控,物联网等场景,容许丢失一些消息,能够把queue.enqueue.timeout.ms
配置成0,这样就kafka底层就不会出现阻塞了。
新版的kafka(我印象中是2.0.0版本开始?)用java重写了,再也不使用阻塞队列,因此没有上面说的问题。