记录:一个爬虫程序的优化过程

这两天手痒用jsoup撸了个抓取图片爬虫java

 

第一版:多线程

ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 6, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
for (int j = 1; j <= 总页数; j++) {
    executor.execute(()->{
        // 1.抓取网页,得到图片url
        // 2.根据url保存图片
        // 3.保存后记录成功和失败的信息到本地txt
	});
}

程序看起来没有什么问题,只开了6线程操做,开始没敢开太多线程,怕被网站拉黑。。并发

可是运行起来太慢了,一夜只爬了10个多G,目前分析问题主要有两点:异步

    1.并发操做本地txt,会拖慢单个任务执行的速度优化

    2.线程没有充分利用网站

首先看下操做文件方法吧,所用方法来自NIO:url

Files.write(log, attr.getBytes("utf8"), StandardOpenOption.APPEND);

经过查看源码发现,该方法会构造一个OutputStream去调用write方法,而write方法上有synchronized,多线程操做无疑会转为重量锁spa

那么想要记录日志的话,最好是让它们没有线程竞争的状况下再去操做文件;线程

 

而后是优化多线程操做,相比于获取url,下载图片确定是要比它更慢的,若是先统一获取url,而后根据url再去下载图片是否会更好?日志

 

第一次优化:

// 用于记录全部url
Queue<String> queue = new ConcurrentLinkedQueue<String>();
// 用于记录全部日志
Queue<String> logQueue = new ConcurrentLinkedQueue<String>();
// 全部任务
List<Consumer> allTasks = new ArrayList<>();
for (int j = 1; j <= 总页数; j++) {
    allTasks.add(t ->{
    	// 得到url,放入queue中
    });
}
// 使用ForkJoin并行执行记录url的任务
BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> {
    tasks.forEach(t->t.accept(null));
});
// 将全部url并行执行下载
List<String> list = queue.stream().collect(Collectors.toList());
BatchTaskRunner.execute(list, taskPerThread, tasks -> {
    tasks.forEach(
        // 1.下载文件
        // 2.将url成功或失败放到logQueue中
    );
});
// 最后再记录日志
logQueue.forEach(
    // 将全部日志保存到本地txt中
);

这里主要分为三步:

    1.并行执行任务,抓取url放入queue

    2.并行执行下载,从queue中取url

    3.从logQueue中保存日志到本地

分析:先是抓取全部url,而后再去并行执行保存;将保存日志放到最后,保存了图片后最后的日志反而可有可无了,可是运行时候我发现仍是存在问题:

我去,为何必定要先放url再去处理啊!!放的同时也取任务,最后剩余的任务再并行执行不是更快!

 

好吧,有了这个想法,直接开干:

第二次优化:

/****************************第二次增长的逻辑start**************************************/
// 控制主线程执行
CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);
// 用于消费queue的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(12, 12, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
// 用于自旋时的开关
volatile boolean flag = false;
/****************************第二次增长的逻辑end**************************************/


// 用于记录全部url
Queue<String> queue = new ConcurrentLinkedQueue<String>();
// 用于记录全部日志
Queue<String> logQueue = new ConcurrentLinkedQueue<String>();
// 全部任务
List<Consumer> allTasks = new ArrayList<>();


for (int j = 1; j <= 总页数; j++) {
    allTasks.add(t ->{
    	// 得到url,放入queue中
    });
}
// 开了一个线程去执行,主要是为了让它异步去操做
new Thread(()->{
    // 使用ForkJoin并行执行记录url的任务
    // finally中调用countDownLatch.countDown()
    BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> {
        tasks.forEach(t->t.accept(null));
    });
}).start();

// 一边抓取一边消费
for (int i = 0; i < 12; i++) {
    executor.execute(()->{
    	try {
    		takeQueue(); // 从queue得到url并消费,若是信号量归零则将flag置为true
    	} catch (InterruptedException e) {
    				
    	}
    });
}
for(;;) {
    if(flag) {
    	break;
    }
    Thread.sleep(10000);
}
countDownLatch.await();
executor.shutdownNow();
// 都取完了,就没必要再去并行执行了
if(queue.size() == 0) {
    return;
}
// 将全部url并行执行下载
List<String> list = queue.stream().collect(Collectors.toList());
BatchTaskRunner.execute(list, taskPerThread, tasks -> {
    tasks.forEach(
        // 1.下载文件
        // 2.将url成功或失败放到logQueue中
    );
});
// 最后再记录日志
logQueue.forEach(
    // 将全部日志保存到本地txt中
);

其中的takeQueue方法逻辑:

void takeQueue() throws InterruptedException {
		for(;;) {
			long count = countDownLatch.getCount();
			// 未归零则一直去消费
			if(count > 0) {
				String poll = queue.poll();
				if(poll != null) {
					consumer.accept(poll); // 根据url去下载
				}else {
					Thread.sleep(3000);
				}
			} else {
				flag = true;
				return;
			}
		}
	}

大概撸了个逻辑,日志什么的已经不重要了。。。

主线程自旋,保存url同时去并发下载,若是保存url的逻辑执行完了队列中还有url,则并行去下载

看着线程都用上了,感受爽多了

 

即便在消费,queue中对象仍是愈来愈多

 

大概逻辑写好了,代码还能够再优化一下,有想法的欢迎留言

相关文章
相关标签/搜索