异步编程学习

此文已转:https://www.jianshu.com/p/6606d1a44340

原文章来源于:
https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c


原文章写得很精彩,但有些代码仍是能够优化下的。并且这文章一直只有上篇,惋惜了。

接下来按我的看法,从代码角度解析下这篇文章:

前提知识讲解:
一、计算机资源:常分为CPU资源、内存资源、硬盘资源和网络资源
二、进程阻塞:正在运行的程序,因为自身某个模块须要使用硬盘或网络I/O资源等,而系统又未及时响应,致使进程处于待机状态,直至等待事件做出回应后才会被唤醒。
三、进程非阻塞:同理,在获取某些资源时,不会等待结果响应,而是继续处理其余模块。
咱们以socket为例,以下可获取阻塞与非阻塞两种编程
import socket
sock = socket.socket()

socket.setblocking(True)
# 默认就是阻塞。即套接字 创建链接/发送请求/接受请求 的时候,是阻塞的。
socket.setblocking(False) # 设置为非阻塞,即上述请求过程不会阻塞,而是继续处理其余模块。

 

使用原生asyncio编写异步程序:python

在此代码中,咱们须要注意几个关键点git

一、loop = asyncio.get_event_loop()  # 开启事件循环,异步"任务"将在此循环执行github

二、asyncio.create_task()  # 将一个协程包装成一个"任务"排入日程准备执行编程

三、asyncio.gather()  # 同步执行"任务"服务器

import asyncio
import aiohttp
import time

loop = asyncio.get_event_loop()

async def fetch():
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get('http://www.baidu.com') as response:
            print(await response.read())

async def multi_fetch():
    await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)])

if __name__ == '__main__':
    start = time.time()
    loop.run_until_complete(fetch())  # 执行一次
    # loop.run_until_complete(multi_fetch())  # 执行十次
    print(time.time() - start)

 

接下来咱们就来一步一步的实现上述几个关键点,实现手写本身的异步程序微信

首先得实现一个阻塞程序,以socket为例。此例子比较简单,大体看一下便可。网络

用时的话,咱们能够明显看出blocking_socket() 用时大体0.7ssession

而multi_blocking_socket() 用时大体0.7s,恰好是10倍左右。异步

import socket
import time

def blocking_socket(response=b''):
    sock = socket.socket()  # 默认为阻塞链接
    sock.connect(('www.baidu.com', 80))  # 创建百度TCP链接
    sock.send(b'GET / HTTP/1.0\r\n\r\n')  # 发送HTTP协议
    chunk = sock.recv(1024)  # 接收数据
    while chunk:
        response += chunk
        chunk = sock.recv(1024)
    return response

def mutil_blocking_socket():
    return [blocking_socket() for _ in range(10)].__len__()

if __name__ == '__main__':
    start = time.time()
    blocking_socket()  # 执行一次
    # mutil_blocking_socket()  # 执行十次
    print(time.time() - start)

 

接着咱们来实现非阻塞程序,仍然使用socket来实现socket

来比较时间,咱们会惊讶的发现,no_blocking_socket() 执行一次大体0.07s

而multi_no_blocking_socket() 执行十次,大体是0.7s....

没错,花费时间和阻塞编程是同样的。咱们的非阻塞编程并无达到实际的效果。

import socket
import time

def no_blocking_socket(response=b''):
    sock = socket.socket()
    sock.setblocking(False)  # 设置非阻塞链接
    try:
        sock.connect(('www.baidu.com', 80))
    except BlockingIOError:  # 非阻塞式创建链接在此处会报错,捕获忽略便可
        pass

    while True:
        try:
            sock.send(b'GET / HTTP/1.0\r\n\r\n')  # 发送HTTP协议
            break
        except OSError:  # 此处会报错。由于套接字与百度服务器的TCP链接尚未创建,
# 可是套接字倒是非阻塞的,故在未创建链接的状况下发送协议会报错,此处捕获此异常
continue while True: try: chunk = sock.recv(1024) while chunk: response += chunk chunk = sock.recv(1024) return response except OSError: # 同理,对方服务器未接收HTTP协议,不会返回数据,即便返回,也有时延。故此时接收数据会报异常,此处捕获此异常 pass def multi_no_blocking_socket(): return [no_blocking_socket() for _ in range(10)].__len__() if __name__ == '__main__': start = time.time() no_blocking_socket() # 执行一次 # multi_no_blocking_socket() # 执行十次 print(time.time() - start)

