记录ThreadPoolTaskExecutor线程池的在项目中的实际应用,讲解一下线程池的配置和参数理解。

前言:最近项目中与融360项目中接口对接,有反馈接口(也就是咱们接收到请求,须要当即响应,而且还要有一个接口推送给他们其余计算结果),推送过程耗时、或者说两个接口不能是同时返回,有前后顺序。

这时我想到了把本身Controller当即返回接受成功,中间添加一个新的线程去作其余耗时的操做(线程池配置和参数测试讲解请阅读第5步)。

一、Controller代码以下:java

@Autowired
private CallThreadDemo worker;
@RequestMapping("/bandBankConfirm2")
    public void bandBankConfirm2(String jsonString) {
        System.out.println("controller开始--------------");
    //这里是须要调用第三方的接口传入参数 String method
= "is.api.v3.order.bindcardfeedback"; Map<String, Object> map = new HashMap<>(); map.put("order_no", "254986512848973"); map.put("bind_status", 1); map.put("reason", "");     //这里开始调用线程池的方法 worker.callRong360(method, map); System.out.println("controller end --------------");
    //renderJson这个方法是jfinal的,能够理解为@ReponseBody 须要return的操做了
    renderJson("接口调用完毕");   
}
二、调用线程池的方法 CallThreadDemo 类代码:
@Component
public class CallThreadDemo {

   //这里是Spring.xml中配置的bean名称 @Autowired
private ThreadPoolTaskExecutor executor; @Autowired private ServiceTest serviceTest; public void callRong360(final String method, final Map<String, Object> map) { //这个类是我封装的抽象类,里面有一个公共方法,具体代码下面有 ServiceParent callRong360Method = new ServiceParent() { @Override public void run() { try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程开始-------------");
          //这里调用第三方公用接口 JSONObject result
= callBandBankMethod(method, map);

          //这里调用service方法,实现本身的业务逻辑 serviceTest.insertUser(
"111", "222222");
System.out.println(result); System.out.println(
"线程结束-------------"); } };
     //这里线程池方法调用一个线程继承类或者实现Runable接口的类 executor.execute(callRong360Method); System.out.println(
"当前活动线程数:"+ executor.getActiveCount()); System.out.println("核心线程数:"+ executor.getCorePoolSize()); System.out.println("总线程数:"+ executor.getPoolSize()); System.out.println("最大线程池数量"+executor.getMaxPoolSize()); System.out.println("线程处理队列长度"+executor.getThreadPoolExecutor().getQueue().size()); }

三、封装的抽象类代码以下:web

public abstract class ServiceParent implements Runnable {

