最近在代码中使用@Async来进行异步抄单,进行压测时出现了OOM问题,经过日志信息看到了:unable to create new native threadreact
初步怀疑是建立的线程太多致使的,使用jstack 线程PID 分析打印的日志,发现大量线程处于Runnable状态,基本能够肯定是建立线程太多致使的。缓存
出问题的服务是报案的服务,在进行抄单时在方法上使用了@Asycn这个注解,进行异步抄单,经过trace-log日志能够看出建立了不少的Thread,通过简单的了解@Async异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务建立一个线程,在压测的状况下,会有大量的请求去抄单,这时会不断建立大量的线程,极有可能压爆服务器内存。bash
借此机会学习一下SimpleAsyncTaskExecutor的源码,SimpleAsyncTaskExecutor提供了限流机制,经过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭状况下,会不断建立新的线程来处理任务,核心代码以下:服务器
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
复制代码
SimpleAsyncTaskExecutor限流实现:异步
首选任务进来,会循环判断当前执行线程数是否超过concurrencyLimit,若是超了,则当前线程调用wait方法,释放monitor对象锁,进入等待async
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount +
" has reached limit " + this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
复制代码
线程任务执行完毕后,当执行线程数会减一,会调用monitor对象的notify方法,唤醒等待状态下的线程,等待状态下的线程会竞争monitor锁,竞争到,会继续执行线程任务。ide
protected void afterAccess() {
if (this.concurrencyLimit >= 0) {
synchronized (this.monitor) {
this.concurrencyCount--;
if (logger.isDebugEnabled()) {
logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
}
this.monitor.notify();
}
}
}
复制代码
最终的解决办法:使用自定义线程池学习
@Configuration
@Slf4j
public class AppConfig implements AsyncConfigurer {
public static final String
ASYNC_EXECUTOR_NAME = "asyncExecutor";
@Bean(name = ASYNC_EXECUTOR_NAME)
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//核心线程数,默认值为1
threadPoolTaskExecutor.setCorePoolSize(10);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
//缓存队列
threadPoolTaskExecutor.setQueueCapacity(256);
//超出核心线程数以外的线程在空闲时间最大的存活时间
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setTaskDecorator(new ContextDecorator());
//线程的前缀
threadPoolTaskExecutor.setThreadNamePrefix("AsyncThread-");
//是否等待全部的线程关闭以后采起关闭线程池,默认是false
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待时长
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
//拒绝策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/**
* <h2>定义异步任务异常处理类</h2>
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.info("AsycError:{},Method :{},Param:{}", ex.getMessage(), method.getName(), JSON.toJSON(params));
ex.printStackTrace();
//TODO 发送邮件或者发送短信
}
}
}
复制代码
就不会出现一直建立Thread的状况,致使OOM。ui