Ref: HOWTO Fetch Internet Resources Using The urllib Packagehtml
Ref: Python High Performance - Second Edition【基于python3】html5
Ref: http://online.fliphtml5.com/odjuw/kcqs/#p=8【在线电子书】python
Ref: 廖雪峰的异步IO【仍是这个比较好一点】git
Ref: Efficient web-scraping with Python’s asynchronous programming【参考】github
Ref: A Web Crawler With asyncio Coroutines【参考】web
并行:parallel 编程
并发:concurrentapi
协程:Coroutines服务器
一种比线程更加轻量级的存在。正如一个进程能够拥有多个线程同样,一个线程也能够拥有多个协程。并发
协程不是被操做系统内核所管理,而彻底是由程序所控制(也就是在用户态执行)。
这样带来的好处就是性能获得了很大的提高,不会像线程切换那样消耗资源。
参考一:boost coroutine with multi core
参考二:poll 和 select
poll 和 select 的实现基本上是一致的,只是传递参数有所不一样,他们的基本流程以下:
1. 复制用户数据到内核空间
2. 估计超时时间
3. 遍历每一个文件并调用f_op->poll 取得文件当前就绪状态, 若是前面遍历的文件都没有就绪,向文件插入wait_queue节点
4. 遍历完成后检查状态:
a). 若是已经有就绪的文件转到5;
b). 若是有信号产生,重启poll或select(转到 1或3);
c). 不然挂起进程等待超时或唤醒,超时或被唤醒后再次遍历全部文件取得每一个文件的就绪状态
5. 将全部文件的就绪状态复制到用户空间
6. 清理申请的资源
import requests import string import random # 生成url def generate_urls(base_url, num_urls): """ We add random characters to the end of the URL to break any caching mechanisms in the requests library or the server """ for i in range(num_urls): yield base_url + "".join(random.sample(string.ascii_lowercase, 10)) # 执行url def run_experiment(base_url, num_iter=500): response_size = 0 for url in generate_urls(base_url, num_iter): print(url) response = requests.get(url) response_size += len(response.text) return response_size
if __name__ == "__main__": import time delay = 100 num_iter = 50 base_url = "http://www.baidu.com/add?name=serial&delay={}&".format(delay) start = time.time() result = run_experiment(base_url, num_iter) end = time.time() print("Result: {}, Time: {}".format(result, end - start))
【暂时放弃该方案,太复杂且代码不可用】
如下是有变化部分的代码:
from gevent import monkey monkey.patch_socket() #---------------------------------- import gevent from gevent.coros import Semaphore import urllib2 from contextlib import closing
import string import random
def download(url, semaphore): with semaphore, closing(urllib2.urlopen(url)) as data: return data.read() def chunked_requests(urls, chunk_size=100): semaphore = Semaphore(chunk_size)
requests = [gevent.spawn(download, u, semaphore) for u in urls]
for response in gevent.iwait(requests): yield response def run_experiment(base_url, num_iter=500): urls = generate_urls(base_url, num_iter)
response_futures = chunked_requests(urls, 100) response_size = sum(len(r.value) for r in response_futures)
return response_size
Create a new Greenlet
object and schedule it to run function(*args, **kwargs)
.
greenlet的源代码,代码很少,就2000行C语言的代码,其中有一部分栈寄存器的修改的代码是由汇编实现的。
一句话来讲明greenlet的实现原理:经过栈的复制切换来实现不一样协程之间的切换。
对于不支持使用 "with"语句 的 "相似文件” 的对象,使用 contextlib.closing():
import contextlib.closing
with closing(urllib.urlopen("http://www.python.org/")) as front_page: for line in front_page: print line
yield是有返回值的。
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) # <-- 启动生成器 n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close()
#-------------------------------------------------- c = consumer() produce(c) # 给消费者c喂消息
Ref: https://www.liaoxuefeng.com/wiki/1016959663602400/1017970488768640
(1) 从asyncio
模块中直接获取一个EventLoop
的引用,
(2) 而后把须要执行的协程扔到EventLoop
中执行,就实现了异步IO。
import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) # 当作是一个耗时的io操做 print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() # (1) 获取一个EventLoop引用 tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) # (2) 将携程扔到EventLoop中去执行 loop.close()
writer.drain():这是一个与底层IO输入缓冲区交互的流量控制方法。当缓冲区达到上限时,drain()
阻塞,待到缓冲区回落到下限时,写操做能够被恢复。当不须要等待时,drain()
会当即返回。
#%% import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host)
# (1) 首先,得到socket双向管道 connect = asyncio.open_connection(host, 80) reader, writer = yield from connect
# (2) 发送request要网页内容 header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain()
# (3) 得到网页内容 while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close()
loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
总结下来就是主要作了两件事:
(1) @asyncio.coroutine
(2) yield from:不但愿堵塞的地方
换个写法,看上去干净一些。
import threading import asyncio async def hello(): print('Hello world! (%s)' % threading.currentThread()) await asyncio.sleep(1) # 当作是一个耗时的io操做 print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() # (1) 获取一个EventLoop引用 tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) # (2) 将协程扔到EventLoop中去执行 loop.close()
如今是把asyncio放在了服务器端!
asyncio
能够实现单线程并发IO操做。若是仅用在客户端,发挥的威力不大。若是把asyncio
用在服务器端,例如Web服务器,因为HTTP链接就是IO操做,所以能够用单线程+coroutine
实现多用户的高并发支持。
# server code
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv
loop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()
文章不错,详见连接。
值得注意的一点是:最大并发限制的设置。
semaphore = asyncio.Semaphore(500) # 限制并发量为500
End.