使用 LinkedBlockingQueue 实现简易版线程池

前一阵子在作联系人的导入功能,使用POI组件解析Excel文件后获取到联系人列表,校验以后批量导入。单从技术层面来讲,导入操做一般状况下是一个比较耗时的操做,并且若是联系人达到几万、几十万级别,必须拆分红为子任务来执行。综上,可使用线程池来解决问题。技术选型上,没有采用已有的 ThreadPoolExecutor 框架,而使用了自制的简易版线程池。该简易版的线程池,其实也是一个简易版的【生产者-消费者】模型,任务的加入就像是生产的过程,任务的处理就像是消费的过程。咱们在这里不去讨论方案的合理性,只是从技术层面总结一下在实现简易版线程池的过程当中,我所学到的知识。
 
代码放在Github上,分享一下: https://github.com/Julius-Liu/threadpool

 

1、线程池设计

咱们首先使用数组 ArrayList 来做为线程池的存储结构,例如数组大小为10就意味着这是一个大小为10的线程池。而后咱们使用 LinkedBlockingQueue(链式阻塞队列)来存放线程的参数。示意图以下:

 

当线程池里的线程初始化完成后,咱们但愿线程都处于【饥饿】状态,随时等待参数传入,而后执行。因此,此时线程应该处于阻塞状态,以下图所示:
 
当咱们将一个执行任务(一个参数)交给线程池之后,线程池会安排一个线程接收参数,这个线程会进入运行状态。线程执行完之后,线程又会由于参数队列为空而进入阻塞状态。某线程的执行状态以下图所示,执行完的阻塞态,如上图所示。

 

假设线程池中有3个线程,咱们连续扔了3个参数给线程池,线程池会轮询获取线程,将参数塞给他们,而后这些线程会进入运行状态。运行完成后回归阻塞状态。以下图所示:

 

以下图所示,假设线程池中只有3个线程,咱们连续发8个参数给线程池,那么池会轮流分配参数。线程在收到参数后就会执行。“消耗”掉一个参数后,会继续消耗下一个参数,直到参数列表为空为止。

 

2、为何使用 LinkedBlockingQueue

1. BlockingQueue

咱们必须先来讲说为何使用阻塞队列 BlockingQueue。BlockingQueue 队列为空时,尝试获取队头元素的操做会阻塞,一直等到队列中有元素时再返回。这个阻塞的特性,正是咱们须要的,咱们可让线程一直等待元素插入,一旦插入当即执行。BlockingQueue 也支持在添加元素时,若是队列已满,那么等到队列能够放入新元素时再放入。如此一来,咱们交给线程池的任务就不会丢失,哪怕超过了队列的容量。
 
因此咱们定下方案,采用阻塞队列来做为数据结构,而后咱们来调研阻塞队列经常使用的5种实现,看看选择哪一种实现来完成线程池。
 

2. ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put方法和take方法为添加和删除的阻塞方法。能够说 ArrayBlockingQueue 是 阻塞队列的最直观的实现。
 

3. DelayQueue

DelayQueue是一个无界阻塞队列,延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过时的元素。没有过时元素的话,使用poll()方法会返回null值,超时断定是经过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。
 
DelayQueue阻塞队列在咱们系统开发中也经常会用到,例如缓存系统的设计。缓存中的对象,超过了空闲时间,须要从缓存中移出;例如任务调度系统,须要准确的把握任务的执行时间。咱们可能须要经过线程处理不少时间上要求很严格的数据,若是使用普通的线程,咱们就须要遍历全部的对象,一个个检查看数据是否过时。首先这样在执行上的效率不会过高,其次就是这种设计的风格也大大的影响了数据的精度。一个须要12:00点执行的任务可能12:01 才执行,这样对数据要求很高的系统有更大的弊端。使用 DelayQueue 能够作到精准触发。
 
由上可知,延迟队列不是咱们须要的阻塞队列实现。
 

4. LinkedBlockingQueue

