异步编程 101:asyncio 进阶上篇

写在前面:编程

asyncio 初学者可能较难理解,能够结合我前面的几篇文章一块儿食用:微信

0x01 本文简介

原视频为 PyCon2019上的一场技术分享,做者是 Spotify的工程师, 经过一个案例, 介绍了 asyncio 的一些 best practice。并发

0x02 初始化 setup

concurrently publish messages

并发地 publish message:注意图中高亮的那一段,这里用的不是await queue.put(msg),这是由于await会阻塞while循环(参考前面的一篇文章:异步编程 101:asyncio中的 for 循环),也就是说,要等queue.put(msg)完成了,下一趟才会开始。而asyncio.create_task()会立刻 "fire" 而且当即返回,你能够理解为fire and forget machinism异步

若是你没时间看异步编程 101:asyncio中的 for 循环,我简要回顾一下:await作的事情是把当前的协程挂起,把控制权交给事件循环,以便于事件循环有其余协程能够调度时,接着运行其余协程。可是对于执行await的这个协程而言,它是被阻塞的。这个例子中,publish()中的while循环是一个总体。async

concurrently consume messages

这里使用msg = await queue.get()是make sense 的,由于你得先获得 message 而后才能接着作其余事情。然后面的restart_host则用create_task,由于咱们不想对他await(等待它完成)而阻塞了整个 while Ture循环。ide

concurrent work

收到 message 以后,除了restart_host()以外,咱们可能还须要作一些其余的任务,好比持久化保存message异步编程

这只须要在consume方法里面再添加一个create_task()函数

block when needed

然而有时候咱们是但愿异步任务可以serial执行的。若是要把restart_host()的逻辑改一下:先获取上次重启时间,而后判断上次重启时间是否是大于7天,若是是,再 restart_host()。这里的last_restart_date()restart_host()是有明确前后顺序的。post

可是咱们又不想这里的线性执行影响后面的 message 获取,很简单,只要把这个逻辑封装成一个协程,而后create_task()就行。性能

0x03 cleanup

须要对message ack,这样producer才不会从新发送。因此如今处理消息的逻辑以下:

须要保证:saverestart_host所有完成以前,才能cleanup

使用await是可以 work 的,可是性能确定不够。

因此asyncio.gather()就派上用场了:这里把save()restart_host()两个协程交给gather,并传给它一个callback,等两个任务所有完成以后调用callback函数,也就是cleanup()

若是不想用 callback,也能够直接await gather,这样的 code 更加 clean:

最后把程序跑一下,图中不一样颜色表示的是同一个 message:

  • 获取 message 是没有阻塞的
  • saverestart_host所有结束以后,才ack message。

0x04 graceful shutdowns

publishconsume组合起来,获得最后的main()

0x05 总结一下

  1. Asynchronous != concurrent

不是说你在原来代码的基础上加上asyncawait就能得到并发性能的,极可能你的异步代码仍是 sequantial 的。

  1. Serial != blocking

一些有前后顺序的操做,不意味着就必定是要 block 的。好比先作A 在作 B,在等待 A 完成的过程当中,我能够抽出时间作 C。放到本文的例子来讲,我须要先saverestart_host才能cleanup,但我能够在等待saverestart_host的时候继续作其余事情:把控制权交给主事件循环,接着运行其余协程。

  1. 回顾一下知识点
  • await的做用是什么?
  • asyncio.create_task()
  • asyncio.gather()

若是你像我同样真正热爱计算机科学,喜欢研究底层逻辑,欢迎关注个人微信公众号:

相关文章
相关标签/搜索