获取多线程的方法,咱们都知道有三种,还有一种是实现Callable接口java
当咱们实现Ruannable接口的时候,须要重写run方法,也就是线程启动的时候,会自动调用的方法。算法
同理,咱们实现Callable接口,也须要实现call方法,可是这个时候咱们还须要有返回值。这个Callable接口的应用场景通常在于批处理业务,好比转帐的时候,须要给返回结果的状态码回来,表明本次操做成功仍是失败。数据库
public class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("进入了Callable方法"); return 1024; } }
而后经过Thread线程,将MyThread实现Callable接口的类包装起来。小程序
这里须要用到FutureTask
类,他实现了Runnable接口,类的构造函数须要传递一个实现Callable接口的类。缓存
public static void main(String[] args){ // 一、实例化FutureTask类,传进去一个实现了Callable的类 FutureTask<Integer> futureTask = new FutureTask<>(new MyThread()); // 二、而后在用Thread进行实例化,传入实现Runnable接口的FutureTask的类 Thread t1 = new Thread(futureTask, "A线程"); t1.start(); // 三、最后经过 futureTask.get() 获取到返回值 System.out.println("FutureTask的返回值为:" + futureTask.get()); }
原来咱们的方式是一个main方法冰糖葫芦似的串下去,引入Callable后,对于执行比较久的线程,能够单独新开一个线程进行执行,最后再进行汇总输出。服务器
若是咱们用
get()
获取Callable的计算结果,可是若是并无计算完成,会致使阻塞,直到计算完成为止。也就是说,futureTask.get()
须要放在最后执行,这样不会致使主线程阻塞。网络//也可使用下面算法,使用相似自旋锁的方式来进行判断是否运行完毕 while(!futureTask.isDone()){ }
多个线程执行一个FutureTask对象的时候,只会计算一次。多线程
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread()); // 开启两个线程计算futureTask new Thread(futureTask, "A线程").start(); new Thread(futureTask, "B线程").start();
若是咱们须要两个线程同时计算任务的话,那么须要定义两个FutureTask对象架构
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread); FutureTask<Integer> futureTask2 = new FutureTask<>(new MyThread); // 开启两个线程计算futureTask new Thread(futureTask1, "A线程").start(); new Thread(futureTask2, "B线程").start();
线程池作的主要工做就是控制运行的线程的数量,处理过程当中,将任务放入到阻塞队列中,而后线程建立后,启动这些任务,若是线程数量超过了最大数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。并发
它的主要特色为:线程复用、控制最大并发数、管理线程
说白了就是一个线程集合(workerSet)和一个阻塞队列(workQueue)。当用户向线程池提交一个任务时,线程池会先将任务放入阻塞队列中。线程集合中的线程会不断的从阻塞队列中获取任务执行,当阻塞队列中没有任务的时候,就会阻塞,直到队列中有任务了就取出来继续执行。
任务:客户。 线程集合:办理窗口。 阻塞队列:候客区。
Java中线程池是经过Executor框架实现的,该框架中用到了Executor
,Executors
(辅助工具类),ExecutorService
,ThreadPoolExecutor
这几个类。
ExecutorService threadPool = Executors.newFixedThreadPool(5);
ExecutorService threadPool = Executors.newSingleThreadExecutor();
ExecutorService threadPool = Executors.newCacheThreadPool();
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 模拟10个用户来办理业务,每一个用户就是一个来自外部请求线程 try { // 循环十次,模拟业务办理,让5个线程处理这10个请求 for (int i = 0; i < 10; i++) { final int tempInt = i; threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + " 给用户:" + tempInt + "办理业务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); // 用池化技术,必定要记得关闭 }
结果:
咱们可以看到,一共有5个线程,在给10个用户办理业务。
咱们经过查看源码,发现底层都是使用了ThreadPoolExecutor。
由于在单线程池和固定线程池中,线程数量是有限的,所以提交的任务须要在队列中等待空闲的线程。
而在缓存线程池中,线程数量几乎无限,所以提交的任务在 SynchronousQueue 队列中同步给空余线程便可。
Tips:SynchronousQueue是一个没有存储空间的队列,生产者进行 put 操做时,必需要等待消费者的 take 操做。
线程池在建立的时候,一共有7大参数:
workQueue:任务队列,被提交的但未被执行的任务(相似于银行里面的候客区)
整个线程池的工做就像银行办理业务同样。
ps:临时增长的业务窗口,会先处理那些后面来的,没位置坐的客户。(候客区客户os:凭什么= =)
如下全部拒绝策略都实现了RejectedExecutionHandler接口
RejectedExcutionException
异常,阻止系统正常运行工做中应该用哪个方法来建立线程池呢?答案是一个都不用,咱们生产上只能使用自定义的。
阿里巴巴开发手册:【强制】线程池不容许使用Executors去建立,而是经过ThreadPoolExecutor的方式,这样的处理方式让写的同窗更加明确线程池的运行规则,规避资源耗尽的风险。
弊端以下:
- FixedThreadPool 和 SingleThreadPool 使用了无界队列(LinkedBlockingQueue),可能会堆积大量的请求,从而致使OOM
- CacheThreadPool 和 ScheduledThreadPool 的最大线程为 Integer.MAX_VALUE,可能会建立大量的线程,从而致使OOM
ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),//候客区3个座位 Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//而后使用for循环,模拟10个用户来进行请求 for (int i = 0; i < 10; i++) { final int tempInt = i; threadPool.execute(() -> { System.out.println(Thread.currentThread().getName()+" 给用户:"+tempInt+"办理业务"); }); }
可是用户执行到第九个的时候,触发了异常,程序中断。
这是由于触发了AbortPolicy的拒绝策略:直接报异常。
触发条件是,请求的线程大于 阻塞队列大小 + 最大线程数 = 8的时候,也就是说第9个线程来获取线程池中的线程时,就会抛出异常。
也称为回退策略,就是用调用者所在的线程处理任务。
咱们看运行结果:
咱们发现,输出的结果里面出现了main线程,由于线程池触发了拒绝策略,把任务回退到main线程,而后main线程对任务进行处理。
DiscardPolicy拒绝策略、DiscardOldestPolicy拒绝策略
这两种策略都是把任务丢弃。
前者丢弃的是,进来排队排不上的任务。
后者丢弃的是当前队列中最老的任务,即排队下一个就到你了,可是由于有人进来,致使你被丢弃了(为何这么惨?)
。处理逻辑以下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); //用 poll() 移除队列中的首个元素 e.execute(r); //执行当前任务 } }
生产环境中如何配置 corePoolSize 和 maximumPoolSize?。这个是根据具体业务来配置的,分为CPU密集型和IO密集型。
//能够看CPU是几核的 Runtime.getRuntime().availableProcessors();
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能比CPU要好,CPU的IO操做很快,可是CPU还有不少运算要处理,致使系统的CPU大部分都是 100%。
须要尽量少的线程数量,通常为:CPU核数 + 1
即该任务须要大量的IO,即大量的阻塞。IO包括:数据库交互,文件上传下载,网络传输等
IO密集型指的是系统的CPU性能相对硬盘、内存要好不少。此时大部分的情况是CPU在等IO操做,则应配置尽量多的线程,如 CPU核数 * 2。
参考公式:CPU核数 / (1 - 阻塞系统)。通常阻塞系数是0.9,好比 8 核CPU,应该配置 8 / 0.1 = 80 个线程数
若是执行方式是 execute 时,会看到堆栈异常的输出。
当执行方式是 submit 时,堆栈异常没有输出。而且调用 Future.get() 方法时,能够捕获到异常。不会影响线程池里面其余线程的正常执行,线程池会把这个异常的线程移除掉,并建立一个新的线程放入线程池中。?