LinkedBlockingQueue是一个由链表实现的有界队列阻塞队列,但大小默认值为Integer.MAX_VALUE,也能够在初始化的时候指定 capacity。和 ArrayBlockingQueue 同样,其中put方法和take方法为添加和删除的阻塞方法。
 

5. PriorityBlockingQueue

优先级阻塞队列经过使用堆这种数据结构实现将队列中的元素按照某种排序规则进行排序,从而改变先进先出的队列顺序,提供开发者改变队列中元素的顺序的能力。队列中的元素必须是可比较的,即实现Comparable接口,或者在构建函数时提供可对队列元素进行比较的Comparator对象。不能够放null,会报空指针异常,也不可放置没法比较的元素;add方法添加元素时,是自下而上的调整堆,取出元素时,是自上而下的调整堆顺序。
 
咱们放入参数队列中的参数都是平级的,不涉及优先级,所以咱们不考虑优先级阻塞队列。
 

6. SynchronousQueue

同步队列实际上不是一个真正的队列,由于它不会为队列中元素维护存储空间。与其余队列不一样的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。同步队列是轻量级的,不具备任何内部容量,咱们能够用来在线程间安全的交换单一元素。
由于同步队列没有存储功能,所以put和take会一直阻塞,直到有另外一个线程已经准备好参与到交付过程当中。仅当有足够多的消费者,而且老是有一个消费者准备好获取交付的工做时,才适合使用同步队列。
 
应用场景,咱们来看一下Java并发包里的 newCachedThreadPool 方法:
 1 package java.util.concurrent;
 2 
 3 /**
 4  * 带有缓存的线程池
 5  */
 6 public static ExecutorService newCachedThreadPool() {
 7     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 8                                   60L, TimeUnit.SECONDS,
 9                                   new SynchronousQueue<Runnable>());
10 }

 

Executors.newCachedThreadPool() 方法返回的 ThreadPoolExecutor 实例,其内部的阻塞队列使用的就是同步队列。因为ThreadPoolExecutor内部实现任务提交的时候调用的是工做队列的非阻塞式入队列方法(offer方法),所以,在使用同步队列做为工做队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程可以从同步队列队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工做队列)。此时,ThreadPoolExecutor会新建一个新的工做者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
 
如上所述,同步队列没有内部容量来存放参数,所以咱们不选择同步队列。
 

7. 阻塞队列选择

研究了阻塞队列的5中实现之后,候选者就在 ArrayBlockingQueue 和 LinkedBlockingQueue 二者中。其实要实现本文的简易版线程池,使用数组阻塞队列和连接阻塞队列均可以,若是你要考虑一些极端状况下的性能问题,那么透彻的研究二者的使用场景就很是有必要。数组阻塞队列和连接阻塞队列的成员变量和方法都很类似,相同点咱们就先不说了。下面咱们来看看二者的不一样点:
  1. 队列大小有所不一样,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue能够是有界的也能够是无界的(Integer.MAX_VALUE)。对于后者而言,当添加速度大于移除速度时,在无界的状况下,可能会形成内存溢出等问题。
  2. 数据存储容器不一样,ArrayBlockingQueue采用的是数组做为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点做为链接对象的链表。
  3. 因为ArrayBlockingQueue采用的是数组的存储容器,所以在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内须要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  4. 实现队列添加或移除的锁不同,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操做和移除操做采用的同一个ReentrantLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提升队列的吞吐量,也意味着在高并发的状况下生产者和消费者能够并行地操做队列中的数据,以此来提升整个队列的并发性能。
 

3、LinkedBlockingQueue 底层方法

咱们来调研一下 LinkedBlockingQueue,看看哪些变量和方法可使用。
先来看一下 LinkedBlockingQueue 的数据结构,有一个直观的了解:

 

说明:
  1. LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
  2. LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源以后,其它线程须要阻塞等待。
  3. LinkedBlockingQueue是经过单链表实现的。
    • head是链表的表头。取出数据时,都是从表头head处获取。
    • last是链表的表尾。新增数据时,都是从表尾last处插入。
    • count是链表的实际大小,即当前链表中包含的节点个数。
    • capacity是列表的容量,它是在建立链表时指定的。
    • putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。经过它们对链表进行并发控制。
 
