一 , 单个task 入到pool的过程 .node
1public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 int c = ctl.get(); 5 if (workerCountOf(c) < corePoolSize) { // 1 6 if (addWorker(command, true)) 7 return; 8 c = ctl.get(); 9 } 10 if (isRunning(c) && workQueue.offer(command)) { // 2 11 int recheck = ctl.get(); 12 if (! isRunning(recheck) && remove(command)) 13 reject(command); 14 else if (workerCountOf(recheck) == 0) 15 addWorker(null, false); 16 } 17 else if (!addWorker(command, false)) // 3 18 reject(command); 19}
worker数 还未到达 核心线程数(corePoolSize),不入队列,直接使用新线程执行task数组
不然,task数已经达到corePoolSize,将task放到队列中 .函数
不然,直接尝试使用新线程执行,失败的话,执行溢出策略.学习
1private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 1 9 decrementWorkerCount(); 10 return null; 11 } 12 int wc = workerCountOf(c); 13 // Are workers subject to culling? 14 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 15 if ((wc > maximumPoolSize || (timed && timedOut)) 16 && (wc > 1 || workQueue.isEmpty())) { // 1 17 if (compareAndDecrementWorkerCount(c)) 18 return null; 19 continue; 20 } 21 try { 22 Runnable r = timed ? 23 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 2 24 workQueue.take(); // 3 25 if (r != null) 26 return r; 27 timedOut = true; 28 } catch (InterruptedException retry) { 29 timedOut = false; 30 } 31 } 32 }
以默认的LinkedBlockingQueue 为例,了解一下阻塞队列.this
属性:spa
1//节点比较简单,只是一个单链 2static class Node<E> { 3 E item; 4 Node<E> next; 5 Node(E x) { item = x; } 6 }
做为阻塞队列,有如下锁线程
1 /** Lock held by take, poll, etc */ 2 private final ReentrantLock takeLock = new ReentrantLock(); 3 4 /** Wait queue for waiting takes */ 5 private final Condition notEmpty = takeLock.newCondition(); 6 7 /** Lock held by put, offer, etc */ 8 private final ReentrantLock putLock = new ReentrantLock(); 9 10 /** Wait queue for waiting puts */ 11 private final Condition notFull = putLock.newCondition();
队列的进出用了不一样的锁,队列的进出是能够同时进行的.code
put()方法对象
1//put 2public void put(E e) throws InterruptedException { 3 if (e == null) throw new NullPointerException(); 4 // Note: put/take/etc 方法都使用本地变量操做 5 // 保持count计数为负数,表示失败 . 除非set操做 6 int c = -1; 7 Node<E> node = new Node<E>(e); 8 final ReentrantLock putLock = this.putLock; 9 final AtomicInteger count = this.count; 10 //可被 Interrupt 的锁 11 putLock.lockInterruptibly(); 12 try { 13 //当已经达到最大容量,那么阻塞线程,并等待. 14 //count是原子的,因此不用lock 保护 15 // 16 while (count.get() == capacity) { 17 notFull.await(); 18 } 19 //将节点 入队列 20 enqueue(node); 21 //count数增长 22 c = count.getAndIncrement(); 23 //若是还没到达最大容量, 24 if (c + 1 < capacity) 25 //那么唤醒其余(一个)put操做 26 notFull.signal(); 27 } finally { 28 //put解锁 29 putLock.unlock(); 30 } 31 if (c == 0) 32 signalNotEmpty(); 33}
offer(E) 方法 , ThreadPoolExecutor内,用的是这个方法队列
1public boolean offer(E e) { 2 if (e == null) throw new NullPointerException(); 3 final AtomicInteger count = this.count; 4 //若是已经到达最大容量,直接退出 5 if (count.get() == capacity) 6 return false; 7 int c = -1; 8 Node<E> node = new Node<E>(e); 9 final ReentrantLock putLock = this.putLock; 10 //加不可被interrupted的锁 11 putLock.lock(); 12 try { 13 //还未到达最大容量 14 if (count.get() < capacity) { 15 //入队列 16 enqueue(node); 17 //count增长 18 c = count.getAndIncrement(); 19 if (c + 1 < capacity) 20 notFull.signal(); 21 } 22 } finally { 23 putLock.unlock(); 24 } 25 // 还未理解 这个条件的场景 26 if (c == 0) 27 //唤醒其余take锁 28 signalNotEmpty(); 29 //只要队列有值 ,那么就是加入成功 30 return c >= 0; 31}
offer(E e, long timeout, TimeUnit unit) 方法
相对于offer(E)方法,这里加了个超时
1//当达到最大容量 2while (count.get() == capacity) { 3 //若是等待结束,仍是没有可用容量(仍是最大容量) 4 if (nanos <= 0) 5 //那么结束入队 6 return false; 7 //等待timeout的时间 8 nanos = notFull.awaitNanos(nanos); 9} 10//等待结束,有可用容量,那么入队列操做 11enqueue(new Node<E>(e));
poll(long timeout, TimeUnit unit)方法,ThreadPoolExecutor类keepAliveTime,主要就是使用这个方法
1public E poll(long timeout, TimeUnit unit) throws InterruptedException { 2 E x = null; 3 // 假设count为负数,没有数据 4 int c = -1; 5 //取timeout的毫秒时间 6 long nanos = unit.toNanos(timeout); 7 final AtomicInteger count = this.count; 8 final ReentrantLock takeLock = this.takeLock; 9 //take锁,可Interrupt的锁 10 takeLock.lockInterruptibly(); 11 try { 12 //当count==0时,已经没有元素了 13 while (count.get() == 0) { 14 //等待结束后,依然没有元素 15 if (nanos <= 0) 16 //结束,并返回null对象 17 return null; 18 //等待timeout的时间,线程状态: TIMED_WAIT 19 nanos = notEmpty.awaitNanos(nanos); 20 } 21 //等待结束后,队列中有元素了. 22 //取队列的最顶元素 23 x = dequeue(); 24 //count - 1 ,原子操做 25 c = count.getAndDecrement(); 26 if (c > 1) 27 //减完后,队列中依然有元素,那么叫醒其余take 等待锁 28 notEmpty.signal(); 29 } finally { 30 takeLock.unlock(); 31 } 32 // 这个未理解 33 if (c == capacity) 34 //叫醒其余全部 put锁,有空间了,能够放元素了. 35 signalNotFull(); 36 return x; 37}
take() 方法,ThreadPoolExecutor类,保活的线程,在getTask()时,调用此方法
1public E take() throws InterruptedException { 2 E x; 3 //假设失败,count为负数 4 int c = -1; 5 final AtomicInteger count = this.count; 6 final ReentrantLock takeLock = this.takeLock; 7 //可被Interrupt的take锁,当被Interrupt后,抛出异常 8 takeLock.lockInterruptibly(); 9 try { 10 //当队列没有元素后 11 while (count.get() == 0) { 12 //线程等待状态,除非被其余线程唤醒 13 //处于永久等待状态 14 notEmpty.await(); 15 } 16 //取队列头的元素 17 x = dequeue(); 18 c = count.getAndDecrement(); 19 if (c > 1) 20 notEmpty.signal(); 21 } finally { 22 takeLock.unlock(); 23 } 24 if (c == capacity) 25 //叫醒其余全部的put锁 26 signalNotFull(); 27 return x; 28}
阻塞队列的阻塞机制,下篇再了解下.
// lock已经快分不清了,只凭理论知识 已经不能把这个LinkedBlockingQueue理的明白了.得先学习下Lock了
有如下问题: