世界是复杂的,每一种思想都是为了解决某些现实问题而简化成的模型,想解决就得先面对,面对就须要选择角度,角度决定了模型的质量, 喜欢此UP主汤质看本质的哲学科普,其中简洁又不失细节的介绍了人类解决问题的思路,以及由概念搭建的思惟模型对人类解决问题的重要性与限制.也认识到学习的本质就是: 认识获取(了解概念) -> 知识学习(创建模型) -> 技能训练(实践)html
阅读也好, 学习也好, 妨碍咱们「理解」的障碍主要有两个:java
也就是说 概念明确 + 关系明确, 才能构成「模型」, 对照「现象」, 造成「理解」。python
在理解编程知识时能够关键概括为两点:理解核心概念群+使用场景思考与故事化讲述react
这里特别推荐码农翻身中大话编程式的科普:linux
码农翻身整年文章精华git
并发思想的一些探寻并发之痛 Thread, Goroutine, Actor中有较好的总结:
陈力就列, 不能者止 能干活的代码片断就放在线程里, 若是干不了活(须要等待, 被阻塞等), 就摘下来。通俗的说就是不要占着茅坑不拉屎, 若是拉不出来, 须要酝酿下, 先把茅坑让出来, 由于茅坑是稀缺资源。程序员
要作到这点通常有两种方案:github
典型如NodeJS, 遇到阻塞的状况, 好比网络调用, 则注册一个回调方法(其实还包括了一些上下文数据对象)给IO调度器(linux下是libev, 调度器在另外的线程里), 当前线程就被释放了, 去干别的事情了。等数据准备好, 调度器会将结果传递给回调方法而后执行, 执行其实不在原来发起请求的线程里了, 但对用户来讲无感知。但这种方式的问题就是很容易遇到callback hell, 由于全部的阻塞操做都必须异步, 不然系统就卡死了。还有就是异步的方式有点违反人类思惟习惯, 人类仍是习惯同步的方式。golang
这种方案其实和上面的方案本质上区别不大, 关键在于回调上下文的保存以及执行机制。为了解决回调方法带来的难题, 这种方案的思路是写代码的时候仍是按顺序写, 但遇到IO等阻塞调用时, 将当前的代码片断暂停, 保存上下文, 让出当前线程。等IO事件回来, 而后再找个线程让当前代码片断恢复上下文继续执行, 写代码的时候感受好像是同步的, 仿佛在同一个线程完成的, 但实际上系统可能切换了线程, 但对程序无感。算法
GreenThread
* 用户空间 首先是在用户空间, 避免内核态和用户态的切换致使的成本。 * 由语言或者框架层调度 * 更小的栈空间容许建立大量实例(百万级别)
几个概念
* Continuation 这个概念不熟悉FP编程的人可能不太熟悉, 不过这里能够简单的顾名思义, 能够理解为让咱们的程序能够暂停, 而后下次调用继续(contine)从上次暂停的地方开始的一种机制。至关于程序调用多了一种入口。 * Coroutine 是Continuation的一种实现, 通常表现为语言层面的组件或者类库。主要提供yield, resume机制。 * Fiber 和Coroutine实际上是一体两面的, 主要是从系统层面描述, 能够理解成Coroutine运行以后的东西就是Fiber。
Goroutine其实就是前面GreenThread系列解决方案的一种演进和实现。
程序员修神之路--分布式高并发下Actor模型如此优秀中说:
传统多数流行的语言并发是基于多线程之间的共享内存, 使用同步方法防止写争夺, Actors使用消息模型, 每一个Actor在同一时间处理最多一个消息, 能够发送消息给其余Actor, 保证了单独写原则。从而巧妙避免了多线程写争夺。和共享数据方式相比, 消息传递机制最大的优势就是不会产生数据竞争状态。实现消息传递有两种常见的类型:基于channel(golang为典型表明)的消息传递和基于Actor(erlang为表明)的消息传递。两者的格言都是:"Don’t communicate by sharing memory, share memory by communicating"
每一个进程各自有不一样的用户地址空间,任何一个进程的全局变量在另外一个进程中都看不到, 因此进程之间要交换数据必须经过内核,在内核中开辟一块缓冲区,进程A把数据从用户空间拷到内核缓冲区,进程B再从内核缓冲区把数据读走,内核提供的这种机制称为进程间通讯。
进程间几种通讯方式:
管道:速度慢, 容量有限, 只有父子进程能通信 FIFO:任何进程间都能通信, 但速度慢 消息队列:容量受到系统限制, 且要注意第一次读的时候, 要考虑上一次没有读完数据的问题 信号量:不能传递复杂消息, 只能用来同步 5.共享内存区:可以很容易控制容量, 速度快, 但要保持同步, 好比一个进程在写的时候, 另外一个进程要注意读写的问题, 至关于线程中的线程安全, 固然, 共享内存区一样能够用做线程间通信, 不过没这个必要, 线程间原本就已经共享了同一进程内的一块内存 Socket通讯(又名客户机服务器系统)
Python 为进程通讯提供了两种机制:
Queue:一个进程向 Queue 中放入数据, 另外一个进程从 Queue 中读取数据。如multiprocessing.Queue() Pipe:Pipe 表明链接两个进程的管道。程序在调用 Pipe() 函数时会产生两个链接端, 分别交给通讯的两个进程, 接下来进程既可从该链接端读取数据, 也可向该链接端写入数据。如multiprocessing.Pipe()
方式有不少种,其余模型中也都能在此找到影子.
CSP(communicating sequential processes)模型里消息和Channel是主体
也就是说发送方须要关心本身的消息类型以及应该写到哪一个Channel, 但不须要关心谁消费了它, 以及有多少个消费者。
Golang是本身解决的通讯问题, 从概念上就当消息队列理解, 可是技术上, golang用的不是linux的消息队列.
Actor模型是1973年提出的一个分布式并发编程模式, 在Erlang语言中获得普遍支持和应用。
Actor模型和CSP模型的区别 CSP模型和Actor模型是两门很是复古且外形接近的并发模型。但CSP与Actor有如下几点比较大的区别:
以上的铺垫应该对并发涉及到的概念有清晰的认识,也能发现这些概念都不是go或python原创的,这里有较好的总结
Go/Python/Erlang编程语言对比分析及示例 说:
Go的不少语言特性借鉴与它的三个祖先:C, Pascal和CSP。Go的语法、数据类型、控制流等继承于C, Go的包、面对对象等思想来源于Pascal分支, 而Go最大的语言特点, 基于管道通讯的协程并发模型, 则借鉴于CSP分支。
不要用共享内存来通讯, 要用通讯来共享内存大概是golang在推广中最容易被人说起的了,相似python之禅同样.
Golang调度器有三个主要数据结构。
G (goroutine) 协程, 被Golang语言自己管理的线程 举例来讲, func main() { go other() }, 这段代码建立了两个goroutine, 一个是main, 另外一个是other, 注意main自己也是一个goroutine. goroutine的新建, 休眠, 恢复, 中止都受到go运行时的管理. goroutine执行异步操做时会进入休眠状态, 待操做完成后再恢复, 无需占用系统线程, goroutine新建或恢复时会添加到运行队列, 等待M取出并运行. M (machine) 操做系统的线程, 被操做系统管理的, 原生线程 M能够运行两种代码: go代码, 即goroutine, M运行go代码须要一个P 原生代码, 例如阻塞的syscall, M运行原生代码不须要P M会从运行队列中取出G, 而后运行G, 若是G运行完毕或者进入休眠状态, 则从运行队列中取出下一个G运行, 周而复始. 有时候G须要调用一些没法避免阻塞的原生代码, 这时M会释放持有的P并进入阻塞状态, 其余M会取得这个P并继续运行队列中的G. go须要保证有足够的M能够运行G, 不让CPU闲着, 也须要保证M的数量不能过多. P (process) 调度的上下文, 运行在M上的调度器。 P是process的头文字, 表明M运行G所须要的资源. 一些讲解协程的文章把P理解为cpu核心, 其实这是错误的. 虽然P的数量默认等于cpu核心数, 但能够经过环境变量GOMAXPROC修改, 在实际运行时P跟cpu核心并没有任何关联. P也能够理解为控制go代码的并行度的机制, 若是P的数量等于1, 表明当前最多只能有一个线程(M)执行go代码, 若是P的数量等于2, 表明当前最多只能有两个线程(M)执行go代码. 执行原生代码的线程数量不受P控制. 由于同一时间只有一个线程(M)能够拥有P, P中的数据都是锁自由(lock free)的, 读写这些数据的效率会很是的高.
计算机科学领域的任何问题均可以经过增长一个间接的中间层来解决 -- G-P-M模型正是此理论践行者,此理论也用到了python的asyncio对地狱回调的处理上(使用Task+Future避免回调嵌套),是否是巧合?
其实异步≈可中断的函数+事件循环+回调,go和python都把嵌套结构转换成列表结构有点像算法中的递归转迭代.
G的状态
M的状态
M并无像G和P同样的状态标记, 但能够认为一个M有如下的状态:
P的状态
Golang 的协程本质上其实就是对 IO 事件的封装, 而且经过语言级的支持让异步的代码看上去像同步执行的同样。
能够参考这里:
本段落涉及的代码基本是对深刻理解Python异步编程(上) 的注解,以前也学习过yield,也总结了几回,
但以前都没有把事件循环联系进来,感性的知道python中的协程就是靠:"事件循环 + 回调",其中细节一直没深刻看,asyncio源码也看过几回,也是蜻蜓点水.此次偶然看到这么有系统且有示例代码辅助的文章,因此下面的东西不少都来自此文章以及对其代码的注解.
在asyncio正式转正前,就有不少人和库尝试了其余方式,如:
stackless 的通道(channel)
yield和greenlet
gevent
先了解 py3.3 -> py3.8 之间的异步方式演进,建议使用官方yield例子,在idea中debug调试运行,着重看函数中yield处中断执行后又如何被恢复,其实主要就是经过next或send让函数恢复执行.而后就是找到那些next和send以及是被怎么推进的
总结来讲,协程就是对能够中断/恢复执行的函数的调度.
题外话阅读源码的三种境界
1. 生成器 2. 用于定义上下文管理器 3. 协程 4. 配合 from 造成 yield from 用于消费子生成器并传递消息
这四种用法, 其实都源于 yield 所具备的暂停的特性, 也就说程序在运行到 yield 所在的位置 result = yield expr 时, 先执行 yield expr 将产生的值返回给调用生成器的 caller
, 而后暂停, 等待 caller 再次激活并恢复程序的执行。而根据恢复程序使用的方法不一样, yield expr 表达式的结果值 result 也会跟着变化。
若是使用 next() 来调用, 则 yield 表达式的值 result 是 None;若是使用 send() 来调用, 则 yield 表达式的值 result 是经过 send 函数传送的值。
yield from 一方面能够迭代地消耗生成器, 另外一方面则创建了一条双向通道, 把最外层的调用方与最内层的子生成器链接起来, 并自动地处理异常, 接收子生成器返回的值。
yield from 更多地被用于协程, 而 await 关键字的引入会大大减小 yield from 的使用频率。
实现yield from语法的伪代码以下:
""" _i:子生成器, 同时也是一个迭代器 _y:子生成器生产的值 _r:yield from 表达式最终的值 _s:调用方经过send()发送的值 _e:异常对象 """ #简化版 _i = iter(EXPR) try: _y = next(_i) except StopIteration as _e: _r = _e.value else: while 1: try: _s = yield _y except StopIteration as _e: _r = _e.value break RESULT = _r #完整版 _i = iter(EXPR) try: _y = next(_i) except StopIteration as _e: _r = _e.value else: while 1: try: _s = yield _y except GeneratorExit as _e: try: _m = _i.close except AttributeError: pass else: _m() raise _e except BaseException as _e: _x = sys.exc_info() try: _m = _i.throw except AttributeError: raise _e else: try: _y = _m(*_x) except StopIteration as _e: _r = _e.value break else: try: if _s is None: _y = next(_i) else: _y = _i.send(_s) except StopIteration as _e: _r = _e.value break RESULT = _r
参考 yield_to_from.py,划分一下方便理解:
一、调用方:调用委派生成器的客户端(调用方)代码 二、委托生成器:包含yield from 表达式的生成器函数 三、子生成器:yield from 后面加的生成器函数
有不清晰的地方,就在IDE中debug下,着重来看包含yield的函数之间的跳转,以及yield from存在的意义.
n = m = 5 flag = "stop" # 子生成器中止信号,此例子中是有调用者控制,也能够改写成子生成器控制,调用者检查到信号还中止迭代子生成器. """ 一、调用方:调用委派生成器的客户端(调用方)代码 二、委托生成器:包含yield from 表达式的生成器函数 三、子生成器:yield from 后面加的生成器函数 重点:yield让函数中断执行,next或send让函数恢复执行,使用debug查看各个函数间的跳转,或者直接运行,看print打印. """ def gen(): # 子生成器 print("start 子生成器") # for k in range(n): # 有限子生成器 k = "k" while True: # 无限子生成器 print("子生成器--要返回的值:", k) t = yield k # 1.运行到这里就会停下来,切换到其余地方,等待send或next触发后再今后处继续执行 2.yield功能至关于golang中的chan,可接受可发送 print("子生成器--接受到的值:", t) if t is flag: break print("end 子生成器") return "这就是result" # 生成器退出时, 生成器(或子生成器)中的return expr表达式会出发StopIteration(expr)异常抛出 def proxy_gen(): # 委托生成器--相似go-chan # 在调用方与子生成器之间创建一个双向通道,调用方能够经过send()直接发送消息给子生成器,而子生成器yield的值,也是直接返回给调用方 # while True: result = yield from gen() print("委托生成器result:", result) yield result def main1(): # 调用方1--不经过proxy_gen迭代子生成器 g = gen() # 子生成器 print(g.send(None)) print(g.send(1)) # 发送1到子生成器中 print(next(g)) try: print(g.send(flag)) # 不使用委托器 子生成器的中止信号就得手动处理 except StopIteration as e: print("StopIteration") print("子生成器return的值:", e.value) def main2(): # 调用方2--经常使用迭代 g = proxy_gen() g.send(None) # 须要先激活子生成器,不然会报错 TypeError: can't send non-None value to a just-started generator for k in range(m): print("调用方--要发送的值:", k) print("调用方--接受到的值:", g.send(k)) print("--------------------") g.send(flag) # 针对无限子生成器的中止信号 def main3(): # 调用方3--死循环 g = proxy_gen() g.send(None) # 须要先激活子生成器,不然会报错 TypeError: can't send non-None value to a just-started generator for k in g: # for调用能完整的遍历生成器,遍历的时候已经调用了__next__,至关于g.send(None) print("调用方--接受到的值:", k) print("调用方--要发送的值:", g.send("m")) print("调用方--接受到的值:", k) print("--------------------") print("*********************") main1() print("*********************") main2() print("*********************") main3() print("*********************")
包含yield语句的函数就是一个生成器对象, 调用一个生成器函数, 返回的是一个迭代器对象。迭代器Iterator表示的是一个数据流, 迭代器能够被next()函数调用并不断返回下一个数据, 直到没有数据时抛出StopIteration错误。迭代器控制生成器函数的执行, 当函数开始运行, 执行到第一个yield语句时暂停, 将yield表达式后的表达式的值返回给调用者。
在生成器函数暂停时, 其现阶段的状态都被保存下来, 包括生成器函数局部变量当前绑定的值、指令指针、函数内部执行堆栈以及任何异常状态的处理。当生成器函数再次被调用时则直接从上次暂停的yield表达式处接着运行, 直到遇到下一个yield语句, 或者没有遇到yield语句则运行结束。
须要说明的是, 在函数从新运行时, 其实上次暂停处的yield表达式会先接收一个值做为结果, 而后才接着运行直到碰到下一个yield表达式。
若是调用者使用next函数或者__next__()方法, 则默认返回给yield表达式None值;使用send()方法则传递一个值做为yield表达式的结果。
对于简单的迭代器, yield from iterable本质上等于for item in iterable: yield item的缩写版(iterable 也能够是generator),yield 和 send(next)成对出现,有点相似于go中的chan,彼此通知对方数据到位请继续执行下去
通常将yield from视为提供了一个调用者和子生成器之间的透明的双向通道。包括从子生成器获取数据以及向子生成器传送数据。
总结:
asyncio是Python 3.4 试验性引入的异步I/O框架(PEP 3156), 提供了基于协程作异步I/O编写单线程并发代码的基础设施。其核心组件有事件循环(Event Loop)、协程(Coroutine)、任务(Task)、将来对象(Future)以及其余一些扩充和辅助性质的模块。
实现原理:
事件循环+回调
有一个任务调度器(event loop), 而后能够用async def定义异步函数做为任务逻辑, 经过create_task接口把任务挂到event loop上。
event loop的运行过程应该是个不停循环的过程, 不停查看等待类别有没有能够执行的任务, 若是有的话执行任务, 直到碰到await之类的主动让出event loop的函数, 如此反复。
如果看源码的你就会发现使用yield和yield from实现协程也会用到相似EventLoop,Future,Future,Coroutine的东西,这在下面的示例部分再次看到.
对比生成器版的协程, 使用asyncio库后变化很大:
* 没有了yield 或 yield from, 而是async/await * 没有了自造的loop(), 取而代之的是asyncio.get_event_loop() * 无需本身在socket上作异步操做, 不用显式地注册和注销事件, aiohttp库已经代劳 * 没有了显式的 Future 和 Task, asyncio已封装
更少许的代码, 更优雅的设计
分别使用yield,yield from,asyncio 模拟协程,并发的爬几个url的代码.
__doc__ = '如何使用yield完成协程(简化版的asyncio)' import socket from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ selector = DefaultSelector() stopped = False host = "127.0.0.1" # 自建一个简单服务,模拟一个设置每一个请求须要等待1s才返回结果 port = 5000 urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'} # urls_todo = {'/'} # 在单线程内作协做式多任务调度 # 要异步,必回调 # 但为了不地狱式回调,将回调一拆为三,回调链变成了Future-Task-Coroutine # 下面的注解都是为了能方便理解Future-Task-Coroutine之间的互动以及怎么串起来的. """ 无链式调用 selector的回调里只管给future设置值, 再也不关心业务逻辑 loop 内回调callback()再也不关注是谁触发了事件,由于协程可以保存本身的状态, 知道本身的future是哪一个。也不用关心到底要设置什么值, 由于要设置什么值也是协程内安排的。 已趋近于同步代码的结构 无需程序员在多个协程之间维护状态, 例如哪一个才是本身的sock """ """ 1.建立Crawler 实例; 2.调用fetch方法, 会建立socket链接和在selector上注册可写事件; 3.fetch内并没有阻塞操做, 该方法当即返回; 4.重复上述3个步骤, 将10个不一样的下载任务都加入事件循环; 5.启动事件循环, 进入第1轮循环, 阻塞在事件监听上; 6.当某个下载任务EVENT_WRITE被触发, 回调其connected方法, 第一轮事件循环结束; 7.进入第2轮事件循环, 当某个下载任务有事件触发, 执行其回调函数;此时已经不能推测是哪一个事件发生, 由于有多是上次connected里的EVENT_READ先被触发, 也多是其余某个任务的EVENT_WRITE被触发;(此时, 原来在一个下载任务上会阻塞的那段时间被利用起来执行另外一个下载任务了) 8.循环往复, 直至全部下载任务被处理完成 9.退出事件循环, 结束整个下载程序 """ # 异步调用执行完的时候, 就把结果放在它里面。这种对象称之为将来对象。 # 暂存task执行的结果和回调 class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): # 各阶段的回调 self._callbacks.append(fn) def set_result(self, result): self.result = result # 调用结果,b'http请求的结果字符' for fn in self._callbacks: # 重要,回调函数集 fn(self) # Task.step class Task: def __init__(self, coro): self.coro = coro # Crawler(url).fetch() f = Future() # f.set_result(None) # 感受这句不是很必要 self.step(f) # 预激活 def step(self, future): # 管理fetch生成器: 第一次的激活/暂停后的恢复执行/以及配合set_result循环调用 try: # send会进入到coro执行, 即fetch, 直到下次yield # next_future 为yield返回的对象,也就是下一次要调用的Future对象 next_future = self.coro.send(future.result) # __init__中的第一次step,将fetch运行到的82行的yield, # 返回EVENT_WRITE时的事件回调要用的future,而后等事件触发,由select调用on_connected,进而继续future中的回调 except StopIteration: return next_future.add_done_callback(self.step) # 这里须要重点理解,为下一次要调用的Future对象,注册下一次的step,供on_readable调用 # Coroutine yield实现的协程 class Crawler: def __init__(self, url): self.url = url self.response = b'' def fetch(self): # 函数内有了yield表达式,就是生成器了,生成器须要先调用next()迭代一次或者是先send(None)启动,遇到yield以后便暂停 sock = socket.socket() sock.setblocking(False) try: sock.connect((host, port)) except BlockingIOError: pass f = Future() # 每到一个io事件都注册一个对应的Future def on_connected(): # pass # 若没有f.set_result,会报错KeyError: '236 (FD 236) is already registered' f.set_result(None) # 必要语句,还涉及到恢复回调 selector.register(sock.fileno(), EVENT_WRITE, on_connected) # 链接io写事件 yield f # 注册完就yield出去,等待事件触发 selector.unregister(sock.fileno()) get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url) # self.url 区分每一个协程 sock.send(get.encode('ascii')) global stopped while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) # 可读的状况下,读取4096个bytes暂存给Future,执行回调,使生成器继续执行下去 selector.register(sock.fileno(), EVENT_READ, on_readable) # io读事件 chunk = yield f # 返回f,并接受step中send进来的future.result值,也就是暂存的请求返回字符 selector.unregister(sock.fileno()) if chunk: self.response += chunk else: urls_todo.remove(self.url) if not urls_todo: stopped = True break print("result:", self.response) def loop(): while not stopped: # 阻塞, 直到一个事件发生 events = selector.select() for event_key, event_mask in events: # 监听事件,触发回调,推进协程运行下去 callback = event_key.data # 就是 on_connected,和 on_readable callback() if __name__ == '__main__': import time start = time.time() for url in urls_todo: crawler = Crawler(url) Task(crawler.fetch()) loop() print(time.time() - start)
__doc__ = '如何使用yield from完成协程(简化版的asyncio)' import socket from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stopped = False host = "127.0.0.1" # 自建一个简单服务,模拟一个设置每一个请求须要等待1s才返回结果 port = 5000 urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'} # urls_todo = {'/'} # 在单线程内作协做式多任务调度 # 要异步,必回调 # 但为了不地狱式回调,将回调一拆为三,回调链变成了Future-Task-Coroutine # 下面的注解都是为了能方便理解Future-Task-Coroutine之间的互动以及怎么串起来的. """ 无链式调用 selector的回调里只管给future设置值, 再也不关心业务逻辑 loop 内回调callback()再也不关注是谁触发了事件,由于协程可以保存本身的状态, 知道本身的future是哪一个。也不用关心到底要设置什么值, 由于要设置什么值也是协程内安排的。 已趋近于同步代码的结构 无需程序员在多个协程之间维护状态, 例如哪一个才是本身的sock """ """ 1.建立Crawler 实例; 2.调用fetch方法, 会建立socket链接和在selector上注册可写事件; 3.fetch内并没有阻塞操做, 该方法当即返回; 4.重复上述3个步骤, 将10个不一样的下载任务都加入事件循环; 5.启动事件循环, 进入第1轮循环, 阻塞在事件监听上; 6.当某个下载任务EVENT_WRITE被触发, 回调其connected方法, 第一轮事件循环结束; 7.进入第2轮事件循环, 当某个下载任务有事件触发, 执行其回调函数;此时已经不能推测是哪一个事件发生, 由于有多是上次connected里的EVENT_READ先被触发, 也多是其余某个任务的EVENT_WRITE被触发;(此时, 原来在一个下载任务上会阻塞的那段时间被利用起来执行另外一个下载任务了) 8.循环往复, 直至全部下载任务被处理完成 9.退出事件循环, 结束整个下载程序 """ # 结果保存, 每个处须要异步的地方都会调用, 保持状态和callback # 程序得知道当前所处的状态, 并且要将这个状态在不一样的回调之间延续下去。 class Future: def __init__(self): self.result = None # 重要参数1 self._callbacks = [] # 重要参数2 def add_done_callback(self, fn): # 各阶段的回调 self._callbacks.append(fn) def set_result(self, result): self.result = result # 调用结果,b'http请求的结果字符' for fn in self._callbacks: fn(self) # 执行Task.step def __iter__(self): """ yield的出现使得__iter__函数变成一个生成器, 生成器自己就有next方法, 因此不须要额外实现。 yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器) """ yield self # 外面使用yield from把f实例自己返回 return self.result # 在Task.step中send(result)的时候再次调用这个生成器, 可是此时会抛出stopInteration异常, 而且把self.result返回 # 激活包装的生成器, 以及在生成器yield后恢复执行继续下面的代码 class Task: def __init__(self, coro): # Crawler(url).fetch() self.coro = coro f = Future() # f.set_result(None) self.step(f) # 激活Task包裹的生成器 def step(self, future): try: # next_future = self.coro.send(future.result) next_future = self.coro.send(None) # 驱动future # next_future = future.send(None) # 这样是错误的 # __init__中的第一次step,将fetch运行到的82行的yield, # 返回EVENT_WRITE时的事件回调要用的future,而后等事件触发,由select调用on_connected,进而继续future中的回调 except StopIteration: return next_future.add_done_callback(self.step) # 这里须要重点理解,为下一次要调用的Future对象,注册下一次的step,供on_readable调用 # 异步就是能够暂定的函数, 函数间切换的调度靠事件循环,yield 正好能够中断函数运行 # Coroutine yield实现的协程 # 将yield_demo.py中的Crawler进行了拆解,并使用yield from class Crawler: def __init__(self, url): self.url = url self.response = b"" def fetch(self): # 委托生成器,参考yield_to_from.py global stopped sock = socket.socket() yield from connect(sock, (host, port)) get = "GET {0} HTTP/1.0\r\nHost:example.com\r\n\r\n".format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) print(self.response) urls_todo.remove(self.url) if not urls_todo: stopped = True # 链接事件的子协程:注册+回调 def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f # f须要可迭代,须要新增Future.__iter__ selector.unregister(sock.fileno()) # 可读事件的子协程:注册+回调 def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) # 注册一个文件对象以监听其IO事件; """ 此处的chunck接收的是f中return的f.result, 同时会跑出一个stopIteration的异常, 只不过被yield from处理了。 这里也可直接写成chunck = yiled f """ chunck = yield from f # f须要可迭代,须要新增Future.__iter__ selector.unregister(sock.fileno()) # 从selection中注销文件对象, 即从监听列表中移除它; 文件对象应该在关闭前注销. return chunck # 委托生成器,参考yield_to_from.py,生成器的嵌套 def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) # yield from来解决生成器里玩生成器的问题 result = b"".join(response) print("result:", result) # 打印下结果吧 return result # 事件驱动, 让全部以前注册的callback运行起来 def loop(): while not stopped: events = selector.select() for event_key, event_mask in events: # 监听事件,触发回调,推进协程运行下去 callback = event_key.data # data就是 on_connected,和 on_readable callback() if __name__ == "__main__": import time start = time.time() for url in urls_todo: crawler = Crawler(url) Task(crawler.fetch()) # 将各生成器和对应的callback注册到事件循环loop中, 并激活生成器 loop() print(time.time() - start)
__doc__ = "使用asyncio" import asyncio import aiohttp host = 'http://127.0.0.1:5000' urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'} loop = asyncio.get_event_loop() async def fetch(url): async with aiohttp.ClientSession(loop=loop) as session: async with session.get(url) as response: response = await response.read() print("result:", response) return response if __name__ == '__main__': import time start = time.time() tasks = [fetch(host + url) for url in urls_todo] loop.run_until_complete(asyncio.gather(*tasks)) print(time.time() - start)
到这里基本python的协程改进历史就说完了,下面就是对比goroutine与asyncio.
这里python协程与go协程的区别有我之前写的一个简单对比,下面的一些东西是补充和联想.
select、poll、epoll都是I/O复用的事件通知机制
它的做用是将大量的文件描述符托管给内核,内核将最底层的 I/O 状态变化封装成读写事件,这样就避免了由程序员去主动轮询状态变化的重复工做,程序员将回调函数注册到 epoll 的状态上,当检测到相对应文件描述符产生状态变化时,就进行函数回调。select/poll因为效率问题基本已被取代epoll和kqueue取代。 所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O 事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 链接,而是复用线程,让一个 thread of control 可以处理多个链接(I/O 事件)。select 是 epoll 以前 Linux 使用的 I/O 事件驱动技术。
python的eventloop网络事件模型在asyncio库中是能够选择的(asyncio is configured to use SelectorEventLoop on Unix and ProactorEventLoop on Windows.),eventloop经过不一样平台上的事件通知机制检测事件是否可读/可写,是协程的心脏.
# 事件驱动, 让全部以前注册的callback运行起来 def loop(): while not stopped: events = selector.select() for event_key, event_mask in events: # 监听事件,触发回调,推进协程运行下去 callback = event_key.data # data就是 on_connected,和 on_readable callback()
go中的select关键词主要是检测多个channel是否ready(便是否可读或可写),避免goroutine没必要要的阻塞, 对比python中的select.select;
一个是针对channel状态检测的关键词,另外一个是针对IO事件的状态检测的函数方法.都是检查数据是否就绪,但一个是关键词一个是事件模型.
select { case v1 := <-c1: fmt.Printf("received %v from c1\n", v1) case v2 := <-c2: fmt.Printf("received %v from c2\n", v1) case c3 <- 23: fmt.Printf("sent %v to c3\n", 23) default: fmt.Printf("no one was ready to communicate\n") }
select实现逻辑在源码包src/runtime/select.go:selectgo()其伪代码以下:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) { //1. 锁定scase语句中全部的channel //2. 按照随机顺序检测scase中的channel是否ready // 2.1 若是case可读,则读取channel中数据,解锁全部的channel,而后返回(case index, true) // 2.2 若是case可写,则将数据写入channel,解锁全部的channel,而后返回(case index, false) // 2.3 全部case都未ready,则解锁全部的channel,而后返回(default index, false) //3. 全部case都未ready,且没有default语句 // 3.1 将当前协程加入到全部channel的等待队列 // 3.2 当将协程转入阻塞,等待被唤醒 //4. 唤醒后返回channel对应的case index // 4.1 若是是读操做,解锁全部的channel,而后返回(case index, true) // 4.2 若是是写操做,解锁全部的channel,而后返回(case index, false) }
既然同为编程语言,确定也有网络事件模型,我就猜想go对事件状态的检测中会与那些场景下用到关键词select呢? 以及当I/O 事件发生以后,模型是经过什么方式唤醒那些在I/O wait 的goroutine的?
在Go/src/net/http/server.go:Serve()中主要作的就是启动http服务监听请求,
能够看到也有用到了select,追寻源码当请求来到时就新建一个go协程去处理,处理过程主要在这里Go/src/net/http/server.go:serve()能够看到就是for循环等待数据传输过来.
再尝试追踪http.ListenAndServe,在Serve中select主要是监控请求是否完结或关闭,而在处理listen的分支中你会看到如Go/src/net/sock_posix.go
主要经过各类Syscall链接了系统底层经过for+switch+case不断检测链接IO类型,再经过/Go/src/net/dial.go:dialParallel()中的select检测对应的chan中是否知足条件
在/Go/src/net/fd_unix.go和/Go/src/net/fd_windows.go,connect函数中一样使用select检测chan,
一番走读,能够看出select是做为关键词被抽象出来,用到了go中每一处须要检查chan的地方.但仍是不能理解调用关系,再往下深追我找到了这些有用的资料:
视频:Go 原生同步网络模型解析 vs Multi-Reactors 异步网络模型
Go netpoll I/O 多路复用构建原生网络模型之源码深度解析
Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于Go的I/O 多路复用netpoll,其底层基于epoll/kqueue/iocp这些系统调用来作封装的), 提供了 goroutine-per-connection 这样简单的网络编程模式。 在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,而那些调度和上下文切换的工做转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地下降了程序员的心智负担! 这个原生网络模型不论从适用性仍是性能上都足以知足绝大部分的应用场景。 然而,在工程性上能作到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于很是复杂的封装,作了不少取舍,也有可能放弃了一些『极致』的设计和理念。 Go netpoll 在不一样的操做系统,其底层使用的 I/O 多路复用技术也不同,能够从 Go 源码目录结构和对应代码文件了解 Go 在不一样平台下的网络 I/O 模式的实现。 好比,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。 当 I/O 事件发生以后,netpoll 是经过什么方式唤醒那些在 I/O wait 的 goroutine 的?是经过 epoll_wait, 在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(block bool) gList 方法,它会内部调用epoll_wait获取就绪的 fd 列表, 并将每一个 fd 对应的 goroutine 添加到链表返回,go的runtime 掌握网络I/O的控制权.
Go netpoll 核心
Go netpoll 经过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。 总结来讲,全部的网络操做都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时, 就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件, 才将此 goroutine 给 ready 激活从新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。
哦,原来go中为了配合本身的runtime调度方式,在经常使用的事件通知机制epoll/kqueue基础上封装了本身的netpoll,这样就能够像GPM那样调度多个协程,将多核资源利用起来.而python中的eventloop都是单线程的,主要是将回调和各类事件通知机制绑定并经过队列进行循环调度,没法使用多核资源.
题外话go也可使用eventloop:
var ch chan ElementType ch := make(chan int) ch <- value //写入 value := <-ch //读取
def step(self, future): # 管理fetch生成器: 第一次的激活/暂停后的恢复执行/以及配合set_result循环调用 try: # send会进入到coro执行, 即fetch, 直到下次yield # next_future 为yield返回的对象,也就是下一次要调用的Future对象 next_future = self.coro.send(future.result) # __init__中的第一次step,将fetch运行到的82行的yield, # 返回EVENT_WRITE时的事件回调要用的future,而后等事件触发,由select调用on_connected,进而继续future中的回调 except StopIteration: return next_future.add_done_callback(self.step) # 这里须要重点理解,为下一次要调用的Future对象,注册下一次的step,供on_readable调用 while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) # 可读的状况下,读取4096个bytes暂存给Future,执行回调,使生成器继续执行下去 selector.register(sock.fileno(), EVENT_READ, on_readable) # io读事件 chunk = yield f # 返回f,并接受step中send进来的future.result值,也就是暂存的请求返回字符 selector.unregister(sock.fileno()) if chunk: self.response += chunk else: urls_todo.remove(self.url) if not urls_todo: stopped = True break print("result:", self.response)
chunk = yield f,返回f并接受step中send进来的值,yield暂停子生成器函数的运行把cpu的使用权让出去,对比chan等待其余chan时处于等待中状态(_Gwaiting),是否是有点 chan 的味道了.
子生成器中包含多个yield和带缓存的chan,是否是也有类似呢?
python是单线程中调度多个协程,而go是多个进程中调度多个协程,感受yield和chan是有殊途同归之妙的.
维特根斯坦说「在语言中显示自身的东西, 咱们没法用语言来表示它」, 这句话不太好理解, 请容许我作一个不负责任的类比。好比计算机编程, 逻辑至关于机器语言或者汇编语言, 反正是比较底层的那种;人的语言至关于高级编程语言, 相似java和python;咱们的生活就是软件的图形界面。若是你是一个工程师, 你必定是顺着理解这件事的——机器语言必定是基础啊, 它是一切得以运做绝对前提啊。维特根斯坦会说, 幼稚!我当年也是这么想的他说, 必须倒过来理解。由于人和图形界面的交互, 才会有高级语言的各类安排, 才会有机器语言的各类运做。为何?由于人才是一切的尺度, 人这个主体和软件界面产生交互模式(人和生活), 最终决定了你那些0和1的意义(语言和逻辑)。维特根斯坦那句话的意思是, 你从图形界面的维度能解释为何这行代码要这样写, 但你在这行代码的维度解释不了它为何会被写成这样, 在人与图形界面交互的过程当中, 这段字符承载的意义远超过这段字符自己所显示的所有, 代码的意义在于使用, 「语言的意义也在于使用」, Meaning is use!
简单理解, 维特根斯坦的整个逻辑是:底层原理能解释表层现象, 但反过来却不行。表层最多能描述底层。
好比, 人性能解释商业为何是那个样子的, 但商业却不能解释人性为何是那个样子, 商业只能从它所在的侧面描述人性是什么样的, 由于商业形式就是被人性塑造的。以前, 咱们觉得代码是底层, 图形是表层。其实, 图形才是底层, 代码是表层, 这里的意思是, 生活能解释语言, 语言却只能描述生活。语言妄图解释生活, 表层妄图解释底层的结果就是哲学的出现。
这样就造就了一个可悲的事实,即人类对天然的认识永远只能无限的接近真理, 却永远没法探究到所谓的本源, 认识天然的过程其实都是在盲人摸象。
从实际出发, 不一样问题用不一样方法, 一个模型是否可靠, 看的历来不是理论或模型是否高明, 检验真理的惟一标准只有一条, 就是实践, 本身动手去尝试证明。
深刻理解Python异步编程(上) # 十分期待后续的中与下.
python3.6异步IO包asyncio部分核心源码思路梳理
以上不少内容都来自参考文档的摘抄和本身的理解,若有错误,还望指正.