咱们来看一下 LinkedBlockingQueue 经常使用的变量:

 1 // 容量
 2 private final int capacity;
 3 
 4 // 当前数量
 5 private final AtomicInteger count = new AtomicInteger(0);
 6 
 7 // 链表的表头
 8 transient Node<E> head; 
 9 
10 // 链表的表尾
11 private transient Node<E> last; 
12 
13 // 用于控制删除元素的【取出锁】和锁对应的【非空条件】
14 private final ReentrantLock takeLock = new ReentrantLock();
15 private final Condition notEmpty = takeLock.newCondition();
16 
17 // 用于控制添加元素的【插入锁】和锁对应的【非满条件】
18 private final ReentrantLock putLock = new ReentrantLock();
19 private final Condition notFull = putLock.newCondition();

 

这里的两把锁,takeLock 和 putLock,和两个条件,notEmpty 和 notFull 是咱们考察的重点。
LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操做分别使用了不一样的锁
  • 对于插入操做,经过 putLock(插入锁)进行同步
  • 对于取出操做,经过 takeLock(取出锁)进行同步
 
此外,插入锁putLock和notFull(非满条件)相关联,取出锁takeLock和notEmpty(非空条件)相关联。经过notFull条件和notEmpty条件更细腻的控制putLock 和 takeLock。
 
举例说明,若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据以后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取数据前,会获取takeLock,在取数据执行完毕再释放takeLock。
 
若某线程(线程H)要插入数据时(put操做),队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据以后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操做前,会获取putLock,在插入操做执行完毕才释放putLock。
 

LinkedBlockingQueue 经常使用函数

 1 // 建立一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue
 2 LinkedBlockingQueue()
 3 
 4 // 建立一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加
 5 LinkedBlockingQueue(Collection<? extends E> c)
 6 
 7 // 建立一个具备给定(固定)容量的 LinkedBlockingQueue
 8 LinkedBlockingQueue(int capacity)
 9 
10 // 从队列完全移除全部元素
11 void clear()
12 
13 // 将指定元素插入到此队列的尾部(若是当即可行且不会超出此队列的容量),在成功时返回 true,若是此队列已满,则返回 false
14 boolean offer(E e)
15 
16 // 将指定元素插入到此队列的尾部,若有必要,则等待指定的时间以使空间变得可用
17 boolean offer(E e, long timeout, TimeUnit unit)
18 
19 // 获取但不移除此队列的头;若是此队列为空,则返回 null
20 E peek()
21 
22 // 获取并移除此队列的头,若是此队列为空,则返回 null
23 E poll()
24 
25 // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(若是有必要)
26 E poll(long timeout, TimeUnit unit)
27 
28 // 将指定元素插入到此队列的尾部,若有队列满,则等待空间变得可用
29 void put(E e)
30 
31 // 返回理想状况下(没有内存和资源约束)此队列可接受而且不会被阻塞的附加元素数量
32 int remainingCapacity()
33 
34 // 今后队列移除指定元素的单个实例(若是存在)
35 boolean remove(Object o)
36 
37 // 返回队列中的元素个数
38 int size()
39 
40 // 获取并移除此队列的头部,在元素变得可用以前一直等待(若是有必要)
41 E take()

 

咱们看到 offer(E e) 和 put(E e) 都是往队尾插入元素,poll() 和 take() 都是取出队头的元素,可是它们之间仍是有细微的差异,咱们接下来重点看看这4个方法的源码。
 
