做者:MING
我的公众号:Python编程时光
我的微信:mrbensonwonpython注:本系列已在微信公众号更新完成。查看最新文章,请关注公众号获取。git
你们好,并发编程 进入第十一章。github
前面两节,咱们讲了协程中的单任务和多任务redis
这节咱们将经过一个小实战,来对这些内容进行巩固。编程
在实战中,将会用到如下知识点:微信
在实战以前,咱们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。多线程
如何实现呢,有两种方法:并发
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
def do_sleep(x, queue, msg=""):
time.sleep(x)
queue.put(msg)
queue = Queue()
new_loop = asyncio.new_event_loop()
# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(time.ctime())
# 动态添加两个协程
# 这种方法,在主线程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")
while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())
复制代码
因为是同步的,因此总共耗时6+3=9秒.app
输出结果异步
Thu May 31 22:11:16 2018
第一个 协程运行完..
Thu May 31 22:11:22 2018
第二个 协程运行完..
Thu May 31 22:11:25 2018
复制代码
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)
queue = Queue()
new_loop = asyncio.new_event_loop()
# 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(time.ctime())
# 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)
while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())
复制代码
输出结果
因为是同步的,因此总共耗时max(6, 3)=6
秒
Thu May 31 22:23:35 2018
第二个 协程运行完..
Thu May 31 22:23:38 2018
第一个 协程运行完..
Thu May 31 22:23:41 2018
复制代码
对于并发任务,一般是用生成消费模型,对队列的处理可使用相似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。
为了简单起见,而且协程更适合单线程的方式,咱们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。
先安装Redis
到 https://github.com/MicrosoftArchive/redis/releases 下载
而后,在当前路径运行cmd,运行redis的服务端。
一切准备就绪以后,咱们就能够运行咱们的代码了。
import time
import redis
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put("ok")
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
return redis.Redis(connection_pool=connection_pool)
def consumer():
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)
if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop()
# 定义一个线程,运行一个事件循环对象,用于实时接收新任务
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
# 建立redis链接
rcon = get_redis()
queue = Queue()
# 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()
while True:
msg = queue.get()
print("协程运行完..")
print("当前时间:", time.ctime())
复制代码
稍微讲下代码
loop_thread
:单独的线程,运行着一个事件对象容器,用于实时接收新任务。consumer_thread
:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。
输出结果
Thu May 31 23:42:48 2018
协程运行完..
当前时间: Thu May 31 23:42:49 2018
协程运行完..
当前时间: Thu May 31 23:42:51 2018
协程运行完..
当前时间: Thu May 31 23:42:53 2018
复制代码
咱们在Redis,分别发起了5s,3s,1s的任务。
从结果来看,这三个任务,确实是并发执行的,1s的任务最早结束,三个任务完成总耗时5s
运行后,程序是一直运行在后台的,咱们每一次在Redis中输入新值,都会触发新任务的执行。。
好了,通过这个实战内容,你应该对asyncio的实际应用有了一个更深入的认识了,至此,你已经可使用asyncio来实现任务的并发。快去体验一下。若是有什么疑问,请在后台加我微信与我联系。。