Python-asyncio

一、asyncio 多线程

  3.4版本加入标准库并发

  asyncio 底层基于selectors 实现,看似库,其实就是一个框架,包含异步IO,事件循环,协程,任务等待等内容。   框架

二、问题引出异步

def a():
    for x in range(3):
        print(x)

def b():
    for x in 'abc':
        print(x)
a()
b()

# 运行结果
0
1
2
a
b
c

 

这是一个串行的程序。async

def a():
    for x in range(3):
        print(x)
        yield

def b():
    for x in 'abc':
        print(x)
        yield

x = a()
y = b()
for i in range(3):
    next(x)
    next(y)

 

三、事件循环:tcp

  事件循环是asyncio 提供的核心运行机制ide

  

四、协程函数

  • 协程不是进程,也不是线程,它是用户空间调度的完成并发处理的方式
  • 进程,线程由操做系统完成调度,而协程是线程内完成调度。它不须要更多的线程,天然也没有多线程切换带来的开销
  • 协程是非抢占式调度,只有一个协程主动让出控制权,另外一个协程才会被调度
  • 协程不须要使用锁机制,由于在同一个线程中执行。
  • 多CPU下,能够使用多进程和协程配合,既能进程并发,又能发挥协程在单线程中的 优点
  • Python中协程是基于生成器的。

五、协程的使用oop

  3.4引入asyncio ,使用装饰器测试

  asyncio.sleep(0.001):也是一个coroutine,是一个生成器函数,yield值
 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x): #  协程函数
 5     for i in range(3):
 6         print('sleep {}'.format(i))
 7         yield from asyncio.sleep(x)
 8 
 9 loop = asyncio.get_event_loop()
10 loop.run_until_complete(sleep(3)) # 将sleep(3) 封装成Task对象执行
11 loop.close()
12 print('===================')

    结果:每一秒打印一个,最终打印 ========

1 sleep 0
2 sleep 1
3 sleep 2
4 ===================

 

  将生成器函数,转换为协程函数,就能够在时间循环中执行了。

  测试:

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print('sleeP {}'.format(i))
 7         yield from asyncio.sleep(x)
 8 
 9 loop = asyncio.get_event_loop()
10 
11 #本身封装 task 对象
12 task = loop.create_task(sleep(3))
13 print(1, task)
14 loop.run_until_complete(task)
15 print(2, task)
16 loop.close()
17 print('======== end =======')

  结果:

1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:23>>
2 sleeP 0
3 sleeP 1
4 sleeP 2
5 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:23> result=None>
6 ======== end =======

 

  测试:添加回调函数,知道运行完,返回结果(异步非阻塞)

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print('sleeP {}'.format(i))
 7         yield from asyncio.sleep(0.001)
 8     # 给一个result
 9     return 2000
10 
11 def cb(future): # 回调函数
12     print(4, future,'===')
13     print(5, future.result())
14 
15 loop = asyncio.get_event_loop()
16 
17 #本身封装 task 对象
18 task = loop.create_task(sleep(3))
19 task.add_done_callback(cb)# 注册了一个回调函数
20 print(1, task)
21 loop.run_until_complete(task)
22 print(2, task)
23 print(3, task.result()) # 获取结果
24 loop.close()
25 print('======== end =======')

 

  结果:打印2 以前,先执行了回调函数,且获得最终结果以前,一直在运行

1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:50]>
2 sleeP 0
3 sleeP 1
4 sleeP 2
5 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> ===
6 5 2000
7 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>
8 3 2000
9 ======== end =======

 

 

  测试:多任务:

 1 import asyncio
 2 
 3 @asyncio.coroutine
 4 def sleep(x):
 5     for i in range(3):
 6         print('sleeP {}'.format(i))
 7         yield from asyncio.sleep(0.001)
 8     # 给一个result
 9     return 2000
