Using pycurl

pycurl is a Python interface to libcurl; its homepage is http://pycurl.sourceforge.net. Like the urllib library, pycurl is used to fetch network resources addressed by URL. It supports many protocols: FTP, FTPS, HTTP, HTTPS, SCP, SFTP, TFTP, TELNET, DICT, LDAP, LDAPS, FILE, IMAP, SMTP and POP3.
The following code shows the commonly used options:

# rand_str(), cookie, post_data, t and d below are placeholders for your own
# helpers and data; they are not defined in this snippet.
import pycurl
import urllib

c = pycurl.Curl()
#url = "http://image.baidu.com/i?tn=baiduimage&ct=201326592&lm=-1&cl=2&nc=1&word="
url = '/duy/d'                                     # the target URL
c.setopt(pycurl.URL, url)
c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 6.1; rv:27.0) Gecko/20100101 Firefox/27.0')  # client identity
c.setopt(pycurl.REFERER, 'http://www.google.com/search?sourceid=chrome&ie=UTF-8&q=' + rand_str())  # the previous page
c.setopt(pycurl.HTTPHEADER, ['Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'])  # extra HTTP headers
c.setopt(pycurl.COOKIE, cookie)                    # cookies as a string: "key=value;key=value"
c.setopt(pycurl.VERBOSE, 1)                        # print debug information
c.setopt(pycurl.FOLLOWLOCATION, 1)                 # follow 302 redirects automatically
c.setopt(pycurl.MAXREDIRS, 5)                      # but at most 5 of them
c.setopt(pycurl.COOKIEFILE, "cookie_file_name")    # file to read cookies from
c.setopt(pycurl.COOKIEJAR, "cookie_file_name")     # file to save cookies to
c.setopt(pycurl.POST, 1)                           # use POST instead of the default GET
c.setopt(pycurl.POSTFIELDS, urllib.urlencode(post_data))  # POST data, a dict such as {"key": "value"}
c.setopt(c.WRITEFUNCTION, t.body_callback)         # callback that receives the response body, e.g.:
                                                   #     def body_callback(self, buf):
                                                   #         self.contents = self.contents + buf
c.setopt(pycurl.HEADERFUNCTION, d.body_callback)   # the same kind of callback, but for the response headers
c.setopt(pycurl.ENCODING, 'gzip,deflate')          # accepted content encodings


After the necessary options are set, call c.perform() to issue the request. For details on the many other options, refer to the libcurl documentation and the official site. Below is a class of my own that wraps pycurl:


import pycurl
import urllib
import cStringIO

# rand_str() is assumed to be defined elsewhere; it returns a random query string
class curl_request:
    c = None
    def __init__(self,url,action='get'):
        self.url = url
        self.url_para =None
        self.c = pycurl.Curl()
        print self.url
        self.c.setopt(pycurl.URL,self.url)
        self.c.setopt(pycurl.USERAGENT, 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)')
        self.c.setopt(pycurl.REFERER,'http://www.google.com/search?sourceid=chrome&ie=UTF-8&q='+rand_str())
        self.c.setopt(pycurl.COOKIE,'Hm_lvt_5251b1b3df8c7fd322ea256727293cf0=1393221156,1393223230,1393223252,1393223985;_jzqa=1.46109393469532')
        self.c.setopt(pycurl.VERBOSE,1)

        self.c.setopt(pycurl.HEADER,1)
        self.c.setopt(pycurl.FOLLOWLOCATION, 1)
        self.c.setopt(pycurl.MAXREDIRS, 5)
        self.c.setopt(pycurl.COOKIEFILE, 'cookie_file_name.txt')
        self.c.setopt(pycurl.COOKIEJAR, 'cookie_file_name.txt')
        if action == 'post':
            self.c.setopt(pycurl.POST,1)
            self.c.setopt(pycurl.POSTFIELDS, urllib.urlencode({"noe": "noe"}))  # placeholder POST body
        else:
            self.c.setopt(pycurl.HTTPGET,1)

#        c.setopt(c.WRITEFUNCTION, self.write)

#        c.setopt(pycurl.HEADERFUNCTION, d.body_callback)
        self.c.setopt(pycurl.ENCODING, 'gzip,deflate')

    def set_url_para(self,para):
        self.url_para = para
        url = self.url + para
        self.c.setopt(pycurl.URL,url)

    def set_post_para(self,para):
        self.c.setopt(pycurl.POST,1)
        self.c.setopt(pycurl.POSTFIELDS, urllib.urlencode( para))
    def set_cookie(self,cookie):
        self.c.setopt(pycurl.COOKIE,cookie)

    def perform(self,url='',referer=''):
        if url != '':
            self.c.setopt(pycurl.URL,url)
        if referer != '':
            self.c.setopt(pycurl.REFERER,referer)
        self.buf = cStringIO.StringIO()
        self.head = cStringIO.StringIO()
        self.c.setopt(self.c.WRITEFUNCTION, self.buf.write)
        self.c.setopt(pycurl.HEADERFUNCTION, self.head.write)
        try:
            self.c.perform()
        except Exception, e:
            # the request failed: close the buffers and leave empty results
            self.buf.close()
            self.head.close()
            self.r = ''
            self.h = ''
            self.code = 0
            return
        self.r = self.buf.getvalue()
        self.h = self.head.getvalue()
        self.code = self.c.getinfo(pycurl.HTTP_CODE)
        self.info = self.c.getinfo(pycurl.EFFECTIVE_URL)
        self.cookie = self.c.getinfo(pycurl.INFO_COOKIELIST)

        self.buf.close()
        self.head.close()
    def __del__(self):
        self.c.close()

    def get_body(self):
        return self.r
    def get_head(self):
        return self.h
    def get_code(self):
        return self.code
    def get_info(self):
        return self.info
    def get_cookie(self):
        return self.cookie
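
A minimal usage sketch of this class (the URL is just a placeholder, and it assumes the rand_str() helper noted above is defined):

req = curl_request('http://example.com/')   # placeholder URL
req.perform()
print req.get_code()     # HTTP status code
print req.get_head()     # raw response headers
print req.get_body()     # response body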


For pages that require logging in before they can be accessed, you can set cookies and POST data to perform the login. Once the login succeeds, the session information is saved into the cookie file, and later requests carry that cookie to authenticate.
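
As a sketch of that login flow using the wrapper class above (the login URL and form field names are made up for illustration, not a real site):

login = curl_request('http://example.com/login', action='post')   # hypothetical login URL
login.set_post_para({'username': 'me', 'password': 'secret'})      # hypothetical form fields
login.perform()
print login.get_code()    # expect 200 or 302 on success

# the session cookie is now stored in cookie_file_name.txt, so a later
# request that uses the same cookie file is authenticated
page = curl_request('http://example.com/member_only')              # hypothetical protected page
page.perform()
print page.get_body()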

After fetching the page content with this request class, you can use BeautifulSoup to parse it. Its usage is similar to libxml2: you can search the tree and also traverse it.
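
For instance, a quick sketch of searching and traversing with the BeautifulSoup 3 API used in this article (the HTML string here is made up):

from BeautifulSoup import BeautifulSoup

html = '<div class="mm_time"><a href="/girl/1">name</a><img src="/1.gif" /></div>'   # made-up HTML
soup = BeautifulSoup(html, fromEncoding="utf-8")
div = soup.find('div', {"class": "mm_time"})     # search by tag name and attributes
if div is not None:
    print div.a['href']                          # attribute access on a child tag
    print div.img['src']
    for child in div:                            # traverse the direct children
        print child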

The code below, for example, parses the HTML of one particular site and then downloads the corresponding images:

def get_dynamic_mm(buf):
    # parse the fetched page, create a directory named after the girl,
    # download her photo and return the link to her profile page
    root_soup = BeautifulSoup(''.join(buf), fromEncoding="utf-8")
    div = root_soup.find('div', {"class": "mm_time"})
    if div:
        for divsub in div.div:
            if str(type(divsub)) == "<class 'BeautifulSoup.Tag'>" and divsub['class'] == "girl_info":
                name = divsub.a.string.strip().replace(" ", "")
                page = divsub.a['href']
        os.makedirs("./照片/" + name)
        img_url = div.img['src']
        get_img(img_url, name, name)
        return page
 

def get_img(url, name, path):
    # keep retrying until the image has been downloaded successfully
    while 1:
        try:
            r = urllib2.urlopen(url)
            print './照片/' + path + '/' + name + '.gif'
            f = open('./照片/' + path + '/' + name + '.gif', 'ab+')
            f.write(r.read())
            r.close()
            f.close()
            break
        except Exception, e:
            print 'error'
            continue


The BeautifulSoup documentation can also be found on its official site: http://www.crummy.com/software/BeautifulSoup/

With these two third-party libraries it is easy to build web bots that download images, monitor specific content (for example airline ticket prices), or post to various forums.

Here I'd like to recommend a fairly accessible book that describes how the author writes crawlers in PHP, the points to watch out for when writing crawlers (for example how long to wait between visits to a site), and the uses of web bots (for example counting how many dead links a page contains): Webbots, Spiders, and Screen Scrapers, 2nd Edition.

Below is a simple crawler I wrote myself. It crawls links and related content, and writes the pages whose content matches into an sqlite file.

It uses a thread pool: when the pool is initialized the threads are started, and each thread loops, pulling tasks from the work queue and processing them (crawling pages) until the work is done, at which point a flag is set to stop all the threads. This approach works well. Oddly enough I had never used threads (or thread pools) at work before (it was always endless forking; two years of work experience, what a waste for the company). A thread pool saves system resources, lets you tune the throughput flexibly, and compared with multiple processes it avoids passing data between processes and is less error-prone.

When crawling links, it only extracts the href attribute from <a href=""></a> tags, as shown in the sketch below.
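
Distilled from the crawl_url function further down, the extraction amounts to something like this (a sketch on a made-up HTML body):

import re
from BeautifulSoup import BeautifulSoup

body = '<p><a href="/page1">one</a> <a href="/page2">two</a></p>'   # made-up HTML
for sub in re.findall("<a.*?>", body):            # every opening <a ...> tag
    tag_a = BeautifulSoup(''.join(sub), fromEncoding="utf-8")
    if tag_a.a is not None and tag_a.a.has_key('href'):
        print tag_a.a['href']                     # prints /page1 then /page2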

When crawling a page it also searches for the keyword; when the keyword is found, the matching content is put on a queue so that the main thread can write it to the database (using sqlite), as sketched below.
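
The handoff from the worker threads to the main thread boils down to this pattern (a stripped-down sketch; the table schema matches the one created in main() below, and the database file name is a placeholder):

import Queue
import sqlite3

contentQueue = Queue.Queue()
contentQueue.put(('http://example.com/', 'some text containing the keyword'))   # done by a worker thread

cx = sqlite3.connect('result.db')                  # placeholder db file name
cu = cx.cursor()
cu.execute("create table content (id INTEGER PRIMARY KEY AUTOINCREMENT, url varchar(100), content varchar(4000))")
url, content = contentQueue.get(timeout=0.5)       # the main thread drains the queue
cu.execute("insert into content(url,content) values(?,?)", (url, content))
cx.commit()
cx.close()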


import sys
import os
import re
import urllib
import urllib2
import time
import random
import pycurl
import Queue
import threading
import logging
from BeautifulSoup import BeautifulSoup
import getopt
import sqlite3
from Request import curl_request


logger = None      # configured in main()
class MyThread(threading.Thread):
    def __init__(self, workQueue, resultQueue, contentQueue, key, timeout=15):
        threading.Thread.__init__(self)
        self.mutex = threading.Lock()
        self.timeout = timeout
        self.setDaemon(True)
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.contentQueue = contentQueue
        self.flag = False        # True while the thread is processing a task
        self.exit_flag = False   # set by the pool to ask the thread to exit
        self.key = key
        self.start()             # start only after every attribute is initialised
        
    def run(self):
        while True:
            try:
               # if self.mutex.acquire(1): 
                callable, args, kwargs, deep = self.workQueue.get(timeout=self.timeout)
                #self.mutex.release()
                self.flag = True
                res = callable(args,self.resultQueue,self.contentQueue,kwargs,deep,self.key)
                self.flag = False
            except Queue.Empty:
                logger.debug('queue is empty')
                self.flag = False
                if self.exit_flag:
                    logger.info('exit_flag set')
                    break
                continue
            except :
                print sys.exc_info()
                raise
            
class ThreadPool:
    def __init__(self, key, num_of_threads=10):
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.contentQueue = Queue.Queue()
        self.threads = []
        self.key = key
        self.__createThreadPool(num_of_threads)
       
    def __createThreadPool(self, num_of_threads):
        for i in range( num_of_threads ):
            thread = MyThread( self.workQueue, self.resultQueue, self.contentQueue, self.key )
            self.threads.append(thread)
            
    def wait_for_complete(self):
        while len(self.threads):
            thread = self.threads.pop()
            if thread.isAlive():
                thread.join()
    def get_flag(self):
        flag = False
        for thread in self.threads:
            if thread.flag:
                flag = True
        return flag
    def get_num(self):
        num = 0
        for thread in self.threads:
            if thread.flag:
                num += 1
        return num
    def set_flag(self):
        # ask every worker thread to exit once the work queue is empty
        for thread in self.threads:
            thread.exit_flag = True
                
    def add_job(self,callable, args,kwargs, deep):
        self.workQueue.put( (callable, args, kwargs, deep) )

def resovle_address(base_url, link):
    # turn a (possibly relative) link into an absolute URL, or return None if it can't be used
    base_url = base_url.strip()
    logger.debug('url base is: ' + base_url.encode() + ' and link is: ' + link.encode())
    link = link.strip()
    link = link.replace(';', '')
    link = link.replace('\\', '')
    link = link.replace('\'', '')
    link = link.replace('/./', '/')
    slash = base_url.rfind('/')
    if len(link) < 1:
        return None
    if slash != -1 and base_url[:slash + 1] != "http://":
        base_url = base_url[:slash]      # strip everything after the last '/'
    m = re.search("http|www", link)
    if link[0] == '/' and len(link) > 1:
        logger.debug('return url is ' + base_url.encode() + link.encode())
        return base_url + link
    elif m is not None:
        logger.debug('return link is ' + link.encode())
        return link
    return None
    
        
    
        
def crawl_url(url, resultQueue, contentQueue, sleep, deep, key):
    # fetch one page, push every link found onto resultQueue and every
    # keyword match onto contentQueue
    global logger
    logger.debug('start to crawl the url: ' + url.encode() + ' and deep is: ' + str(deep))
    time.sleep(sleep[0])     # polite delay between requests (a random fraction of a second)
    home_url = curl_request(url)
    home_url.perform()
    buf = home_url.get_body()
    if buf is None:
        return 
    root_soup = BeautifulSoup(''.join( buf ),fromEncoding="utf-8")
    body = root_soup.body
    u = body
    logger.info('body is '+str(u))
    m = re.findall("<a.*?>",str(u))
    for sub in m:
        if len(sub) < 1:
            continue
        tag_a = BeautifulSoup(''.join( sub ),fromEncoding="utf-8")
        if tag_a.a is not None and tag_a.a.has_key('href'):
            url_s = tag_a.a['href']
            url_s = resovle_address(url,url_s)
         #   print 'geting url and deep is ',url_s,deep
            if url_s is not None:
                #print 'adding iiiiiiiiiiiiiiiiiii',url_s
                logger.info('geting url :'+url_s.encode()+'deep is :'+str(deep))
                resultQueue.put( (url_s, deep+1) )
    if u is None:
        return
    for k in u:
        if re.search(key,str(k)) is not None:
          #  print str(k)
            contentQueue.put( (str(url), str(k) ))

def Usage():
    print 'myspider.py usage:'
    print '  myspider.py -u url -d depth -f log_file -l log_level --key=keyword [--thread=num] [--dbfile=file]'

def get_rand():
    return random.sample([0.1,0.2,0.3,0.4,0.5],1)
def main(argv):
    global logger
    thread_num = 10
    dbfile = None
    try:
        opts, args = getopt.getopt(argv[1:],'hu:d:t:l:f:i:',['key=','thread=','dbfile='])
    except getopt.GetoptError, err:
        print str(err)
        Usage()
        sys.exit(2)
    for o, a in opts:
        if o in ('-h','--help'):
            Usage()
            sys.exit(1)
        elif o in ('-u',):
            url = a
        elif o in ('-d',):
            scrawl_level = int(a)
        elif o in ('-f',):
            log_file = a
        elif o in ('-l',):
            log_level = int(a)
        elif o in ('--key',):
            key = a
        elif o in ('--thread',):
            thread_num = int(a)
        elif o in ('--dbfile',):
            dbfile = a
        else:
            print 'unhandled option'
            sys.exit(3)

    cu = None
    cx = None
    logger = logging.getLogger()
    hdlr = logging.FileHandler(log_file)
    logger.addHandler(hdlr)
    level = (6-log_level)*10
    logger.setLevel(level)
  #  logger.info("hi")
    if dbfile is not None:
        if os.path.exists(dbfile):
            os.remove(dbfile)     # start from a fresh database file
        cx = sqlite3.connect(dbfile)
        cu=cx.cursor()
        cu.execute("""create table content (id INTEGER PRIMARY KEY AUTOINCREMENT,url varchar(100), content varchar(4000)  )""")
          
    logger.debug('thread num is '+str(thread_num))
    logger.debug('scrawl_level is ' + str(scrawl_level))
    
    
    tp = ThreadPool(key,thread_num)
    tp.add_job(crawl_url, url , get_rand() ,1)
    deep = 1
    time_old = time.time()
    count = 0
    while 1:
        time_new = time.time()
        if time_new - time_old > 10:
            print 'links processed:', count, 'links being processed:', tp.get_num(), 'links waiting:', tp.resultQueue.qsize(), 'rows waiting to be inserted:', tp.contentQueue.qsize()
            time_old = time.time()
        try:
            url,deep= tp.resultQueue.get(timeout=0.5)
            if url is not None and int(deep) <= scrawl_level:
               # print "adding  deep",deep
                logger.info('adding url: '+url.encode()+'and deep is: '+str(deep))
                count += 1
                tp.add_job(crawl_url, url, get_rand(), deep)
        except Queue.Empty:
            if not tp.get_flag() and tp.contentQueue.qsize() == 0 and tp.resultQueue.qsize() == 0:
                print 'work done,exiting'
                tp.set_flag()
                break
        try:
            url,content= tp.contentQueue.get(timeout=0)
            if url is not None and cu is not None:
              #  print 'gettingiiiiiiiiii ',content,url
                cu.execute( "insert into content(url,content) values(?,?)", (str(url), content.decode('utf-8')))
        except Queue.Empty:
            continue
            
        
    if cx is not None:
        cx.commit()
        cx.close()
    tp.wait_for_complete()
    #print tp.workQueue.qsize()
    
if __name__ == '__main__':
    main(sys.argv)