在多线程编程中一项很重要的功能就是执行任务,而执行任务的方式有不少种,为何必定须要使用线程池呢?下面咱们使用Socket编程处理请求的功能,分别对每种执行任务的方式进行分析。java
当Socket监听到客户端有链接,经过handleSocket方法顺序的处理每个客户端链接,当处理完成后,继续监听。代码以下:数据库
ServerSocket serverSocket = new ServerSocket(); SocketAddress endpoint = new InetSocketAddress(host, port); serverSocket.bind(endpoint,1023); while (!isStop) { Socket socket = serverSocket.accept(); handleSocket(socket); }
这种方式的缺点很是明显:当我有多个客户端请求时,在server处理一个请求的过程当中,其余请求都须要等待前一个请求处理完毕。这种在高并发状况下几乎不可用。编程
针对上面的问题进行优化:为每个客户端请求建立一个线程来处理请求,主线程只须要建立线程,以后便可继续坚挺客户端请求.流程图以下:
安全
代码以下:服务器
ServerSocket serverSocket = new ServerSocket(); SocketAddress endpoint = new InetSocketAddress(host, port); serverSocket.bind(endpoint,1023); while (!isStop) { Socket socket = serverSocket.accept(); new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start(); }
这种方式有如下优势:多线程
1.将处理客户端链接的操做从主线程中分离出去,使得主循环能够更快的响应下一次请求。
2.处理客户端链接的操做是并行的,提升了程序的吞吐量。并发
可是这种方式有有如下几个缺点:异步
1.处理请求的线程必须是线程安全的socket
2.线程的建立和销毁都须要开销,当大量建立线程的时候,将会消耗大量计算机资源ide
3.当可用的CPU数量小于可运行的线程的时候,那么多出来的线程会占用内存资源,给垃圾回收带来压力,而且在大量线程竞争CPU资源的时候会有很大的性能开销
4.JVM中可建立的线程数存在一个上限,这个上限随着平台的不一样而不一样,而且受多个因素的限制,包括JVM的启动参数,每一个线程所占用的内存大小等,若是超出这些限制,将会抛出OOM异常。
对于1.2中出现的问题,最好的解决方案就是使用线程池来执行task,这样能够对建立的线程总数作限制,从而避免1.2中的问题。流程图以下:
处理方式以下:
ServerSocket serverSocket = new ServerSocket(); SocketAddress endpoint = new InetSocketAddress(host, port); serverSocket.bind(endpoint,1023); while (!isStop) { Socket socket = serverSocket.accept(); executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++)); }
此中方式有如下几个优势:
1.任务提交和任务执行分离开
2.执行任务的线程能够重用,减小了线程建立和销毁的开销,同时当任务到达时能够直接使用建立好的线程执行任务,也提升了程序的响应速度。
在java中线程池的实现是基于生产者-消费者模式的,线程池的功能将任务的提交和任务的执行分离,任务提交的过程为生产者,执行任务的过程为消费过程。具体的分析见源码分析。java线程池的顶层接口为Executor,源码以下:
public interface Executor { void execute(Runnable command); }
此接口为全部线程池实现的顶层接口,其规定了能够接受的task类型为Runnable实现类,可是具体的执行task的逻辑由线程池实现类本身定义,好比:
可使用主线程串行执行任务,
也能够为每一个任务建立一个新的线程
或者提早建立好一组线程,每次执行任务的时候从一组线程中取,等等
对于线程池的执行策略主要有如下几个方面:
1.在什么线程中执行任务
2.按照什么顺序执行任务(FIFO、LIFO、优先级?)
3.有多少个任务能够并发执行
4.最多能够有多少个任务在队列中等待执行
5.当等待队列中达到最大值的时候,怎么样拒绝新提交的task
6.在执行一个任务以前或者以后须要作哪些操做?
应该根据具体的业务选择不一样的执行策略。在java类库中提供了Executors工具类来常见默认策略的线程池。主要有如下几个接口:
public static ExecutorService newFixedThreadPool(int nThreads) 将会建立一个固定大小的线程池,每当有新任务提交的时候,当线程总数没有达到核心线程数的时候,为每一个任务建立一个新线程,当线程的个数到达最大值后,重用以前建立的线程,当线程由于未知异常而中止时候,将会重现建立一个线程做为补充。 public static ExecutorService newCachedThreadPool() 根据需求建立线程的个数,当线程数大于任务数的时候,将会注销多余的线程 public static ExecutorService newSingleThreadExecutor() 建立一个单线程的线程池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 建立一个可执行定时任务的线程池
在以上的例子中,全部提交的task在提交到线程池后其执行状态是不可见的,即主线程没法知道提交的task是否执行结束或者执行结果。针对这个问题,java提供了能够返回数据的task接口Future和Callable接口。
其中Callable接口提供了任务返回数据以及抛出异常的功能,定义以下:
public interface Callable<V> { V call() throws Exception; }
在ExecutorService中全部的submit方法都会返回一个Future对象,其接口定义以下:
public interface Future<V> { 取消任务执行,当mayInterruptIfRunning为true,interruptedthisthread boolean cancel(boolean mayInterruptIfRunning); 返回此任务是否在执行完毕以前被取消执行 boolean isCancelled(); 返回此任务是否已经完成,包括正常结束,异常结束以及被cancel boolean isDone(); 返回执行结果,当任务没有执行结束的时候,等待 V get() throws InterruptedException, ExecutionException; }
1.线程饥饿死锁
在单线程的Executor中,若是Executor中执行的一个任务中,再次提交任务到同一个Executor中,而且等待这个任务执行完毕,那么就会发生死锁问题。以下demo中所示:
public class ThreadDeadLock { private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws Exception { System.out.println("Main Thread start."); EXECUTOR_SERVICE.submit(new DeadLockThread()); System.out.println("Main Thread finished."); } private static class DeadLockThread extends Thread{ @Override public void run() { try { System.out.println("DeadLockThread start."); Future future = EXECUTOR_SERVICE.submit(new DeadLockThread2()); future.get(); System.out.println("DeadLockThread finished."); } catch (Exception e) { } } } private static class DeadLockThread2 extends Thread { @Override public void run() { try { System.out.println("DeadLockThread2 start."); Thread.sleep(1000 * 10); System.out.println("DeadLockThread2 finished."); } catch (Exception e) { } } } }
输出结果为:
Main Thread start. Main Thread finished. DeadLockThread start.
对于多个线程的线程池,若是全部正在执行的线程都由于等待处于工做队列中的任务执行而阻塞,那么就会发生线程饥饿死锁。
当往线程池中提交有依赖的任务时,应清楚的知道可能会出现的线程饥饿死锁风险。==应考虑是否将依赖的task提交到不一样的线程池中==
或者使用无界的线程池。
==只有当任务相对独立时,设置线程池大小和工做队列的大小才是合理的,不然有可能会出现线程饥饿死锁==
2.任务运行时间过长
任务执行时间过长会影响线程池的响应时间,当运行时间长的任务远大于线程池线程的个数时,会出现全部线程都在执行运行时间长的任务,从而影响对其余任务的响应。
解决办法:
1.经过限定任务等待的时长,而不要无限期等待下去,当等待超时的时候,能够将任务标记为失败,或者从新放到线程池中。
2.当线程池中阻塞任务过多的时,应该考虑扩大线程池的大小
线程池的大小依赖于提交任务的类型以及服务器的可用资源,线程池的大小应该避免设置过大或者太小,当线程设置过打的时候可能会有资源耗尽的风险,线程池设置太小会有可用cpu空闲从而影响系统吞吐量。
影响线程池大小的资源有不少,好比CPU、内存、数据库连接池等,只须要计算资源可用总资源 / 每一个任务须要的资源,取最小值,便可得出线程池的上限。
线程池的最小值应该大于可用的CPU数量。
ThreadPoolExecutor线程池是比较经常使用的一个线程池实现类,经过Executors工具类建立的线程池中,其具体实现类是ThreadPoolExecutor。首先咱们能够看下ThreadPoolExecutor的构造函数以下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
下面分别对构造函数中的各个参数对应的策略进行分析:
1.线程的建立与销毁
首先构造函数中corePoolSize、maximumPoolSize、keepAliveTime和unit参数影响线程的建立和销毁。其中corePoolSize为核心线程数,当第一次提交任务的时候若是正在执行的线程数小于corePoolSize,则新建一个线程执行task,若是已经超过corePoolSize,则将任务放到任务队列中等待执行。当任务队列的个数到达上限的时候,而且工做线程数量小于maximumPoolSize,则继续建立线程执行工做队列中的任务。当任务的个数小于maximumPoolSize的时候,将会把空闲的线程标记为可回收的垃圾线程。对于如下代码段测试此功能:
public class ThreadPoolTest { private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6,100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3)); public static void main(String[] args) throws Exception { for (int i = 0; i< 9; i++) { executorService.submit(new Task()); System.out.println("Active thread:" + executorService.getActiveCount() + ".Task count:" + executorService.getTaskCount() + ".TaskQueue size:" + executorService.getQueue().size()); } } private static class Task extends Thread { @Override public void run() { try { Thread.sleep(1000 * 100); } catch (Exception e) { } } } }
输出结果为:
Active thread:1.Task count:1.TaskQueue size:0 Active thread:2.Task count:2.TaskQueue size:0 Active thread:3.Task count:3.TaskQueue size:0 Active thread:3.Task count:4.TaskQueue size:1 Active thread:3.Task count:5.TaskQueue size:2 Active thread:3.Task count:6.TaskQueue size:3 Active thread:4.Task count:7.TaskQueue size:3 Active thread:5.Task count:8.TaskQueue size:3 Active thread:6.Task count:9.TaskQueue size:3
2.任务队列
在ThreadPoolExecutor的构造函数中能够传入保存任务的队列,当新提交的任务没有空闲线程执行时候,会将task保存到此队列中。保存的顺序是根据插入的顺序或者Comparator来排序的。
3.饱和策略
ThreadPoolExecutor.AbortPolicy 抛出RejectedExecutionException ThreadPoolExecutor.CallerRunsPolicy 将任务的执行交给调用者,即将本该异步执行的任务变成同步执行。
4.线程工厂
当线程池须要建立线程的时候,默认是使用线程工厂方法来建立线程的,一般状况下咱们经过指定线程工厂的方式来为线程命名,便于出现线程安全问题时候来定位问题。
1.项目中全部的线程应该都有线程池来提供,不容许自行建立线程
2.尽可能不要用Executors来建立线程,而是使用ThreadPoolExecutor来建立
Executors有如下问题:
1)FixedThreadPool 和 SingleThreadPool: 容许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而致使 OOM。 2)CachedThreadPool 和 ScheduledThreadPool: 容许的建立线程数量为 Integer.MAX_VALUE,可能会建立大量的线程,从而致使 OOM。