咱们来分析一下,为何会形成这种现象。咱们发现这套非阻塞代码与以前的阻塞代码相比,

多了三处 try: ... except: ...

多了两处 while True: ...

代码确实是非阻塞编程,也就是程序不会在网络IO模块处阻塞,可是程序也没有把空闲下来的时间花在"正确"的地方

在非阻塞的状况下,CUP把空闲下来的时间不停的去"试错",也就是程序会不停的尝试发送协议,直到发送成功。不停的尝试接收数据,直到接收成功。

这样就致使咱们的非阻塞编程,实际上和阻塞编程是同样的,惟一要说不一样。就是非阻塞程序中的CPU可能会 "忙一点"。

 

那么咱们能够注意到了问题的关键,就是咱们不知道何时程序是 "准备就绪了",也就是何时能够发送协议,何时能够接收数据。

其实操做系统已经帮咱们实现了,它将事件的 I/O 都封装成了事件,如可读事件,可写事件。那么咱们就能够当即想到,当咱们的套接字状态变为咱们所须要的时候,

就当即执行接下来的步骤。因此此处,咱们就须要 "回调"!

这里咱们说明下python中selectors模块,用于注册事件的回调

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

sock = socket.socket()  # 获取套接字
selector
= DefaultSelector() # 获取selector对象 selector.register(sock.fileno(), EVENT_WRITE, on_send) # fileno()获取当前socket套接字的文件描述符,并绑定事件EVENT_WRITE,回调函数为on_send selector.unregister(sock.fileno()) # 注销事件绑定 selector.register(sock.fileno(), EVENT_READ, on_recv) # 同理,绑定事件EVENT_READ,回调函数为on_recv

while True:
  events = selector.select() # 此处的events是操做系统返回的事件,也就是咱们绑定的事件被触发了,
# 此处是阻塞获取的。也就是用一个事件循环的阻塞,来代替咱们的while True: ...
  for sock, mask in events:
    sock.data() #
sock.data为绑定的回调函数,也就是上面的on_send和on_recv

如此咱们就不须要本身手动while True来监控事件状态的改变,将这件工做交给事件循环。此处咱们将其命名为loop。

 

接下来就是回调编程了

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass
    # fileno()获取当前socket套接字的文件描述符,并绑定事件回调
    selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send)
    def on_send(self):
        selector.unregister(self.sock.fileno())
        self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
        selector.register(self.sock.fileno(), EVENT_READ, self.on_recv)

    def on_recv(self):
        chunk = self.sock.recv(1024)
        if chunk:
            self.response += chunk
        else:
            global stop_loop
            stop_loop -= self.flag
            selector.unregister(self.sock.fileno())

def loop():  # 事件循环,由操做系统返回那个事件发生了,对应执行那些事件的回调。
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == '__main__':
    start = time.time()
    Crawler(10).fetch()  # 执行一次
    # [Crawler(1).fetch() for _ in range(10)]  # 执行十次
    loop()
    print(time.time() - start)

咱们来捋一下代码执行的流程:

一、首先实例化一个Crawler对象,而后执行此实例的fetch方法

二、fetch方法发起了与百度服务器的链接,而后注册了回调函数。

三、此时会走到loop()函数来,执行事件循环。直到咱们与对方服务器的链接创建成功,则此时OS会返回事件,咱们则执行对应的回调事件 sock.data()  <=> self.on_send()

四、执行on_recv,首先注销上一个事件,而后发送协议,再接着注册可读事件。继续进入等待

五、此时继续进入loop事件循环,直到触发注册事件,执行回调函数on_recv,因为一次只接收1024,故可能会接收屡次,也就是会触发屡次on_recv事件的回调,直接接收完成。

六、接收完成,咱们令全局变量stop_loop - flag,来中止loop事件循环。程序结束。

而后咱们来看下时间,咱们会发现执行一次的时间和执行十次的时间基本是差很少的。说明咱们编写的程序是没有问题的。

回调式异步编程,成功!

 

至此,咱们已经实现了回调式异步编程,可是咱们思考下第一个例子,是基于协程的异步编程,故咱们如今来调整代码,编写协程。

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Future:
    # 用于存放将来可能出现的数据,当出现时执行一次回调函数
    # 此中的result仅做为一个中转,实际仍是经过回调返回给协程

    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

