Java 高并发六:JDK并发包2详解

  1. 线程池的基本使用
    1.1.为何须要线程池

平时的业务中,若是要使用多线程,那么咱们会在业务开始前建立线程,业务结束后,销毁线程。
可是对于业务来讲,线程的建立和销毁是与业务自己无关的,只关心线程所执行的任务。
所以但愿把尽量多的cpu用在执行任务上面,而不是用在与业务无关的线程建立和销毁上面。
而线程池则解决了这个问题,线程池的做用就是将线程进行复用。java

1.2.JDK为咱们提供了哪些支持
Java 高并发六:JDK并发包2详解算法

JDK中的相关类图如上图所示。
其中要提到的几个特别的类。api

Callable类和Runable类类似,可是区别在于Callable有返回值。缓存

ThreadPoolExecutor是线程池的一个重要实现。markdown

而Executors是一个工厂类。多线程

1.3.线程池的使用并发

1.3.1.线程池的种类
new FixedThreadPool 固定数量的线程池,线程池中的线程数量是固定的,不会改变。
new SingleThreadExecutor 单一线程池,线程池中只有一个线程。
new CachedThreadPool 缓存线程池,线程池中的线程数量不固定,会根据需求的大小进行改变。
new ScheduledThreadPool 计划任务调度的线程池,用于执行计划任务,好比每隔5分钟怎么样。
源码:分布式

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
     0L, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
  (new ThreadPoolExecutor(1, 1,
     0L, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
     60L, TimeUnit.SECONDS,
     new SynchronousQueue<Runnable>());
}

从方法上来看,显然 FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是ThreadPoolExecutor的不一样实例,只是参数不一样。ide

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  Executors.defaultThreadFactory(), defaultHandler);
}

来简述下 ThreadPoolExecutor构造函数中参数的含义。函数

corePoolSize 线程池中核心线程数的数目
maximumPoolSize 线程池中最多能容纳多少个线程
keepAliveTime 当如今线程数目大于corePoolSize时,超过keepAliveTime时间后,多出corePoolSize的那些线程将被终结。
unit keepAliveTime的单位
workQueue 当任务数量很大,线程池中线程没法知足时,提交的任务会被放到阻塞队列中,线程空闲下来则会不断从阻塞队列中取数据。
这样在来看上面所说的FixedThreadPool,它的线程的核心数目和最大容纳数目都是同样的,以致于在工做期间,并不会建立和销毁线程。当任务数量很大,线程池中的线程没法知足时,任务将被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。这就意味着,任务不断地添加,会使内存消耗愈来愈大。

而CachedThreadPool则不一样,它的核心线程数量是0,最大容纳数目是Integer.MAX_VALUE,它的阻塞队列是SynchronousQueue,这是一个特别的队列,它的大小是0。因为核心线程数量是0,因此必然要将任务添加到SynchronousQueue中,这个队列只有一个线程在从中添加数据,同时另外一个线程在从中获取数据时,才能成功。独自往这个队列中添加数据会返回失败。当返回失败时,则线程池开始扩展线程,这就是为何CachedThreadPool的线程数目是不固定的。当60s该线程仍未被使用时,线程则被销毁。

1.4.线程池使用的小例子

1.4.1.简单线程池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
 public static class MyTask implements Runnable {
 @Override
 public void run() {
 System.out.println(System.currentTimeMillis() + "Thread ID:"
 + Thread.currentThread().getId());
 try {
 Thread.sleep(1000);
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }

 public static void main(String[] args) {
 MyTask myTask = new MyTask();
 ExecutorService es = Executors.newFixedThreadPool(5);
 for (int i = 0; i < 10; i++) {
 es.submit(myTask);
 }
 }
}

因为使用的newFixedThreadPool(5),可是启动了10个线程,因此每次执行5个,而且 能够很明显的看到线程的复用,ThreadId是重复的,也就是前5个任务和后5个任务都是同一批线程去执行的。
这里用的是

es.submit(myTask);
还有一种提交方式:
es.execute(myTask);
区别在于submit会返回一个Future对象,这个将在之后介绍。

1.4.2.ScheduledThreadPool

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {
 public static void main(String[] args) {
 ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
 //若是前面的任务还未完成,则调度不会启动。
 ses.scheduleWithFixedDelay(new Runnable() {

 @Override
 public void run() {
 try {
 Thread.sleep(1000);
 System.out.println(System.currentTimeMillis()/1000);
 } catch (Exception e) {
 // TODO: handle exception
 }

 }
 }, 0, 2, TimeUnit.SECONDS);//启动0秒后执行,而后周期2秒执行一次
 }
}

输出:

1454832514
1454832517
1454832520
1454832523
1454832526
...
因为任务执行须要1秒,任务调度必须等待前一个任务完成。也就是这里的每隔2秒的意思是,前一个任务完成后2秒再开启新的一个任务。

  1. 扩展和加强线程池

2.1.回调接口
线程池中有一些回调的api来给咱们提供扩展的操做。

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
 new LinkedBlockingQueue<Runnable>()){

 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 System.out.println("准备执行");
 }

 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 System.out.println("执行完成");
 }

 @Override
 protected void terminated() {
 System.out.println("线程池退出");
 }

 };