    public JSONObject callBandBankMethod(String method, Map<String, Object> map) {
//这个方法是调用三方的接口,公用部分
// //输入 参数以下 : // Map<String, Object> map = new HashMap<>(); // map.put("order_no", "254986512848973"); // map.put("bind_status", 1); // map.put("reason", ""); // net.sf.json.JSONObject ret = callRong360Method.callBandBankMethod("is.api.v3.order.bindcardfeedback", map); // // // 异常状况输出参数 为 null: OpenapiClient openapiClient = new OpenapiClient(); openapiClient.setMethod(method); for (Map.Entry<String, Object> entry : map.entrySet()) { openapiClient.setField(entry.getKey(), String.valueOf(entry.getValue())); } net.sf.json.JSONObject ret = null; try { ret = openapiClient.execute(); } catch (Exception e) { e.printStackTrace(); System.out.println("调用反馈接口异常---->" + e.getMessage()); } return ret; }   //这里定义为抽象方法,建立匿名内部类或者继承类必须实现 public abstract void run();

四、ServiceTest接口方法以下:spring

@Service
public class ServiceTest {

    @Autowired
    private BankMapper bankMapper;

    @Transactional(rollbackFor = {Exception.class})
    public void insertUser(String s, String s1) {
        bankMapper.insertUser(s, s1);

         //这里建立异常,测试事务。已测试,事务生效
        int i = 1 / 0;
    }
}

以上四步,成功在调用Controller方法当即返回结果,也实现了另外的一个线程去调用三方接口返回信息,而且实现了本身的业务逻辑。json

五、接着咱们继续说明一下线程池对应配置和参数说明,这里咱们经过测试来讲明参数的做用(corePoolSize 核心线程数量,maxPoolSize 最大线程数,queueCapacity 处理队列)api

  threadpool.corePoolSize=5
  threadpool.keepAliveSeconds=200
  threadpool.maxPoolSize=10
  threadpool.queueCapacity=2
  <!-- 线程池 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="${threadpool.corePoolSize}" />
        <property name="keepAliveSeconds" value="${threadpool.keepAliveSeconds}" />
        <property name="maxPoolSize" value="${threadpool.maxPoolSize}" />
        <property name="queueCapacity" value="${threadpool.queueCapacity}" />
    </bean>
        <!--请在bean xsd中配置task-->
    <task:annotation-driven executor="taskExecutor" />
    

 

下面贴出测试输出的日志信息:controller开始--------------app

当前活动线程数:1 核心线程数:5 总线程数:1 最大线程池数量10 线程处理队列长度0 controller end --------------

controller开始-------------- 当前活动线程数:2 核心线程数:5 总线程数:2 最大线程池数量10 线程处理队列长度0 controller end --------------

controller开始-------------- 当前活动线程数:3 核心线程数:5 总线程数:3 最大线程池数量10 线程处理队列长度0 controller end --------------

controller开始-------------- 当前活动线程数:4 核心线程数:5 总线程数:4 最大线程池数量10 线程处理队列长度0 controller end --------------

controller开始-------------- 当前活动线程数:5 核心线程数:5 总线程数:5 最大线程池数量10 线程处理队列长度0 controller end --------------

controller开始-------------- 当前活动线程数:5---------------》由于放入了队列中,此处活动线程数仍是5 核心线程数:5 总线程数:5 最大线程池数量10 线程处理队列长度1 -------------》这里达到了最大的核心线程数量 corePoolSize=5,开始放入处理队列中
queueCapacity=2
controller end --------------
controller开始-------------- 当前活动线程数:5 核心线程数:5 总线程数:5 最大线程池数量10 线程处理队列长度2 ---------------》继续放入队列中,达到了最大队列数量2 controller end --------------

controller开始-------------- 当前活动线程数:6-----------------》这里由于达到了最大队列数量,因此继续建立线程去执行,一直到最后到最大线程数量 核心线程数:5 总线程数:6 最大线程池数量10 线程处理队列长度2 controller end --------------

controller开始-------------- 当前活动线程数:7 核心线程数:5 总线程数:7 最大线程池数量10 线程处理队列长度2 controller end --------------

controller开始-------------- 当前活动线程数:8 核心线程数:5 总线程数:8 最大线程池数量10 线程处理队列长度2 controller end --------------

controller开始-------------- 当前活动线程数:9 核心线程数:5 总线程数:9 最大线程池数量10 线程处理队列长度2 controller end --------------

controller开始-------------- 当前活动线程数:10 --------------》这里活动线程数量达到了最大线程池数量 核心线程数:5 总线程数:10 最大线程池数量10 线程处理队列长度2 controller end --------------

controller开始-------------- 2018-07-01 20:23:19 -----------------》这里继续调用,由于最大线程池数量和队列中都已经到了最大值,抛出了异常 [] [] [WARN]-[Thread: qtp1276504061-60]-[org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.logException()]: Handler execution resulted in exception org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@17d95b77[Running, pool size = 10, active threads = 10, queued tasks = 2, completed tasks = 0]] did not accept task: com.fastx.cooperate.rong360.rong.service.CallThreadDemo$1@5fa6bf1 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:245) at com.fastx.cooperate.rong360.rong.service.CallThreadDemo.callRong360(CallThreadDemo.java:43)

这样,经过输出的日志,咱们能够很容易的理解了各个参数的做用。ide

corePoolSize: 线程池维护线程的最少数量 
keepAliveSeconds 线程池维护线程所容许的空闲时间 
maxPoolSize 线程池维护线程的最大数量 
queueCapacity 线程池所使用的缓冲队列oop

 

 

ThredPoolTaskExcutor的处理流程:测试

 

  当池子大小小于corePoolSize,就新建线程,并处理请求spa

 

  当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理

 

  当workQueue放不下任务时,就新建线程入池,并处理请求,若是池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来作拒绝处理

 

  当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,若是无请求可处理就自行销毁

 

  其会优先建立  CorePoolSiz 线程, 当继续增长线程时,先放入Queue中,当 CorePoolSiz  和 Queue 都满的时候,就增长建立新线程,当线程达到MaxPoolSize的时候,就会抛出错 误 org.springframework.core.task.TaskRejectedException

 

  另外MaxPoolSize的设定若是比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。

相关文章
相关标签/搜索