下面来看一下 offer(E e) 的源码:

 1 /**
 2  * 将指定元素插入到此队列的尾部(若是当即可行且不会超出此队列的容量)
 3  * 在成功时返回 true,若是此队列已满,则返回 false
 4  * 若是使用了有容量限制的队列,推荐使用add方法,add方法在失败的时候只是抛出异常
 5  */
 6 public boolean offer(E e) {
 7     if (e == null) throw new NullPointerException();
 8     final AtomicInteger count = this.count;
 9     if (count.get() == capacity)
10         // 若是队列已满,则返回false,表示插入失败
11         return false;
12     int c = -1;
13     Node<E> node = new Node<E>(e);
14     final ReentrantLock putLock = this.putLock;
15     // 获取 putLock
16     putLock.lock();
17     try {
18         // 再次对【队列是否是满】的进行判断,若是不是满的,则插入节点
19         if (count.get() < capacity) {
20             enqueue(node);                 // 在队尾插入节点
21             c = count.getAndIncrement();   // 当前节点数量+1,并返回插入以前节点数量
22             if (c + 1 < capacity)
23                 // 若是在插入元素以后,队列仍然未满,则唤醒notFull上的等待线程
24                 notFull.signal();
25         }
26     } finally {
27         // 释放 putLock
28         putLock.unlock();
29     }
30     if (c == 0)
31         // 若是在插入节点前,队列为空,那么插入节点后,唤醒notEmpty上的等待线程
32         signalNotEmpty();
33     return c >= 0;
34 }

 

下面来看看 put(E e) 的源码:php

 1 /**
 2  * 将指定元素插入到此队列的尾部,若有队列满,则等待空间变得可用
 3  *
 4  * @throws InterruptedException {@inheritDoc}
 5  * @throws NullPointerException {@inheritDoc}
 6  */
 7 public void put(E e) throws InterruptedException {
 8     if (e == null) throw new NullPointerException();
 9     
10     int c = -1;
11     Node<E> node = new Node<E>(e);
12     final ReentrantLock putLock = this.putLock;
13     final AtomicInteger count = this.count;
14     putLock.lockInterruptibly();    // 可中断地获取 putLock
15     try {
16         // count 变量是被 putLock 和 takeLock 保护起来的,因此能够真实反映队列当前的容量状况
17         while (count.get() == capacity) {
18             notFull.await();
19         }
20         enqueue(node);                // 在队尾插入节点
21         c = count.getAndIncrement();  // 当前节点数量+1,并返回插入以前节点数量
22         if (c + 1 < capacity)
23             // 若是在插入元素以后,队列仍然未满,则唤醒notFull上的等待线程
24             notFull.signal();
25     } finally {
26         putLock.unlock();             // 释放 putLock
27     }
28     if (c == 0)
29         // 若是在插入节点前,队列为空,那么插入节点后,唤醒notEmpty上的等待线程
30         signalNotEmpty();
31 }

 

二者都用到了 signalNotEmpty(),下面来看一下源码:

 1 /**
 2  * 通知一个等待的take。该方法应该仅仅从put/offer调用,不然通常很难锁住takeLock
 3  */
 4 private void signalNotEmpty() {
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lock();           // 获取 takeLock
 7     try {
 8         notEmpty.signal();     // 唤醒notEmpty上的等待线程,意味着如今能够获取元素了
 9     } finally {
10         takeLock.unlock();    // 释放 takeLock
11     }
12 }

 

下面来看看 poll() 的源码:

 1 /**
 2  * 获取并移除此队列的头,若是此队列为空,则返回 null
 3  */
 4 public E poll() {
 5     final AtomicInteger count = this.count;
 6     if (count.get() == 0)
 7         return null;
 8     E x = null;
 9     int c = -1;
10     final ReentrantLock takeLock = this.takeLock;
11     takeLock.lock();            // 获取 takeLock
12     try {
13         if (count.get() > 0) {
14             x = dequeue();                 // 获取队头元素,并移除
15             c = count.getAndDecrement();   // 当前节点数量-1,并返回移除以前节点数量
16             if (c > 1)
17                 // 若是在移除元素以后,队列中仍然有元素,则唤醒notEmpty上的等待线程
18                 notEmpty.signal();
19         }
20     } finally {
21         takeLock.unlock();      // 释放 takeLock
22     }
23     if (c == capacity)
24         // 若是在移除节点前,队列是满的,那么移除节点后,唤醒notFull上的等待线程
25         signalNotFull();
26     return x;
27 }

 

