Java 多线程爬虫及分布式爬虫架构探索

这是 Java 爬虫系列博文的第五篇,在上一篇 Java 爬虫服务器被屏蔽,不要慌,我们换一台服务器 中,咱们简单的聊反爬虫策略和反反爬虫方法,主要针对的是 IP 被封及其对应办法。前面几篇文章咱们把爬虫相关的基本知识都讲的差很少啦。这一篇咱们来聊一聊爬虫架构相关的内容。java

前面几章内容咱们的爬虫程序都是单线程,在咱们调试爬虫程序的时候,单线程爬虫没什么问题,可是当咱们在线上环境使用单线程爬虫程序去采集网页时,单线程就暴露出了两个致命的问题:git

  • 采集效率特别慢,单线程之间都是串行的,下一个执行动做须要等上一个执行完才能执行
  • 对服务器的CUP等利用率不高,想一想咱们的服务器都是 8核16G,32G 的只跑一个线程会不会太浪费啦

线上环境不可能像咱们本地测试同样,不在意采集效率,只要能正确提取结果就行。在这个时间就是金钱的年代,不可能给你时间去慢慢的采集,因此单线程爬虫程序是行不通的,咱们须要将单线程改为多线程的模式,来提高采集效率和提升计算机利用率。github

多线程的爬虫程序设计比单线程就要复杂不少,可是与其余业务在高并发下要保证数据安全又不一样,多线程爬虫在数据安全上到要求不是那么的高,由于每一个页面均可以被看做是一个独立体。要作好多线程爬虫就必须作好两点:第一点就是统一的待采集 URL 维护,第二点就是 URL 的去重, 下面咱们简单的来聊一聊这两点。正则表达式

维护待采集的 URL

多线程爬虫程序就不能像单线程那样,每一个线程独自维护这本身的待采集 URL,若是这样的话,那么每一个线程采集的网页将是同样的,你这就不是多线程采集啦,你这是将一个页面采集的屡次。基于这个缘由咱们就须要将待采集的 URL 统一维护,每一个线程从统一 URL 维护处领取采集 URL ,完成采集任务,若是在页面上发现新的 URL 连接则添加到 统一 URL 维护的容器中。下面是几种适合用做统一 URL 维护的容器:redis

  • JDK 的安全队列,例如 LinkedBlockingQueue
  • 高性能的 NoSQL,好比 Redis、Mongodb
  • MQ 消息中间件

URL 的去重

URL 的去重也是多线程采集的关键一步,由于若是不去重的话,那么咱们将采集到大量重复的 URL,这样并无提高咱们的采集效率,好比一个分页的新闻列表,咱们在采集第一页的时候能够获得 二、三、四、5 页的连接,在采集第二页的时候又会获得 一、三、四、5 页的连接,待采集的 URL 队列中将存在大量的列表页连接,这样就会重复采集甚至进入到一个死循环当中,因此就须要 URL 去重。URL 去重的方法就很是多啦,下面是几种经常使用的 URL 去重方式:数据库

  • 将 URL 保存到数据库进行去重,好比 redis、MongoDB
  • 将 URL 放到哈希表中去重,例如 hashset
  • 将 URL 通过 MD5 以后保存到哈希表中去重,相比于上面一种,可以节约空间
  • 使用 布隆过滤器(Bloom Filter)去重,这种方式可以节约大量的空间,就是不那么准确。

关于多线程爬虫的两个核心知识点咱们都知道啦,下面我画了一个简单的多线程爬虫架构图,以下图所示:安全

多线程爬虫架构图

上面咱们主要了解了多线程爬虫的架构设计,接下来咱们不妨来试试 Java 多线程爬虫,咱们以采集虎扑新闻为例来实战一下 Java 多线程爬虫,Java 多线程爬虫中设计到了 待采集 URL 的维护和 URL 去重,因为咱们这里只是演示,因此咱们就使用 JDK 内置的容器来完成,咱们使用 LinkedBlockingQueue 做为待采集 URL 维护容器,HashSet 做为 URL 去重容器。下面是 Java 多线程爬虫核心代码,详细代码以上传 GitHub,地址在文末:服务器

/** * 多线程爬虫 */
public class ThreadCrawler implements Runnable {
    // 采集的文章数
    private final AtomicLong pageCount = new AtomicLong(0);
    // 列表页连接正则表达式
    public static final String URL_LIST = "https://voice.hupu.com/nba";
    protected Logger logger = LoggerFactory.getLogger(getClass());
    // 待采集的队列
    LinkedBlockingQueue<String> taskQueue;
    // 采集过的连接列表
    HashSet<String> visited;
    // 线程池
    CountableThreadPool threadPool;
    /** * * @param url 起始页 * @param threadNum 线程数 * @throws InterruptedException */
    public ThreadCrawler(String url, int threadNum) throws InterruptedException {
        this.taskQueue = new LinkedBlockingQueue<>();
        this.threadPool = new CountableThreadPool(threadNum);
        this.visited = new HashSet<>();
        // 将起始页添加到待采集队列中
        this.taskQueue.put(url);
    }

