同步:不一样程序单元为了完成某个任务,在执行过程当中需靠某种通讯方式以协调一致,称这些程序单元是同步执行的。html
例如购物系统中更新商品库存,须要用“行锁”做为通讯信号,让不一样的更新请求强制排队顺序执行,那更新库存的操做是同步的。python
简言之,同步意味着有序。git
阻塞:程序未获得所需计算资源时被挂起的状态。程序员
程序在等待某个操做完成期间,自身没法继续干别的事情,则称该程序在该操做上是阻塞的。github
常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。web
在一个程序内,依次执行10次太耗时,那开10个同样的程序同时执行不就好了。因而咱们想到了多进程编程。为何会先想到多进程呢?发展脉络如此。在更早的操做系统(Linux 2.4)及其之前,进程是 OS 调度任务的实体,是面向进程设计的OS.算法
改善效果立竿见影。但仍然有问题。整体耗时并无缩减到原来的十分之一,而是九分之一左右,还有一些时间耗到哪里去了?进程切换开销。编程
进程切换开销不止像“CPU的时间观”所列的“上下文切换”那么低。CPU从一个进程切换到另外一个进程,须要把旧进程运行时的寄存器状态、内存状态所有保存好,再将另外一个进程以前保存的数据恢复。对CPU来说,几个小时就干等着。当进程数量大于CPU核心数量时,进程切换是必然须要的。
除了切换开销,多进程还有另外的缺点。通常的服务器在可以稳定运行的前提下,能够同时处理的进程数在数十个到数百个规模。若是进程数量规模更大,系统运行将不稳定,并且可用内存资源每每也会不足。
多进程解决方案在面临天天须要成百上千万次下载任务的爬虫系统,或者须要同时搞定数万并发的电商系统来讲,并不适合。
除了切换开销大,以及可支持的任务规模小以外,多进程还有其余缺点,如状态共享等问题,后文会有说起,此处再也不细究。服务器
因为线程的数据结构比进程更轻量级,同一个进程能够容纳多个线程,从进程到线程的优化由此展开。后来的OS也把调度单位由进程转为线程,进程只做为线程的容器,用于管理进程所需的资源。并且OS级别的线程是能够被分配到不一样的CPU核心同时运行的。网络
结果符合预期,比多进程耗时要少些。从运行时间上看,多线程彷佛已经解决了切换开销大的问题。并且可支持的任务数量规模,也变成了数百个到数千个。
可是,多线程仍有问题,特别是Python里的多线程。首先,Python中的多线程由于GIL的存在,它们并不能利用CPU多核优点,一个Python进程中,只容许有一个线程处于运行状态。那为何结果仍是如预期,耗时缩减到了十分之一?
由于在作阻塞的系统调用时,例如sock.connect(),sock.recv()时,当前线程会释放GIL,让别的线程有执行机会。可是单个线程内,在阻塞调用上仍是阻塞的。
小提示:Python中 time.sleep 是阻塞的,都知道使用它要谨慎,但在多线程编程中,time.sleep 并不会阻塞其余线程。
除了GIL以外,全部的多线程还有通病。它们是被OS调度,调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪一个线程被运行,也不知道它正要执行的代码是什么。因此就可能存在竞态条件。
例如爬虫工做线程从任务队列拿待抓取URL的时候,若是多个爬虫线程同时来取,那这个任务到底该给谁?那就须要用到“锁”或“同步队列”来保证下载任务不会被重复执行。
并且线程支持的多任务规模,在数百到数千的数量规模。在大规模的高频网络交互系统中,仍然有些吃力。固然,多线程最主要的问题仍是竞态条件。
def nonblocking_way(): sock = socket.socket() sock.setblocking(False) try: sock.connect(('example.com', 80)) except BlockingIOError: # 非阻塞链接过程当中也会抛出异常 pass request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n' data = request.encode('ascii') # 不知道socket什么时候就绪,因此不断尝试发送 while True: try: sock.send(data) # 直到send不抛异常,则发送完成 break except OSError: pass response = b'' while True: try: chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) break except OSError: pass return response
首先注意到两点,就感受被骗了。一是耗时与同步阻塞至关,二是代码更复杂。要非阻塞何用?且慢。
上第9行代码sock.setblocking(False)告诉OS,让socket上阻塞调用都改成非阻塞的方式。以前咱们说到,非阻塞就是在作一件事的时候,不阻碍调用它的程序作别的事情。上述代码在执行完 sock.connect() 和 sock.recv() 后的确再也不阻塞,能够继续往下执行请求准备的代码或者是执行下一次读取。
代码变得更复杂也是上述缘由所致。第11行要放在try语句内,是由于socket在发送非阻塞链接请求过程当中,系统底层也会抛出异常。connect()被调用以后,当即能够往下执行第15和16行的代码。
须要while循环不断尝试 send(),是由于connect()已经非阻塞,在send()之时并不知道 socket 的链接是否就绪,只有不断尝试,尝试成功为止,即发送数据成功了。recv()调用也是同理。
虽然 connect() 和 recv() 再也不阻塞主程序,空出来的时间段CPU没有空闲着,但并无利用好这空闲去作其余有意义的事情,而是在循环尝试读写 socket (不停判断非阻塞调用的状态是否就绪)。还得处理来自底层的可忽略的异常。也不能同时处理多个 socket 。
2 . 回调((Callback))
把I/O事件的等待和监放任务交给了 OS,那 OS 在知道I/O状态发生改变后(例如socket链接已创建成功可发送数据),它又怎么知道接下来该干吗呢?只能回调。
须要咱们将发送数据与读取数据封装成独立的函数,让epoll代替应用程序监听socket状态时,得告诉epoll:“若是socket状态变为能够往里写数据(链接创建成功了),请调用HTTP请求发送函数。若是socket 变为能够读数据了(客户端已收到响应),请调用响应处理函数。”
首先,不断尝试send() 和 recv() 的两个循环被消灭掉了。
其次,导入了selectors模块,并建立了一个DefaultSelector 实例。Python标准库提供的selectors模块是对底层select/poll/epoll/kqueue的封装。DefaultSelector类会根据 OS 环境自动选择最佳的模块,那在 Linux 2.5.44 及更新的版本上都是epoll了。
而后,在第25行和第31行分别注册了socket可写事件(EVENT_WRITE)和可读事件(EVENT_READ)发生后应该采起的回调函数。
虽然代码结构清晰了,阻塞操做也交给OS去等待和通知了,可是,咱们要抓取10个不一样页面,就得建立10个Crawler实例,就有20个事件将要发生,那如何从selector里获取当前正发生的事件,而且获得对应的回调函数去执行呢?
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
demo解析:
注意到consumer函数是一个generator,把一个consumer传入produce后:
首先调用c.send(None)启动生成器;
而后,一旦生产了东西,经过c.send(n)切换到consumer执行;
consumer经过yield拿到消息,处理,又经过yield把结果传回;
produce拿到consumer处理的结果,继续生产下一条消息;
produce决定不生产了,经过c.close()关闭consumer,整个过程结束
在 Python 中调用协程对象1的 send() 方法时,第一次调用必须使用参数 None, 这使得协程的使用变得十分麻烦
解决此问题:
借助 Python 自身的特性来避免这一问题,好比,建立一个装饰器
def routine(func): def start(*args, **kwargs): cr = func(*args, **kwargs) cr.send(None) return cr return start @routine def product(): pass
yield from 是Python 3.3 新引入的语法(PEP 380)。它主要解决的就是在生成器里玩生成器不方便的问题。它有两大主要功能。
第一个功能是:让嵌套生成器没必要经过循环迭代yield,而是直接yield from。如下两种在生成器里玩子生成器的方式是等价的。
def gen_one(): subgen = range(10) yield from subgendef gen_two(): subgen = range(10) for item in subgen: yield item
第二个功能就是在子生成器和原生成器的调用者之间打开双向通道,二者能够直接通讯。
def gen(): yield from subgen()def subgen(): while True: x = yield yield x+1def main(): g = gen() next(g) # 驱动生成器g开始执行到第一个 yield retval = g.send(1) # 看似向生成器 gen() 发送数据 print(retval) # 返回2 g.throw(StopIteration) # 看似向gen()抛入异常
用yield from改进基于生成器的协程,代码抽象程度更高。使业务逻辑相关的代码更精简。因为其双向通道功能可让协程之间为所欲为传递数据,使Python异步编程的协程解决方案大大向前迈进了一步。
因而Python语言开发者们充分利用yield from,使 Guido 主导的Python异步编程框架Tulip迅速脱胎换骨,并火烧眉毛得让它在 Python 3.4 中换了个名字asyncio以“实习生”角色出如今标准库中。
asyncio是Python 3.4 试验性引入的异步I/O框架(PEP 3156),提供了基于协程作异步I/O编写单线程并发代码的基础设施。其核心组件有事件循环(Event Loop)、协程(Coroutine)、任务(Task)、将来对象(Future)以及其余一些扩充和辅助性质的模块。
在引入asyncio的时候,还提供了一个装饰器@asyncio.coroutine用于装饰使用了yield from的函数,以标记其为协程。但并不强制使用这个装饰器。
import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
@asyncio.coroutine把一个generator标记为coroutine类型,而后,咱们就把这个coroutine扔到EventLoop中执行。
hello()会首先打印出Hello world!,而后,yield from语法可让咱们方便地调用另外一个generator。因为asyncio.sleep()也是一个coroutine,因此线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就能够从yield from拿到返回值(此处是None),而后接着执行下一行语句。
把asyncio.sleep(1)当作是一个耗时1秒的IO操做,在此期间,主线程并未等待,而是去执行EventLoop中其余能够执行的coroutine了,所以能够实现并发执行。
import asyncio import time now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() # task = asyncio.ensure_future(coroutine) # 方式一 task = loop.create_task(coroutine) # 方式二 print(task) loop.run_until_complete(task) print(task) print('TIME: ', now() - start)
建立task后,task在加入事件循环以前是pending状态,加入loop后运行中是running状态,loop调用完是Done,运行完是finished状态,虽然说本质上协程函数和task指的东西都同样,可是task有了协程函数的状态。
其中loop.run_until_complete()接受一个future参数,futurn具体指代一个协程函数,而task是future的子类,因此咱们不声明一个task直接传入协程函数也能执行。
import asyncio async def test(x): return x+3 def callback(y): print(y.result()) coroutine = test(5) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) task <Task pending coro=<test() running at <ipython-input-4-61142fef17d8>:1>> task.add_done_callback(callback) loop.run_until_complete(task)
import asyncio import time async def test(1): time.sleep(1) print(time.time()) tasks = [asyncio.ensure_future(test()) for _ in range(3)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 1547187398.7611663 1547187399.7611988 1547187400.7632194
上面执行并非异步执行,而是顺序执行,可是改为下面形式那就是异步执行:
import asyncio import time async def test(t): await asyncio.sleep(1) print(time.time()) tasks = [asyncio.ensure_future(test()) for _ in range(3)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 1547187398.7611663 1547187399.7611988 1547187400.7632194
用asyncio提供的@asyncio.coroutine能够把一个generator标记为coroutine类型,而后在coroutine内部用yield from调用另外一个coroutine实现异步操做。
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可让coroutine的代码更简洁易读。
请注意,async和await是针对coroutine的新语法,要使用新的语法,只须要作两步简单的替换:
把@asyncio.coroutine替换为async;
把yield from替换为await。
async def hello(): print("Hello world!") r = await asyncio.sleep(1) print("Hello again!")
asyncio能够实现单线程并发IO操做。若是仅用在客户端,发挥的威力不大。若是把asyncio用在服务器端,例如Web服务器,因为HTTP链接就是IO操做,所以能够用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv loop = asyncio.get_event_loop() # 建立一个事件循环(池) loop.run_until_complete(init(loop)) # 将协程对象包装并注册协程对象 loop.run_forever()
方法1:
asyncio、aiohttp须要配合aiomultiprocess
方法2:
gevent.pool import Pool
multiprocessing import Process
核心代码
def main(): file_list = ["7001", "7002", "7003"] p_lst = [] # 线程列表 for i in file_list: # self.run(i) p = Process(target=read_file, args=(i,)) # 子进程调用函数 p.start() # 启动子进程 p_lst.append(p) # 将全部进程写入列表中 def read_file(self, number): """ 读取文件 :param number: 文件标记 :return: """ file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number) # print(file_name) self.write_log(number, "开始读取文件 {}".format(file_name),"green") with open(file_name, encoding='utf-8') as f: # 使用协程池,执行任务。语法: pool.map(func,iterator) # partial使用偏函数传递参数 # 注意:has_null第一个参数,必须是迭代器遍历的值 pool.map(partial(self.has_null, number=number), f)
使用loop.run_until_complete(syncio.wait(tasks)) 也可使用 loop.run_until_complete(asyncio.gather(*tasks)) ,前者传入task列表,会对task进行解包操做。
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) import time async def request(): url = "http://www.baidu.com" resulit = await get(url) tasks = [asyncio.ensure_future(request()) for _ in range(10000)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) async def request(): url = "http://www.baidu.com" tasks = [asyncio.ensure_future(url) for _ in range(1000)] 方式一: dones, pendings = await asyncio.wait(tasks) # 返回future对象,不返回直接结果 for task in dones: print('Task ret: ', task.result()) 方式二: results = await asyncio.gather(*tasks) # 直接返回结果 方式三: for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) # 迭代方式返回结果 tasks = asyncio.ensure_future(request()) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
实现结束task有两种方式:关闭单个task、关闭loop,涉及主要函数: asyncio.Task.all_tasks()获取事件循环任务列表 KeyboardInterrupt捕获中止异常(Ctrl+C) loop.stop()中止任务循环 task.cancel()取消单个任务 loop.run_forever() loop.close()关闭事件循环,否则会重启
python程序实现的一种单线程下的多任务执行调度器,简单来讲在一个线程里,前后执行AB两个任务,可是当A遇到耗时操做(网络等待、文件读写等),这个时候gevent会让A继续执行,可是同时也会开始执行B任务,若是B在遇到耗时操做同时A又执行完了耗时操做,gevent又继续执行A。
import gevent def test(time): print(1) gevent.sleep(time) print(2) def test2(time): print(3) gevent.sleep(time) print(4) if __name__ == '__main__': gevent.joinall([ gevent.spawn(test, 2), gevent.spawn(test2, 3) ])
借鉴文章:
https://mp.weixin.qq.com/s/GgamzHPyZuSg45LoJKsofA
https://rgb-24bit.github.io/blog/2019/python-coroutine-event-loop.html
https://zhuanlan.zhihu.com/p/54657754
https://cloud.tencent.com/developer/article/1590280