class Task:
    # 用于启动协程,该类实例初始化时传入为协程对象,执行self.process方法
    # 调用协程的send方法,启动协程,并最后绑定回调函数

    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass

        future = Future()

        def _on_send():
            future.set_result(None)

        def _on_recv():
            future.set_result(self.sock.recv(1024))

        selector.register(self.sock.fileno(), EVENT_WRITE, _on_send)
        yield future
        selector.unregister(self.sock.fileno())
        self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
        selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
        while True:
            chunk = yield future  # 在此处轮询EVENT_READ事件,直至全部数据加载完毕
            if chunk:
                self.response += chunk
            else:
                global stop_loop
                stop_loop -= self.flag
                return self.response

def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == '__main__':
    start = time.time()
    Task(Crawler(10).fetch())  # 传入协程fetch,使用Task实例化调用协程的send方法来启动协程
    # [Task(Crawler(1).fetch()) for _ in range(10)]  # 同理,启动十个协程任务
    loop()
    print(time.time() - start)

咱们继续来整理下代码执行的流程:

一、首先实例化一些Crawler对象,调用调用此实例的fetch函数获得一个协程。此时协程是没有执行的。(协程须要send等触发才会执行)

二、将此协程装于Task用来建立任务实例,在任务中会主动触发协程的send函数来启动协程

三、此时协程已触发,注册事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send),而后返回future,此时协程到此暂停

四、返回的future会添加任务的回调函数,也就是self.precess()。而loop也开始了事件轮询,当套接字的文件描述符状态变为可写状态时,触发回调方法_on_send

五、_on_send方法执行Future中的set_result方法,此时在此方法中会调用一次future注册的回调函数,继续触发任务Task中协程的send方法,回到协程上次暂停的状态

六、回来后,首先注销事件EVENT_WRITE。发送HTTP协议请求。再注册事件 selector.register(self.sock.fileno(), EVENT_READ, _on_recv)

七、loop事件轮询,当套接字的文件描述符变为可读状态时,触发回调方法_on_recv

八、_on_recv方法执行Future中的set_result方法,此时在方法中会初始化result为sock.recv(1024)的值,并执行注册的回调函数,将此结果继续传递至协程上回暂停的地方

九、因为一直没有注销事件EVENT_READ,故会一直驱动事件轮询直至结束

十、Task、Future、Crawler、loop这四个就这么神奇的串联在一块儿了,难以想象的说。

 

不过这样写貌似不台好看,虽然感受也能够,可是不少模块其实都是能够拆离开的

下面就是一个拆分版本,就不细细的分析流程啦

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

def fetch(sock):
    sock.setblocking(False)
    try:
        sock.connect(('www.baidu.com', 80))
    except BlockingIOError:
        pass

    future = Future()

    def _on_send():
        future.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, _on_send)
    yield from future
    selector.unregister(sock.fileno())
    return future

def read(sock, future, flag, response=b''):
    def _on_recv():
        future.set_result(sock.recv(1024))

    selector.register(sock.fileno(), EVENT_READ, _on_recv)
    chunk = yield from future
    while chunk:
        response += chunk
        chunk = yield from future
    selector.unregister(sock.fileno())
    global stop_loop
    stop_loop -= flag
    return response


def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

class Future:
    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

    def __iter__(self):
        yield self
        return self.result

class Task:
    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag):
        self.flag = flag

    def fetch(self):
        sock = socket.socket()
        future = yield from fetch(sock)
        sock.send(b'GET / HTTP/1.0\r\n\r\n')
        response = yield from read(sock, future, self.flag)
        print(response)

if __name__ == '__main__':
    start = time.time()
    Task(Crawler(10).fetch())
    # [Task(Crawler(1).fetch()) for _ in range(10)]
    loop()
    print(time.time() - start)

代码放到github上了
https://github.com/CzaOrz/ioco/tree/master/open_source_project/%E5%BC%82%E6%AD%A5%E6%95%99%E7%A8%8B%E5%AD%A6%E4%B9%A0/%E5%BC%82%E6%AD%A5%E5%8D%8F%E7%A8%8B
相关文章
相关标签/搜索