    @Override
    public void run() {
        logger.info("Spider started!");
        while (!Thread.currentThread().isInterrupted()) {
	        // 从队列中获取待采集 URL
            final String request = taskQueue.poll();
            // 若是获取 request 为空,而且当前的线程采已经没有线程在运行
            if (request == null) {
                if (threadPool.getThreadAlive() == 0) {
                    break;
                }
            } else {
                // 执行采集任务
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processRequest(request);
                        } catch (Exception e) {
                            logger.error("process request " + request + " error", e);
                        } finally {
                            // 采集页面 +1
                            pageCount.incrementAndGet();
                        }
                    }
                });
            }
        }
        threadPool.shutdown();
        logger.info("Spider closed! {} pages downloaded.", pageCount.get());
    }

    /** * 处理采集请求 * @param url */
    protected void processRequest(String url) {
        // 判断是否为列表页
        if (url.matches(URL_LIST)) {
	        // 列表页解析出详情页连接添加到待采集URL队列中
            processTaskQueue(url);
        } else {
	        // 解析网页
            processPage(url);
        }
    }
    /** * 处理连接采集 * 处理列表页,将 url 添加到队列中 * * @param url */
    protected void processTaskQueue(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            // 详情页连接
            Elements elements = doc.select(" div.news-list > ul > li > div.list-hd > h4 > a");
            elements.stream().forEach((element -> {
                String request = element.attr("href");
                // 判断该连接是否存在队列或者已采集的 set 中,不存在则添加到队列中
                if (!visited.contains(request) && !taskQueue.contains(request)) {
                    try {
                        taskQueue.put(request);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
            // 列表页连接
            Elements list_urls = doc.select("div.voice-paging > a");
            list_urls.stream().forEach((element -> {
                String request = element.absUrl("href");
                // 判断是否符合要提取的列表连接要求
                if (request.matches(URL_LIST)) {
                    // 判断该连接是否存在队列或者已采集的 set 中,不存在则添加到队列中
                    if (!visited.contains(request) && !taskQueue.contains(request)) {
                        try {
                            taskQueue.put(request);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /** * 解析页面 * * @param url */
    protected void processPage(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            String title = doc.select("body > div.hp-wrap > div.voice-main > div.artical-title > h1").first().ownText();

            System.out.println(Thread.currentThread().getName() + " 在 " + new Date() + " 采集了虎扑新闻 " + title);
            // 将采集完的 url 存入到已经采集的 set 中
            visited.add(url);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {
            new ThreadCrawler("https://voice.hupu.com/nba", 5).run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

咱们用 5 个线程去采集虎扑新闻列表页看看效果若是?运行该程序,获得以下结果:微信

多线程采集结果

结果中能够看出,咱们启动了 5 个线程采集了 61 页页面,一共耗时 2 秒钟,能够说效果仍是不错的,咱们来跟单线程对比一下,看看差距有多大?咱们将线程数设置为 1 ,再次启动程序,获得以下结果:多线程

单线程运行结果

能够看出单线程采集虎扑 61 条新闻花费了 7 秒钟,耗时差很少是多线程的 4 倍,你想一想这可只是 61 个页面,页面更多的话,差距会愈来愈大,因此多线程爬虫效率仍是很是高的。

分布式爬虫架构

分布式爬虫架构是一个大型采集程序才须要使用的架构,通常状况下使用单机多线程就能够解决业务需求,反正我是没有分布式爬虫项目的经验,因此这一块我也没什么能够讲的,可是咱们做为技术人员,咱们须要对技术保存热度,虽然不用,可是了解了解也无妨,我查阅了很多资料得出了以下结论:

分布式爬虫架构跟咱们多线程爬虫架构在思路上来讲是同样的,咱们只须要在多线程的基础上稍加改进就能够变成一个简单的分布式爬虫架构。由于分布式爬虫架构中爬虫程序部署在不一样的机器上,因此咱们待采集的 URL 和 采集过的 URL 就不能存放在爬虫程序机器的内存中啦,咱们须要将它统一在某台机器上维护啦,好比存放在 Redis 或者 MongoDB 中,每台机器都从这上面获取采集连接,而不是从 LinkedBlockingQueue 这样的内存队列中取连接啦,这样一个简单的分布式爬虫架构就出现了,固然这里面还会有不少细节问题,由于我没有分布式架构的经验,我也无从提及,若是你有兴趣的话,欢迎交流。

源代码:源代码

文章不足之处,望你们多多指点,共同窗习,共同进步

最后

打个小广告,欢迎扫码关注微信公众号:「平头哥的技术博文」,一块儿进步吧。

平头哥的技术博文
相关文章
相关标签/搜索