这两天手痒用jsoup撸了个抓取图片爬虫java
第一版:多线程
ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 6, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); for (int j = 1; j <= 总页数; j++) { executor.execute(()->{ // 1.抓取网页,得到图片url // 2.根据url保存图片 // 3.保存后记录成功和失败的信息到本地txt }); }
程序看起来没有什么问题,只开了6线程操做,开始没敢开太多线程,怕被网站拉黑。。并发
可是运行起来太慢了,一夜只爬了10个多G,目前分析问题主要有两点:异步
1.并发操做本地txt,会拖慢单个任务执行的速度优化
2.线程没有充分利用网站
首先看下操做文件方法吧,所用方法来自NIO:url
Files.write(log, attr.getBytes("utf8"), StandardOpenOption.APPEND);
经过查看源码发现,该方法会构造一个OutputStream去调用write方法,而write方法上有synchronized,多线程操做无疑会转为重量锁spa
那么想要记录日志的话,最好是让它们没有线程竞争的状况下再去操做文件;线程
而后是优化多线程操做,相比于获取url,下载图片确定是要比它更慢的,若是先统一获取url,而后根据url再去下载图片是否会更好?日志
第一次优化:
// 用于记录全部url Queue<String> queue = new ConcurrentLinkedQueue<String>(); // 用于记录全部日志 Queue<String> logQueue = new ConcurrentLinkedQueue<String>(); // 全部任务 List<Consumer> allTasks = new ArrayList<>(); for (int j = 1; j <= 总页数; j++) { allTasks.add(t ->{ // 得到url,放入queue中 }); } // 使用ForkJoin并行执行记录url的任务 BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> { tasks.forEach(t->t.accept(null)); }); // 将全部url并行执行下载 List<String> list = queue.stream().collect(Collectors.toList()); BatchTaskRunner.execute(list, taskPerThread, tasks -> { tasks.forEach( // 1.下载文件 // 2.将url成功或失败放到logQueue中 ); }); // 最后再记录日志 logQueue.forEach( // 将全部日志保存到本地txt中 );
这里主要分为三步:
1.并行执行任务,抓取url放入queue
2.并行执行下载,从queue中取url
3.从logQueue中保存日志到本地
分析:先是抓取全部url,而后再去并行执行保存;将保存日志放到最后,保存了图片后最后的日志反而可有可无了,可是运行时候我发现仍是存在问题:
我去,为何必定要先放url再去处理啊!!放的同时也取任务,最后剩余的任务再并行执行不是更快!
好吧,有了这个想法,直接开干:
第二次优化:
/****************************第二次增长的逻辑start**************************************/ // 控制主线程执行 CountDownLatch countDownLatch = new CountDownLatch(totalPageSize); // 用于消费queue的线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(12, 12, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); // 用于自旋时的开关 volatile boolean flag = false; /****************************第二次增长的逻辑end**************************************/ // 用于记录全部url Queue<String> queue = new ConcurrentLinkedQueue<String>(); // 用于记录全部日志 Queue<String> logQueue = new ConcurrentLinkedQueue<String>(); // 全部任务 List<Consumer> allTasks = new ArrayList<>(); for (int j = 1; j <= 总页数; j++) { allTasks.add(t ->{ // 得到url,放入queue中 }); } // 开了一个线程去执行,主要是为了让它异步去操做 new Thread(()->{ // 使用ForkJoin并行执行记录url的任务 // finally中调用countDownLatch.countDown() BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> { tasks.forEach(t->t.accept(null)); }); }).start(); // 一边抓取一边消费 for (int i = 0; i < 12; i++) { executor.execute(()->{ try { takeQueue(); // 从queue得到url并消费,若是信号量归零则将flag置为true } catch (InterruptedException e) { } }); } for(;;) { if(flag) { break; } Thread.sleep(10000); } countDownLatch.await(); executor.shutdownNow(); // 都取完了,就没必要再去并行执行了 if(queue.size() == 0) { return; } // 将全部url并行执行下载 List<String> list = queue.stream().collect(Collectors.toList()); BatchTaskRunner.execute(list, taskPerThread, tasks -> { tasks.forEach( // 1.下载文件 // 2.将url成功或失败放到logQueue中 ); }); // 最后再记录日志 logQueue.forEach( // 将全部日志保存到本地txt中 );
其中的takeQueue方法逻辑:
void takeQueue() throws InterruptedException { for(;;) { long count = countDownLatch.getCount(); // 未归零则一直去消费 if(count > 0) { String poll = queue.poll(); if(poll != null) { consumer.accept(poll); // 根据url去下载 }else { Thread.sleep(3000); } } else { flag = true; return; } } }
大概撸了个逻辑,日志什么的已经不重要了。。。
主线程自旋,保存url同时去并发下载,若是保存url的逻辑执行完了队列中还有url,则并行去下载
看着线程都用上了,感受爽多了
即便在消费,queue中对象仍是愈来愈多
大概逻辑写好了,代码还能够再优化一下,有想法的欢迎留言