简单的描述一下业务场景,项目里面有一个分布式定时job ,按期去扒取数据,那么有两层循环,第一层是大概1000条数据 ,而后第二层循环每一个数据下面大概20个子数据,而后经过多线程的方式去扒取数据入库。这就是一个简单的业务背景。这个对刚入行的小白写代码得注意了!程序员
做为程序员的我也是第一次遇到这个问题,虽然这个问题解决很简单,可是形成的影响很大,我以为仍是有必要作一个小总结,小伙伴们之后写相似代码的时候就会有注意,这个问题的形成是我一个同事没有理解透线程池,致使的一个很大的问题。那么先说问题以前先扒拉扒拉线程池。数据库
首先我先说明一点在企业中通常都是自定义线程池,不多使用jdk 给咱们提供的几种线程池方法,这样是为了作到一个可控制。bash
这里帮你们只是一次简单的回顾吧,具体线程池网上已经一大片一大片的文章了。多线程
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
private static ThreadPoolExecutor poolExecutor = new
ThreadPoolExecutor(3,
30,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(1000),
new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));
复制代码
那么说到线程池无非就理解这几个参数。并发
流程异步
那么上面也是整个线程池的核心流程作了一个描述。分布式
上面已经描述了线程池的流程和原理,下面自定义线程池直接贴代码了,就不作过多的阐述了。ide
// 定义一个线程池类
public class MyWorkerThreadPool {
private static final int MAX_QUEUE_SIZE = 1000;
private static ThreadPoolExecutor poolExecutor = new
ThreadPoolExecutor(3,
30,
1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(MAX_QUEUE_SIZE),
new MyRejectedExecutionHandler(MAX_QUEUE_SIZE));
public static void submitTak(Runnable run) {
poolExecutor.submit(run);
}
public static void shutdown() {
poolExecutor.shutdown();
}
}
// 定义一个拒绝策略
public class MyRejectedExecutionHandler implements RejectedExecutionHandler{
private final Log logger = LogFactory.getLog(this.getClass());
private int maxQueueSize;
public MyRejectedExecutionHandler(int maxQueueSize){
this.maxQueueSize=maxQueueSize;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("提交任务失败了" +
"当前队列最大长度" +
"maxQueueSize="+maxQueueSize+
",maximumPoolSize="+executor.getMaximumPoolSize());
if(executor.getQueue().size()<maxQueueSize){
executor.submit(r);
}else{
try {
Thread.sleep(3000);
executor.submit(r);
}catch (Exception e){
//此异常忽略
executor.submit(r);
}
}
}
}
复制代码
那么接下来就是重点了,这里只贴一部分伪代码,前面已经说了这是一个分布式定时job,这里是我精简了同事的代码提炼出来的。this
//模拟场景
for(int i= 0;i< 1000;i++){
for(int j = 0;j<20;j++){
MyWorkerThreadPool.submitTak(()->{
// 真实业务场景这里很是耗时,
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
执行结果:
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
提交任务失败了当前队列最大长度maxQueueSize=1000,maximumPoolSize=30
复制代码
这里 第一层模拟了1000条数据,第二层循环30条数据,同事的代码,致使同一时间咱们自定义的线程队列爆满,2000*30 这个是要开多少个线程啊。细细想下是否是很恐怖,这么使用会致使什么后果呢,当前业务又不少都被拒绝掉没有执行,另外当线程池爆满后,咱们项目的其它功能执行异步方法也会被拒绝掉,结果可想而知。spa
那么如何解决了,其实很简单,咱们跑这个比较耗时的任务咱们能够指定10个线程去跑就是了,这样就不会影响到其它的功能业务。 我这里简单的使用了一个计数器,好比超过了10个线程则阻塞等待一会,跑完一个线程则减一,直接上代码
public class TestMain {
public static void main(String[] args) throws Exception{
//模拟场景,这是一个分布式定时任务,因此不会存在并发同时执行的问题
AtomicInteger threadNum = new AtomicInteger(0);
// 模拟当前第一层有1000数据
for(int i= 0;i< 1000;i++){
// 模拟每条线路有20条子数据
for(int j = 0;j<20;j++){
// 多线程拉去爬取网上的数据汇总到数据库
// 一次最多开启10个线程取执行耗时操做,
while (threadNum.get() > 10){
// 能够小睡一会再看是否有资格执行
Thread.sleep(500);
}
// 小增长1
threadNum.incrementAndGet();
int tempI = i;
int tempJ = j;
MyWorkerThreadPool.submitTak(()->{
// 真实业务场景这里很是耗时,
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName()+"执行完第一层数据"+ tempI +"第二层:"+tempJ);
// 执行完减小一个1
threadNum.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
执行结果:
pool-2-thread-2执行完第一层数据0第二层:1
pool-2-thread-1执行完第一层数据0第二层:0
pool-2-thread-3执行完第一层数据0第二层:2
pool-2-thread-2执行完第一层数据0第二层:3
pool-2-thread-3执行完第一层数据0第二层:5
pool-2-thread-1执行完第一层数据0第二层:4
pool-2-thread-3执行完第一层数据0第二层:7
pool-2-thread-1执行完第一层数据0第二层:8
pool-2-thread-2执行完第一层数据0第二层:6
pool-2-thread-1执行完第一层数据0第二层:10
pool-2-thread-3执行完第一层数据0第二层:9
pool-2-thread-2执行完第一层数据0第二层:11
pool-2-thread-3执行完第一层数据0第二层:13
pool-2-thread-2执行完第一层数据0第二层:14
pool-2-thread-1执行完第一层数据0第二层:12
复制代码
其实咱们开发中又不少小细节,只是咱们有时候没有注意或对其原理不清楚,有时候写出来的代码就会带来比较糟糕的后果。那么今天这篇就到这里!