咱们能够经过实现ThreadPoolExecutor的子类去覆盖ThreadPoolExecutor的beforeExecute,afterExecute,terminated方法来实如今线程执行先后,线程池退出时的日志管理或其余操做。

2.2.拒绝策略

有时候,任务很是繁重,致使系统负载太大。在上面说过,当任务量愈来愈大时,任务都将放到FixedThreadPool的阻塞队列中,致使内存消耗太大,最终致使内存溢出。这样的状况是应该要避免的。所以当咱们发现线程数量要超过最大线程数量时,咱们应该放弃一些任务。丢弃时,咱们应该把任务记下来,而不是直接丢掉。

ThreadPoolExecutor中还有另外一个构造函数。

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
 if (corePoolSize < 0 ||
  maximumPoolSize <= 0 ||
  maximumPoolSize < corePoolSize ||
  keepAliveTime < 0)
  throw new IllegalArgumentException();
 if (workQueue == null || threadFactory == null || handler == null)
  throw new NullPointerException();
 this.corePoolSize = corePoolSize;
 this.maximumPoolSize = maximumPoolSize;
 this.workQueue = workQueue;
 this.keepAliveTime = unit.toNanos(keepAliveTime);
 this.threadFactory = threadFactory;
 this.handler = handler;
 }

hreadFactory咱们在后面再介绍。
而handler就是拒绝策略的实现,它会告诉咱们,若是任务不能执行了,该怎么作。

Java 高并发六:JDK并发包2详解

共有以上4种策略。

AbortPolicy:若是不能接受任务了,则抛出异常。

CallerRunsPolicy:若是不能接受任务了,则让调用的线程去完成。

DiscardOldestPolicy:若是不能接受任务了,则丢弃最老的一个任务,由一个队列来维护。

DiscardPolicy:若是不能接受任务了,则丢弃任务。

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
 new LinkedBlockingQueue<Runnable>(),
 new RejectedExecutionHandler() {

 @Override
 public void rejectedExecution(Runnable r,
 ThreadPoolExecutor executor) {
 System.out.println(r.toString() + "is discard");
 }
 });

固然咱们也能够本身实现RejectedExecutionHandler接口来本身定义拒绝策略。

2.3.自定义ThreadFactory

刚刚已经看到了,在ThreadPoolExecutor的构造函数中能够指定threadFactory。

线程池中的线程都是由线程工厂建立出来,咱们能够自定义线程工厂。

 
 

默认的线程工厂:

static class DefaultThreadFactory implements ThreadFactory {
 private static final AtomicInteger poolNumber = new AtomicInteger(1);
 private final ThreadGroup group;
 private final AtomicInteger threadNumber = new AtomicInteger(1);
 private final String namePrefix;

 DefaultThreadFactory() {
  SecurityManager s = System.getSecurityManager();
  group = (s != null) ? s.getThreadGroup() :
     Thread.currentThread().getThreadGroup();
  namePrefix = "pool-" +
    poolNumber.getAndIncrement() +
    "-thread-";
 }

 public Thread newThread(Runnable r) {
  Thread t = new Thread(group, r,
     namePrefix + threadNumber.getAndIncrement(),
     0);
  if (t.isDaemon())
  t.setDaemon(false);
  if (t.getPriority() != Thread.NORM_PRIORITY)
  t.setPriority(Thread.NORM_PRIORITY);
  return t;
 }
 }
  1. ForkJoin

3.1.思想

Java 高并发六:JDK并发包2详解

就是分而治之的思想。

fork/join相似MapReduce算法,二者区别是:Fork/Join 只有在必要时如任务很是大的状况下才分割成一个个小任务,而 MapReduce老是在开始执行第一步进行分割。看来,Fork/Join更适合一个JVM内线程级别,而MapReduce适合分布式系统。

4.2.使用接口

RecursiveAction:无返回值
RecursiveTask:有返回值

 
 

