WebMagic源码分析

Webmagic

##1. WebMagic概览 WebMagic项目代码分为核心和扩展两部分。核心部分(webmagic-core)是一个精简的、模块化的爬虫实现,而扩展部分则包括一些便利的、实用性的功能。WebMagic的架构设计参照了Scrapy,目标是尽可能的模块化,并体现爬虫的功能特色。css

这部分提供很是简单、灵活的API,在基本不改变开发模式的状况下,编写一个爬虫。html

扩展部分(webmagic-extension)提供一些便捷的功能,例如注解模式编写爬虫等。同时内置了一些经常使用的组件,便于爬虫开发。web

另外WebMagic还包括一些外围扩展和一个正在开发的产品化项目webmagic-avalon。 ##2. 核心组件 ###2.1 结构图 webmagic ###2.2 四大组件正则表达式

  • 1.Downloader:下载器
  • 2.PageProcessor:抽取器
  • 3.Scheduler:调度器
  • 4.Pipeline:结果处理器

##3. 源码分析(主类Spider) ###3.1 各组件初始化及可扩展 ####3.1.1
初始化Scheduler:(默认QueueScheduler) protected Scheduler scheduler = new QueueScheduler(); redis

采用新的Scheduler:数据库

public Spider setScheduler(Scheduler scheduler) {
        checkIfRunning();
        Scheduler oldScheduler = this.scheduler;
        this.scheduler = scheduler;
        if (oldScheduler != null) {
            Request request;
            while ((request = oldScheduler.poll(this)) != null) {
            //复制原来的url到新的scheduler
                this.scheduler.push(request, this);
            }
        }
        return this;
    }

####3.1.2
初始化Downloader:(默认HttpClientDownloader)缓存

protected void initComponent() {
        if (downloader == null) {
        //用户没有自定义Downloader
            this.downloader = new HttpClientDownloader();
        }
        if (pipelines.isEmpty()) {
        //用户没有自定义Pipeline
            pipelines.add(new ConsolePipeline());
        }
        downloader.setThread(threadNum);
        if (threadPool == null || threadPool.isShutdown()) {
            if (executorService != null && !executorService.isShutdown()) {
                threadPool = new CountableThreadPool(threadNum, executorService);
            } else {
                threadPool = new CountableThreadPool(threadNum);
            }
        }
        if (startRequests != null) {
            for (Request request : startRequests) {
                scheduler.push(request, this);
            }
            startRequests.clear();
        }
        startTime = new Date();
    }

####3.1.3 初始化Pipeline:(默认ConsolePipeline)
####3.1.4 初始化PageProcessor:(用户自定义完成) ###3.2 如何实现多线程 ####3.2.1
初始化线程池(默认Executors.newFixedThreadPool(threadNum)) Executors.newFixedThreadPool做用:建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待.安全