下面来看看 take() 的源码:

 1 /**
 2  * 取出并返回队列的头。若队列为空,则一直等待
 3  */
 4 public E take() throws InterruptedException { 
 5     E x; 
 6     int c = -1; 
 7     final AtomicInteger count = this.count; 
 8     final ReentrantLock takeLock = this.takeLock; 
 9     // 获取 takeLock,若当前线程是中断状态,则抛出InterruptedException异常 
10     takeLock.lockInterruptibly(); 
11     try { 
12         // 若队列为空,则一直等待
13        while (count.get() == 0) { 
14            notEmpty.await(); 
15        } 
16        x = dequeue();                  // 从队头取出元素 
17        c = count.getAndDecrement();    // 取出元素以后,节点数量-1;并返回移除以前的节点数量
18        if (c > 1) 
19            // 若是在移除元素以后,队列中仍然有元素,则唤醒notEmpty上的等待线程
20            notEmpty.signal();
21     } finally { 
22         takeLock.unlock();             // 释放 takeLock
23     } 
24     
25     if (c == capacity) 
26         // 若是在取出元素以前,队列是满的,就在取出元素以后,唤醒notFull上的等待线程
27         signalNotFull(); 
28     return x;
29 }

 

二者都用到了signalNotFull(),signalNotFull()的源码以下:

 1 /**
 2  * 唤醒notFull上的等待线程,只能从 poll 或 take 调用
 3  */
 4 private void signalNotFull() { 
 5     final ReentrantLock putLock = this.putLock; 
 6     putLock.lock();           // putLock 上锁
 7     try { 
 8         notFull.signal();     // 唤醒notFull上的等待线程,意味着能够插入元素了
 9     } finally { 
10         putLock.unlock();    // putLock 解锁
11     }
12 }

 

 
从上面的4个经常使用函数来看,咱们想要在队列为空的时候,将获取这个动做阻塞,所以咱们选择【take方法】而不是【poll方法】。值得注意的是带有参数的poll方法能够更精细地控制当队列为空时,获取动做阻塞多久。在本文中咱们不考虑这种作法,直接让获取操做在 notEmpty 上等待。对于插入操做,咱们采用【offer方法】而不是【put方法】,前者在队列满的时候返回false,后者在队列满的时候会在 notFull 上等待。在本文中,咱们把线程池作的简单一些,若是队列满就提示重试。
 

4、简易版线程池代码实现

具有了 LinkedBlockingQueue 的底层代码解读之后,咱们来实现简易版线程池。
其实在简易版线程池初期,因为对 LinkedBlockingQueue 的底层方法不熟悉,所以对线程手动 wait 和上锁。具体来讲,根据队列size的状况来决定线程是否要进入wait方法,而后在插入参数的时候,使用 synchronized 关键字锁住整个队列,再offer。这种作法,彻底没有考虑已有的 takeLock,putLock,notEmpty条件和notFull条件。因此后来仔细研究了连接阻塞队列的特性,修改了线程池的实现,算是作了正确的事。
 

1. 注册成为 Spring Bean

咱们但愿在Springboot 程序启动的时候,将线程池初始化。可使用 Spring 提供的 InitializingBean 接口的 afterPropertiesSet 方法,在全部基础属性初始化完成后,进行线程池的初始化。
 1 package cn.com.gkmeteor.threadpool.utils;
 2 
 3 @Component
 4 public class ThreadPoolUtil implements InitializingBean {
 5 
 6     public static int POOL_SIZE = 10;
 7 
 8     @Autowired
 9     private ThreadExecutorService threadExecutorService;   // 具体的线程处理类
10 
11     private List<ThreadWithQueue> threadpool = new ArrayList<>();
12 
13     /**
14      * 在全部基础属性初始化完成后,初始化当前类
15      *
16      * @throws Exception
17      */
18     @Override
19     public void afterPropertiesSet() throws Exception {
20         for (int i = 0; i < POOL_SIZE; i++) {
21             ThreadWithQueue threadWithQueue = new ThreadWithQueue(i, threadExecutorService);
22             this.threadpool.add(threadWithQueue);
23         }
24     }
25 }

 

