异步IO

参考: 同步/异步 阻塞/非阻塞 iterator / generator asyio/cortine asy_io
# -------------------------------->同步/异步 阻塞/非阻塞------------------------------------

#
# 在一个线程中,CPU执行代码的速度极快,然而,一旦遇到IO操做,如读写文件、发送网络数据时,就须要等待IO操做完成,才能继续进行下一步操做。这种状况称为同步IO。
#
# 使用多线程或者多进程来并发执行代码,为多个用户服务。每一个用户都会分配一个线程,若是遇到IO致使线程被挂起,其余用户的线程不受影响,可是系统不能无上限地增长线程。因为系统切换线程的开销也很大,一旦线程数量过多,CPU的时间就花在线程切换上了,结果致使性能严重降低。
#
# loop = get_event_loop()
# while True:
#     event = loop.get_event()
#     process_event(event)
# 当代码须要执行一个耗时的IO操做时,它只发出IO指令,并不等待IO结果,而后就去执行其余代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。
#
# 当遇到IO操做时,代码只负责发出IO请求,不等待IO结果,而后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操做完成后,将收到一条“IO完成”的消息,处理该消息时就能够直接获取IO操做结果。
#
# 在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并无休息,而是在消息循环中继续处理其余消息。这样,在异步IO模型下,一个线程就能够同时处理多个IO请求,而且没有切换线程的操做。
#
# 烧水:
#  1 老张把水壶放到火上,立等水开。(同步阻塞)
#  2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
#  3 老张把水开了会响的水壶放到火上,立等水开。(异步阻塞)
#  4 老张把水开了会响的水壶放到火上,去客厅看电视,水壶响以前再也不去看它了,响了再去拿壶。(异步非阻塞)
#
#  所谓同步异步,只是对于水壶而言:普通水壶,同步,能没有结束前,一直等结果;响水壶,异步,不须要知道该功能结果,该功能有结果后通知(回调通知)。 响水壶能够在本身完工以后,提示老张水开了。 普通水壶同步只能让调用者去轮询本身。
#  所谓阻塞非阻塞,仅仅对于老张而言。 立等的老张,阻塞,(函数)没有接收完数据或者没有获得结果以前,不会返回;看电视的老张,非阻塞,(函数)当即返回,经过select通知调用者。

# -------------------------------->COROUTINE-------------------------------------------

# 子程序,或者称为函数,在全部语言中都是层级调用,好比A调用B,B在执行过程当中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
# 因此子程序调用是经过栈实现的,一个线程就是执行一个子程序。
# 子程序调用老是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不一样。
# 协程看上去也是子程序,但执行过程当中,在子程序内部可中断,而后转而执行别的子程序,在适当的时候再返回来接着执行。有点相似CPU的中断.

# 执行有点像多线程,但协程的特色在因而一个线程执行
# 协程极高的执行效率。子程序切换不是线程切换,而是由程序自身控制,没有线程切换的开销。
# 不须要多线程的锁机制,由于只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只须要判断状态。
# 协程是一个线程执行,利用多核CPU:多进程+协程,既充分利用多核,又充分发挥协程的高效率。

# yield 与yield
# from:
# yield list:返回整个列表,yield from list:一次返回一个元素。
# yield from:会自动处理大量错误。
# yield from后面必须是子生成器函数
#
# # 子生成器
# def ger_0():
#   loop:
#     x = yield
#     do
#     somthing
#   return result
#
# # 委托生成器
# def ger_1():
#     result = yield from ger_0()
#
# # 调用方
# g = ger_1()
# g.send(0) -->ger_0: x = 0
# g.send(1) -->ger_0: x = 1
# g.send(2) -->ger_0: x = 2
# ger_1: result = ger_0:result