public CountableThreadPool(int threadNum) {
        this.threadNum = threadNum;
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

####3.2.2
多线程并发控制多线程

public void execute(final Runnable runnable) {
        if (threadAlive.get() >= threadNum) {
            try {
                reentrantLock.lock();//同步锁  下面为保护代码块
                while (threadAlive.get() >= threadNum) {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                    }
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        threadAlive.incrementAndGet();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } finally {
                    try {
                        reentrantLock.lock();
                        threadAlive.decrementAndGet();
                        //线程数量减小一个时,经过signal()方法通知前面condition.await()的线程
                        condition.signal();
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            }
        });
    }

####3.2.3 Java中的ReentrantLock和synchronized两种锁定机制的对比架构

ReentrantLock默认状况下为不公平锁

private ReentrantLock lock = new ReentrantLock(true); //公平锁
        try {
            lock.lock(); //若是被其它资源锁定,会在此等待锁释放,达到暂停的效果
            //操做
        } finally {
            lock.unlock();
        }```
        不公平锁与公平锁的区别:
公平状况下,操做会排一个队按顺序执行,来保证执行顺序。(会消耗更多的时间来排队)
不公平状况下,是无序状态容许插队,jvm会自动计算如何处理更快速来调度插队。(若是不关心顺序,这个速度会更快)  
  
#### 3.2.4 AtomicInteger && CAS
  >AtomicInteger,一个提供原子操做的Integer的类。在Java语言中,++i和i++操做并非线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则经过一种线程安全的加减操做接口。
首先要说一下,AtomicInteger类compareAndSet经过原子操做实现了CAS操做,最底层基于汇编语言实现  
CAS是Compare And Set的一个简称,以下理解:  
1,已知当前内存里面的值current和预期要修改为的值new传入  
2,内存中AtomicInteger对象地址对应的真实值(由于有可能别修改)real与current对比,相等表示real未被修改过,是“安全”的,将new赋给real结束而后返回;不相等说明real已经被修改,结束并从新执行1直到修改为功  

####3.2.5 程序如何终止

while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { Request request = scheduler.poll(this); //当scheduler内目标URL为空时 if (request == null) { //已经没有线程在运行了, exitWhenComplete默认为true if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); }

上述while循环结束,则程序完成任务并终止  
###3.3 HttpClient使用http链接池发送http请求
将用户设置的线程数设置为httpclient最大链接池数

public void setThread(int thread) { httpClientGenerator.setPoolSize(thread); }

 

public HttpClientGenerator setPoolSize(int poolSize) { // 将最大链接数增长为poolSize connectionManager.setMaxTotal(poolSize); return this; }

###3.4 URL在Scheduler中去重  
将下载结果页面中的连接抽取出来并放入scheduler中

public void push(Request request, Task task) { logger.trace("get a candidate url {}", request.getUrl()); if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) { logger.debug("push to queue {}", request.getUrl()); pushWhenNoDuplicate(request, task); } }

####3.4.1 redischeduler URL去重复

```boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());```  




RedisScheduler 中判断url是否重复的方法,由于一个Spider就是对应只有一个UUID,故上述的判断则是:判断当前的url是不是uuid集合的元素  

>System.out.println(jedis.sismember("sname", "minxr"));// 判断 minxr是不是sname集合的元素   

####3.4.2 bloomFilter URL去重复
```boolean isDuplicate = bloomFilter.mightContain(getUrl(request));```


####3.4.3  hashset  URL去重复
```public boolean isDuplicate(Request request, Task task) {
    return !urls.add(getUrl(request));
}```  



优势:  
1)节约缓存空间(空值的映射),再也不须要空值映射。  
2)减小数据库或缓存的请求次数。  
3)提高业务的处理效率以及业务隔离性。  
缺点:  
1)存在误判的几率。  
2)传统的Bloom Filter不能做删除操做。
###3.5 抽取部分API
| 方法        | 说明           | 示例  |
| ------------- |:-------------:| -----:|
| xpath(String xpath)      | 使用XPath选择| html.xpath("//div[@class='title']")|
| $(String selector)    | 使用Css选择器选择|   html.$("div.title") |
| css(String selector)      |   功能同$(),使用Css选择器选择  |  html.css("div.title") |
|regex(String regex)    |  使用正则表达式抽取    |  html.regex("\(.*?)\") |
| replace(String regex, String replacement)    |  替换内容    |  html.replace("\","")  |
这部分抽取API返回的都是一个Selectable接口,意思是说,抽取是支持链式调用的。
###3.6 代理池
####3.6.1 代理池初始化:

//从以往保存的本地文件中读取代理信息做为新的代理池 public SimpleProxyPool() { this(null, true); } //以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池 public SimpleProxyPool(List<String[]> httpProxyList) { this(httpProxyList, true); } //以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池(后者可认为操控) public SimpleProxyPool(List<String[]> httpProxyList, boolean isUseLastProxy) { if (httpProxyList != null) { addProxy(httpProxyList.toArray(new String[httpProxyList.size()][])); } if (isUseLastProxy) { if (!new File(proxyFilePath).exists()) { setFilePath(); } readProxyList(); timer.schedule(saveProxyTask, 0, saveProxyInterval); } }

####3.6.2 httpProxyList怎么传值

String[] source = { "::0.0.0.1:0", "::0.0.0.2:0", "::0.0.0.3:0", "::0.0.0.4:0" }; for (String line : source) { httpProxyList.add(new String[] {line.split(":")[0], line.split(":")[1], line.split(":")[2], line.split(":")[3] }); }

####3.6.3 本地文件Proxy获存储与获取:定时任务

if (isUseLastProxy) { if (!new File(proxyFilePath).exists()) { setFilePath(); } readProxyList(); timer.schedule(saveProxyTask, 0, saveProxyInterval); }

saveProxyTask()函数负责把最新的代理池ip写入到本地指定文件  
  
####3.6.4 使用DelayQueue管理Proxy  
目的:能够根据compareTo方法制定的优先取出代理池中使用间隔较短的代理(一开始默认都为1.5s)优先取出并执行.  
目前代理池的策略是:

* 1. 在添加时链接相应端口作校验  
* 2. 每一个代理有1.5S的使用间隔  
* 3. 每次失败后,下次取出代理的时间改成1.5S*失败次数  
* 4. 若是代理失败次数超过20次,则直接丢弃

public void returnProxy(HttpHost host, int statusCode) { Proxy p = allProxy.get(host.getAddress().getHostAddress()); if (p == null) { return; } switch (statusCode) { //成功 case Proxy.SUCCESS: p.setReuseTimeInterval(reuseInterval); p.setFailedNum(0); p.setFailedErrorType(new ArrayList<Integer>()); p.recordResponse(); p.successNumIncrement(1); break; //失败 case Proxy.ERROR_403: // banned,try longer interval p.fail(Proxy.ERROR_403); p.setReuseTimeInterval(reuseInterval * p.getFailedNum()); logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0); break; //代理被禁 case Proxy.ERROR_BANNED: p.fail(Proxy.ERROR_BANNED); p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum()); logger.warn("this proxy is banned >>>> " + p.getHttpHost()); logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0); break; //404 case Proxy.ERROR_404: // p.fail(Proxy.ERROR_404); // p.setReuseTimeInterval(reuseInterval * p.getFailedNum()); break; default: p.fail(statusCode); break; } //当前代理失败次数超过20:reviveTime = 2 * 60 * 60 * 1000; if (p.getFailedNum() > 20) { p.setReuseTimeInterval(reviveTime); logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size()); return; } //检验代理ip符合下列要求的: if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) { if (!ProxyUtils.validateProxy(host)) { p.setReuseTimeInterval(reviveTime); logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size()); return; } } try { proxyQueue.put(p); } catch (InterruptedException e) { logger.warn("proxyQueue return proxy error", e); } }

使用Socket来校验代理是否有效,客户端为本地.建立与代理的链接

public static boolean validateProxy(HttpHost p) { if (localAddr == null) { logger.error("cannot get local IP"); return false; } boolean isReachable = false; Socket socket = null; try { socket = new Socket(); socket.bind(new InetSocketAddress(localAddr, 0)); InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort()); socket.connect(endpointSocketAddr, 3000); logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p); isReachable = true; } catch (IOException e) { logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { logger.warn("Error occurred while closing socket of validating proxy", e); } } } return isReachable; }

相关文章
相关标签/搜索