2. 轮询获取一个线程

咱们但愿将任务轮流分给线程池中的线程。要实现这个比较简单,直接两行代码搞定。
1 public static int POOL_SIZE = 10;  // 线程池容量
2 index = (++index) % POOL_SIZE;     // index 是当前选中的线程下标

 

3. 参数入队和出队,线程运行和阻塞

主要使用 queue.offer(param) 和 String param = queue.take() 这两个方法,具体来看下面的代码:

 1 package cn.com.gkmeteor.threadpool.utils;
 2 
 3 import cn.com.gkmeteor.threadpool.service.ThreadExecutorService;
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import java.util.concurrent.BlockingQueue;
 8 
 9 /**
10  * 带有【参数阻塞队列】的线程
11  */
12 public class ThreadWithQueue extends Thread {
13 
14     public static int CAPACITY = 10;
15 
16     private Logger logger = LoggerFactory.getLogger(ThreadWithQueue.class);
17 
18     private BlockingQueue<String> queue;
19 
20     private ThreadExecutorService threadExecutorService;    // 线程运行后的业务逻辑处理
21 
22     private String threadName;
23 
24     public String getThreadName() {
25         return threadName;
26     }
27 
28     public void setThreadName(String threadName) {
29         this.threadName = threadName;
30     }
31 
32     /**
33      * 构造方法
34      *
35      * @param i                     第几个线程
36      * @param threadExecutorService 线程运行后的业务逻辑处理
37      */
38     public ThreadWithQueue(int i, ThreadExecutorService threadExecutorService) {
39         queue = new java.util.concurrent.LinkedBlockingQueue<>(CAPACITY);
40         threadName = "Thread(" + i + ")";
41 
42         this.threadExecutorService = threadExecutorService;
43 
44         this.start();
45     }
46 
47     /**
48      * 将参数放到线程的参数队列中
49      *
50      * @param param 参数
51      * @return
52      */
53     public String paramAdded(String param) {
54         String result = "";
55         if(queue.offer(param)) {
56             logger.info("参数已入队,{} 目前参数个数 {}", this.getThreadName(), queue.size());
57             result = "参数已加入线程池,等待处理";
58         } else {
59             logger.info("队列已达最大容量,请稍后重试");
60             result = "线程池已满,请稍后重试";
61         }
62         return result;
63     }
64 
65     public synchronized int getQueueSize() {
66         return queue.size();
67     }
68 
69     @Override
70     public void run() {
71         while (true) {
72             try {
73                 String param = queue.take();
74                 logger.info("{} 开始运行,参数队列中还有 {} 个在等待", this.getThreadName(), this.getQueueSize());
75                 if (param.startsWith("contact")) {
76                     threadExecutorService.doContact(param);
77                 } else if (param.startsWith("user")) {
78                     threadExecutorService.doUser(param);
79                 } else {
80                     logger.info("参数无效,不作处理");
81                 }
82                 logger.info("{} 本次处理完成", this.getThreadName());
83             } catch (Exception e) {
84                 e.printStackTrace();
85             }
86         }
87     }
88 }

 

了解了连接阻塞队列的底层方法后,使用起来就底气十足。具体来讲:java

offer方法会往队尾添加元素,若是队列已满,那么就会返回false,我在这时告诉调用者,线程池已满,请稍后重试。
take方法会取出队首元素,若是队列为空则一直等待。因此当全部线程初始化完成后,第一次运行的时候都会阻塞在 String param = queue.take(),一旦有参数入队,才会继续执行。又由于 while(true) 循环,会不断地take,根据队列中参数的状况来运行或阻塞。
 

5、总结

本文使用 LinkedBlockingQueue 实现了一个简易版的线程池,该线程池使用在联系人导入的任务中。同时阅读了连接阻塞队列和数组阻塞队列的源码,对阻塞队列有所了解,仅仅作到了会使用阻塞队列。
 

6、参考资料

相关文章
相关标签/搜索