在Java中,BlockingQueue
接口位于java.util.concurrent
包下。阻塞队列主要用来线程安全的实现生产者-消费者模型。他们能够使用于多个生产者和多个消费者的场景中。java
咱们能够在各类论坛和文章中找到BlockingQueue
的范例。在这篇文章中,咱们将介绍如何持续管理队列中的请求,以及如何在请求进入队列后马上处理。面试
咱们将使用单个线程管理任务放入队列的操做以及从队列中取出的操做。同时这个线程会持续的管理队列。另外一个线程将用来建立BlockingQueue
,它将一直运行知道服务器终止。安全
阻塞队列的大小能够在对象初始化的时候设置。它的大小应该基于系统堆的大小。服务器
如今,让咱们回顾建立阻塞队列的步骤以及如何持续的管理和处理请求。微信
新建一个EventData
的POJO类,它会存储生产者产生的事件数据并输入到队列中 - 同时它会被消费者从队列中取出e并处理。ide
package com.dzone.blockingqueue.example; class EventData { private String eventID; private String eventName; private String eventDate; private String eventType; private String eventLocation; public String getEventID() { return eventID; } public void setEventID(String eventID) { this.eventID = eventID; } public String getEventName() { return eventName; } public void setEventName(String eventName) { this.eventName = eventName; } public String getEventDate() { return eventDate; } public void setEventDate(String eventDate) { this.eventDate = eventDate; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public String getEventLocation() { return eventLocation; } public void setEventLocation(String eventLocation) { this.eventLocation = eventLocation; } }
建立一个QueueService
单例类,用来将请求放入队列中,以及从队列中提取请求并处理。this
package com.dzone.blockingqueue.example; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class QueueService { private static QueueService instance = null; private static BlockingQueue < EventData > eventQueue = null; private QueueService() {} public static QueueService getInstance() { if (instance == null) { instance = new QueueService(); } return instance; } private void initialize() { if (eventQueue == null) { eventQueue = new LinkedBlockingQueue <EventData> (); EventProcessor eventProcessor = new EventProcessor(); eventProcessor.start(); } } public void putEventInQueue(EventData eventData) { try { initialize(); eventQueue.put(eventData); } catch (InterruptedException ex) { ex.printStackTrace(); } } class EventProcessor extends Thread { @Override public void run() { for (;;) { EventData eventData = null; try { eventData = eventQueue.take(); System.out.println("Process Event Data : Type : " + eventData.getEventType() + " / Name : " + eventData.getEventName()); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } }
咱们新建了一个静态的BlockingQueue
变量。它在初始化时会比初始化为ArrayBlockingQueue
或是LinkedBlockingQueue
,这取决于需求。在此以后,这个对象会被用来放入或是提取请求。spa
咱们还新建了一个继承了Thread
的EventProcessor
私有类。它在BlockingQueue初始化的时候启动。在EventProcessor
中使用了一个for循环来管理队列。BlockingQueue
的优势在于它会在没有元素的时候进入等待模式。当队列为空时,for循环不会继续遍历。当请求进入队列后,BlockingQueue
会继续运行并处理请求。线程
单个EventProcessor线程将处理特定队列中的全部请求。此线程永远不会过时,有助于实现持续监控。code
咱们还在QueueService
中建立了一个公有的putEventInQueue
方法,它会帮助咱们将请求放入由getInstance
方法获取的队列中。在这个方法里,请求被放入BlockingQueue
。这些请求将会自动的被BlockingQueue
获取,并在EventProcessor
线程中继续处理。
如今让咱们向队列中加载数据。咱们已经实现了一个EventService
类。它会将几个请求写入BlockingQueue
中。在QueueService
中,咱们会看到请求是如何被取出并处理的。
package com.dzone.blockingqueue.example; public class EventService { public static void main(String arg[]) { try { EventData event = null; for (int i = 0; i < 100; i++) { event = new EventData(); event.setEventType("EventType " + i); event.setEventName("EventName " + i); QueueService.getInstance().putEventInQueue(event); Thread.sleep(100); } } catch (InterruptedException e) { e.printStackTrace(); } } }
输出结果以下:
Process Event Data : Type : EventType 0 / Name : EventName 0 Process Event Data : Type : EventType 1 / Name : EventName 1 Process Event Data : Type : EventType 2 / Name : EventName 2 Process Event Data : Type : EventType 3 / Name : EventName 3 Process Event Data : Type : EventType 4 / Name : EventName 4
想要了解更多开发技术,面试教程以及互联网公司内推,欢迎关注个人微信公众号!将会不按期的发放福利哦~