# consumer函数是一个generator
def consumer():
    print('2---------')
    r = ''
    while True:
        print('3---------')
        # 经过yield拿到消息n处理,又经过yield把结果r传回
        # 赋值语句先计算= 右边,因为右边是 yield 语句,
        # 因此yield语句执行完之后,进入暂停,而赋值语句在下一次启动生成器的时候首先被执行;
        # P:sned(None)->C:yield r=''->P:send(1)->C:n=1->C:yield r='200 OK'->P:send(2)
        n = yield r
        if not n:
            print('6---------')
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    print('1---------')
    # 启动生成器
    # 在一个生成器函数未启动以前,是不能传递值进去。
    # 也就是说在使用c.send(n)以前,必须先使用c.send(None)或者next(c)来返回生成器的第一个值
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('4---------')
        print('[PRODUCER] Producing %s...' % n)
        # 切换到consumer执行
        r = c.send(n)
        print('5---------')
        # 拿到consumer处理的结果,继续生产下一条消息
        print('[PRODUCER] Consumer return: %s' % r)
    # 关闭consumer,整个过程结束。
    c.close()

#   1->2->3 ->  4->3->5  ->  4->3->5  ->   4->3->5
c = consumer()
produce(c)


# -------------------------------->ASYNCIO-------------------------------------------

# https://blog.csdn.net/SL_World/article/details/86597738
# asyncio的编程模型就是一个消息循环。咱们从asyncio模块中直接获取一个EventLoop的引用,而后把须要执行的协程扔到EventLoop中执行,就实现了异步IO。
# 异步操做须要在coroutine中经过yield from完成;
# 多个coroutine能够封装成一组Task而后并发执行。
import threading
import asyncio
# 把一个generator标记为coroutine类型
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    # yield from语法可让咱们方便地调用另外一个generator
    # asyncio.sleep()也是一个coroutine,因此线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。
    # 当asyncio.sleep()返回时,线程就能够从yield from拿到返回值(此处是None),而后接着执行下一行语句。
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 两个coroutine是由同一个线程并发执行的。
# 直到循环事件的全部事件都处理完才能完整结束。
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


# 引入了新的语法async和await,可让coroutine的代码更简洁易读
# @asyncio.coroutine替换为async;
# 把yield from替换为await。

import threading
import asyncio

async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    await asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


# -------------------------------->调用方-子生成器-委托生成器<-------------------------------


import time
import asyncio

async def taskIO_1():
    print('开始运行IO任务1...')
    await asyncio.sleep(3)
    print('IO任务1已完成,耗时3s')
    return taskIO_1.__name__

async def taskIO_2():
    print('开始运行IO任务2...')
    await asyncio.sleep(2)
    print('IO任务2已完成,耗时2s')
    return taskIO_2.__name__

# 调用方
async def main():
    # 把全部任务添加到task中
    tasks = [taskIO_1(), taskIO_2()]

    # 返回已经完成的任务,完成一个返回一个
    for completed_task in asyncio.as_completed(tasks):
        # 子生成器
        resualt = await completed_task
        print('协程无序返回值:'+resualt)

    # # done:已经完成的任务,pending:未完成任务
    # # 等待任务所有完成才返回
    # done, pending = await asyncio.wait(tasks)
    # for r in done:
    #     print('协程无序返回值:' + r.result())

if __name__ == '__main__':
    start = time.time()
    # 建立一个事件循环对象loop
    loop = asyncio.get_event_loop()
    try:
        # 完成事件循环,直到最后一个任务结束
        loop.run_until_complete(main())
    finally:
        # 结束事件循环
        loop.close()
    print('全部IO任务总耗时%.5f秒' % float(time.time()-start))

# -------------------------------->AIOHTTP-------------------------------------------

# 把asyncio用在服务器端,例如Web服务器,因为HTTP链接就是IO操做,所以能够用单线程+coroutine实现多用户的高并发支持。
# asyncio实现了TCP、UDP、SSL等协议

from aiohttp import web

routes = web.RouteTableDef()

@routes.get('/')
async def index(request):
    await asyncio.sleep(2)
    return web.json_response({
        'name': 'index'
    })

@routes.get('/about')
async def about(request):
    await asyncio.sleep(0.5)
    return web.Response(text="<h1>about us</h1>")

def init():
    app = web.Application()
    app.add_routes(routes)
    web.run_app(app)

init()
相关文章
相关标签/搜索