10 
11 @asyncio.coroutine
12 def b():
13     for x in 'abc':
14         print(x)
15         yield from asyncio.sleep(0.001)
16 
17 
18 def cb(future): # 回调函数
19     print(4, future,'===')
20     print(5, future.result())
21 
22 loop = asyncio.get_event_loop()
23 
24 #本身封装 task 对象
25 task = loop.create_task(sleep(3))
26 task.add_done_callback(cb)# 注册了一个回调函数
27 print(1, task)
28 # 固定套路,多任务
29 tasks = [task, b()] 30 ret = loop.run_until_complete(asyncio.wait(tasks)) 31 
32 print(2, task)
33 print(3, task.result()) # 获取结果
34 print(6, ret)
35 loop.close()
36 print('======== end =======')

  结果:

 1 1 <Task pending coro=<sleep() running at E:/code_pycharm/tt10.py:42> cb=[cb() at E:/code_pycharm/tt10.py:57]>
 2 sleeP 0 3 a 4 sleeP 1 5 b 6 sleeP 2 7 c  8 4 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000> ===
 9 5 2000
10 2 <Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>
11 3 2000
12 6 ({<Task finished coro=<sleep() done, defined at E:/code_pycharm/tt10.py:42> result=2000>, <Task finished coro=<b() done, defined at E:/code_pycharm/tt10.py:50> result=None>}, set()) 13 ======== end =======

 

  能够看出,返回一个元组,把以前的任务都会放在里边

    因此获取每一个任务的result的方式:

      一、将任务封装为task,经过回调函数,或者,直接调用result()

      二、经过任务列表返回的结果,遍历获取        

print(6, ret[0])
for i in ret[0]:
    print(i.result())

 

  3.5版本以后,Python提供关键字async,await,在语言上原生支持协程

 1 import asyncio
 2 
 3 async def sleep(x):
 4     for i in range(3):
 5         print('sleeP {}'.format(i))
 6         await asyncio.sleep(0.001)
 7     # 给一个result
 8     return 2000
 9 
10 async def b():
11     for x in 'abc':
12         print(x)
13         await asyncio.sleep(0.001)
14 
15 
16 def cb(future): # 回调函数
17     print(4, future,'===')
18     print(5, future.result())
19 
20 loop = asyncio.get_event_loop()
21 
22 #本身封装 task 对象
23 task = loop.create_task(sleep(3))
24 task.add_done_callback(cb)# 注册了一个回调函数
25 print(1, task)
26 
27 tasks = [task, b()]
28 ret = loop.run_until_complete(asyncio.wait(tasks))
29 
30 print(2, task)
31 print(3, task.result()) # 获取结果
32 print(6, ret[0])
33 for i in ret[0]:
34     print(i.result())
35 loop.close()
36 print('======== end =======')

 

  async def 用来定义协程函数,iscoroutinefunction() 返回True,协程函数中能够不包含await,async关键字,但不能使用yield 关键字

  如同生成器函数调用返生成器对象同样,协程函数调用 也会返回一个对象称为协程对象,iscoroutine()返回True。

  await语句以后是awaitable对象,能够是协程或者实现了__await__()方法的对象,await会暂停当前协程执行,使用loop调度其余协程。

 

  tcp  ECho server:

 1 import asyncio
 2 
 3 async def handle(reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
 4     while True:
 5         data = await reader.read(1024)
 6         print(dir(reader))
 7         print(dir(writer))
 8         client = writer.get_extra_info('peername')
 9         message = '{} your msg {}'.format(client, data.decode()).encode()
10         writer.write(message)
11         await writer.drain() # 注意不是flush 方法
12 loop = asyncio.get_event_loop()
13 ip = '127.0.0.1'
14 port = 9999
15 crt = asyncio.start_server(handle, ip, port, loop=loop)
16 
17 server = loop.run_until_complete(crt)
18 print(server)
19 try:
20     print('=========')
21     loop.run_forever()
22 except KeyboardInterrupt:
23     pass
24 finally:
25     server.close()
26     loop.run_until_complete(server.wait_closed())
27     loop.close()
View Code

 

 

六、aiohttp库(异步的)

pip install aiohttp

文档:https://aiohttp.readthedocs.io/en/stable/

http server

http client

相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息