4.3.简单例子

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long>{

 private static final int THRESHOLD = 10000;
 private long start;
 private long end;

 public CountTask(long start, long end) {
 super();
 this.start = start;
 this.end = end;
 }

 @Override
 protected Long compute() {
 long sum = 0;
 boolean canCompute = (end - start) < THRESHOLD;
 if(canCompute)
 {
 for (long i = start; i <= end; i++) {
 sum = sum + i;
 }
 }else
 {
 //分红100个小任务
 long step = (start + end)/100;
 ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
 long pos = start;
 for (int i = 0; i < 100; i++) {
 long lastOne = pos + step;
 if(lastOne > end )
 {
 lastOne = end;
 }
 CountTask subTask = new CountTask(pos, lastOne);
 pos += step + 1;
 subTasks.add(subTask);
 subTask.fork();//把子任务推向线程池
 }
 for (CountTask t : subTasks) {
 sum += t.join();//等待全部子任务结束
 }
 }
 return sum;
 }

 public static void main(String[] args) {
 ForkJoinPool forkJoinPool = new ForkJoinPool();
 CountTask task = new CountTask(0, 200000L);
 ForkJoinTask<Long> result = forkJoinPool.submit(task);
 try {
 long res = result.get();
 System.out.println("sum = " + res);
 } catch (Exception e) {
 // TODO: handle exception
 e.printStackTrace();
 }
 }

}

上述例子描述了一个累加和的任务。将累加任务分红100个任务,每一个任务只执行一段数字的累加和,最后join后,把每一个任务计算出的和再累加起来。

4.4.实现要素

4.4.1.WorkQueue与ctl

每个线程都会有一个工做队列

static final class WorkQueue
在工做队列中,会有一系列对线程进行管理的字段
volatile int eventCount; // encoded inactivation count; < 0 if inactive
int nextWait; // encoded record of next event waiter
int nsteals; // number of steals
int hint; // steal index hint
short poolIndex; // index of this queue in pool
final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
ForkJoinTask<?> currentSteal; // current non-local task being executed
这里要注意的是,JDK7和JDK8在ForkJoin的实现上有了很大的差异。咱们这里介绍的是JDK8中的。 在线程池中,有时不是全部的线程都在执行的,部分线程会被挂起,那些挂起的线程会被存放到一个栈中。内部经过一个链表表示。
nextWait会指向下一个等待的线程。

poolIndex线程在线程池中的下标索引。

eventCount 在初始化时,eventCount与poolIndex有关。总共32位,第一位表示是否被激活,15位表示被挂起的次数

eventCount,剩下的表示poolIndex。用一个字段来表示多个意思。

工做队列WorkQueue用ForkJoinTask<?>[] array来表示。而top,base来表示队列的两端,数据在这二者之间。

在ForkJoinPool中维护着ctl(64位long型)

volatile long ctl;

  • Field ctl is a long packed with:
    • AC: Number of active running workers minus target parallelism (16 bits)
    • TC: Number of total workers minus target parallelism (16 bits)
    • ST: true if pool is terminating (1 bit)
    • EC: the wait count of top waiting thread (15 bits)
    • ID: poolIndex of top of Treiber stack of waiters (16 bits)
      AC表示活跃的线程数减去并行度(大概就是CPU个数)

TC表示总的线程数减去并行度

ST表示线程池自己是不是激活的

EC表示顶端等待线程的挂起数

ID表示顶端等待线程的poolIndex

很明显ST+EC+ID就是咱们刚刚所说的 eventCount 。
那么为何明明5个变量,非要合成一个变量呢。其实用5个变量占用容量也差很少。

用一个变量代码的可读性上会差不少。

那么为何用一个变量呢?其实这点才是最巧妙的地方,由于这5个变量是一个总体,在多线程中,若是用5个变量,那么当修改其中一个变量时,如何保证5个变量的总体性。那么用一个变量则就解决了这个问题。若是用锁解决,则会下降性能。

用一个变量则保证了数据的一致性和原子性。

在ForkJoin中队ctl的更改都是使用CAS操做,在前面系列的文章中已经介绍过,CAS是无锁的操做,性能很好。

因为CAS操做也只能针对一个变量,因此这种设计是最优的。

 
 

4.4.2.工做窃取

接下来要介绍下整个线程池的工做流程。

每一个线程都会调用runWorker

final void runWorker(WorkQueue w) {
 w.growArray(); // allocate queue
 for (int r = w.hint; scan(w, r) == 0; ) {
  r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
 }
 }

scan()函数是扫描是否有任务要作。
r是一个相对随机的数字。

private final int scan(WorkQueue w, int r) {
 WorkQueue[] ws; int m;
 long c = ctl;    // for consistency check
 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
  for (int j = m + m + 1, ec = w.eventCount;;) {
  WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
  if ((q = ws[(r - j) & m]) != null &&
   (b = q.base) - q.top < 0 && (a = q.array) != null) {
   long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
   if ((t = ((ForkJoinTask<?>)
    U.getObjectVolatile(a, i))) != null) {
   if (ec < 0)
    helpRelease(c, ws, w, q, b);
   else if (q.base == b &&
     U.compareAndSwapObject(a, i, t, null)) {
    U.putOrderedInt(q, QBASE, b + 1);
    if ((b + 1) - q.top < 0)
    signalWork(ws, q);
    w.runTask(t);
   }
   }
   break;
  }
  else if (--j < 0) {
   if ((ec | (e = (int)c)) < 0) // inactive or terminating
   return awaitWork(w, c, ec);
   else if (ctl == c) {  // try to inactivate and enqueue
   long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   w.nextWait = e;
   w.eventCount = ec | INT_SIGN;
   if (!U.compareAndSwapLong(this, CTL, c, nc))
    w.eventCount = ec; // back out
   }
   break;
  }``
  }
 }
 return 0;
 }

咱们接下来看看scan方法,scan的一个参数是WorkQueue,上面已经说过,每一个线程都会拥有一个WorkQueue,那么多个线程的WorkQueue就会保存在workQueues里面,r是一个随机数,经过r来找到某一个 WorkQueue,在WorkQueue里面有所要作的任务。
而后经过WorkQueue的base,取得base的偏移量。

b = q.base
..
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
..
而后经过偏移量获得最后一个的任务,运行这个任务

t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))
..
w.runTask(t);
..
经过这个大概的分析理解了过程,咱们发现,当前线程调用scan方法后,不会执行当前的WorkQueue中的任务,而是经过一个随机数r,来获得其余 WorkQueue的任务。这就是ForkJoinPool的主要的一个机理。
当前线程不会只着眼于本身的任务,而是优先完成其余任务。这样作来,防止了饥饿现象的发生。这样就预防了某些线程由于卡死或者其余缘由而没法及时完成任务,或者某个线程的任务量很大,其余线程却没事可作。

而后来看看runTask方法
final void runTask(ForkJoinTask<?> task) {
  if ((currentSteal = task) != null) {
  ForkJoinWorkerThread thread;
  task.doExec();
  ForkJoinTask<?>[] a = array;
  int md = mode;
  ++nsteals;
  currentSteal = null;
  if (md != 0)
   pollAndExecAll();
  else if (a != null) {
   int s, m = a.length - 1;
   ForkJoinTask<?> t;
   while ((s = top - 1) - base >= 0 &&
    (t = (ForkJoinTask<?>)U.getAndSetObject
    (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
   top = s;
   t.doExec();
   }
  }
  if ((thread = owner) != null) // no need to do in finally clause
   thread.afterTopLevelExec();
  }
 }

有一个有趣的命名:currentSteal,偷得的任务,的确是刚刚解释的那样。 task.doExec();
将会完成这个任务。完成了别人的任务之后,将会完成本身的任务。 经过获得top来得到本身任务第一个任务

while ((s = top - 1) - base >= 0 && (t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) != null)
{
 top = s;
 t.doExec();
}

接下来,经过一个图来总结下刚刚线程池的流程
Java 高并发六:JDK并发包2详解

好比有T1,T2两个线程,T1会经过T2的base来得到T2的最后一个任务(固然其实是经过一个随机数r来取得某个线程最后一个任务),T1也会经过本身的top来执行本身的第一个任务。反之,T2也会如此。
拿其余线程的任务都是从base开始拿的,本身拿本身的任务是从top开始拿的。这样能够减小冲突

若是没有找到其余任务

else if (--j < 0) {
   if ((ec | (e = (int)c)) < 0) // inactive or terminating
   return awaitWork(w, c, ec);
   else if (ctl == c) {  // try to inactivate and enqueue
   long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
   w.nextWait = e;
   w.eventCount = ec | INT_SIGN;
   if (!U.compareAndSwapLong(this, CTL, c, nc))
    w.eventCount = ec; // back out
   }
   break;
  }

那么首先会经过一系列运行来改变ctl的值,得到了nc,而后用CAS将新的值赋值。而后就调用awaitWork()将线程进入等待状态(调用的 前面系列文章中提到的unsafe的park方法)。
这里要说明的是改变ctl值这里,首先是将ctl中的AC-1,AC是占ctl的前16位,因此不能直接-1,而是经过AC_UNIT(0x1000000000000)来达到使ctl的前16位-1的效果。

前面说过eventCount中有保存poolIndex,经过poolIndex以及WorkQueue中的nextWait,就能遍历全部的等待线程。

转自:https://www.jb51.net/article/92364.htm

相关文章
相关标签/搜索