Python并发实践_02_经过yield实现协程

python中实现并发的方式有不少种,经过多进程并发能够真正利用多核资源,而多线程并发则实现了进程内资源的共享,然而Python中因为GIL的存在,多线程是没有办法真正实现多核资源的。python

对于计算密集型程序,应该使用多进程并发充分利用多核资源,而在IO密集型程序中,多核优点并不明显,甚至因为大多数时间都是在IO堵塞状态,多进程的切换消耗反而让程序效率更加低下。多线程

而当须要并发处理IO密集型任务时,就须要用到协程(Coroutine)。协程并无系统级的调度,而是用户级的调度方式,避免了系统调用的开销,虽然协程最终是串行工做,可是却能够实现很是大的并发量。经过多进程+协程的方式,能够有效均衡多核计算和请求等待。并发

参考文章:函数

https://blog.tonyseek.com/post/event-manage-with-greenlet/post

producer-consumer

利用yield生成器,能够简单展示协程的工做方式:spa

import time
def consumer():
    print "Ready to receive"
    while True:
        y = (yield )
        time.sleep(1)
        print "Receive %s from producer”%y
def producer():
    c = consumer()
    c.next()
    i = 1
    while i > 0 and i < 11:
        time.sleep(1)
        print "Send %s to consumer"%i
        c.send(i)
        i += 1
if __name__ == '__main__':
    producer()线程

上述过程展现了基本的生产者-消费者模型,消费者consumer是一个生成器;code

当第一次在producer中调用c.next()时,激活consumer,而且运行到yield时协程(consumer)被挂起,等待生成器被调用next或者send。协程

producer进行后续操做,并进入一个循环,每次暂停1s后,向生成器send一个消息,消费者yield获取到该消息,并进行后续的工做。blog

能够看到,每次yield都须要等待send传入的消息以后才会继续执行以后的任务。

经过yield实现协程

如今要来用yield真正建立一个协程了。

能够想象这样一个模型,一个工地里有不少类似的任务(jobs),而且会源源不断产生这些任务,工地里有一个工头(foreman)负责,工头为了分配任务给工人(worker),会制定一套流程(pipeline)来方便管理:分配工人,验收工做(accept),因为工人工做(work)的时间远远大于分配任务的时间,将这些工人的工做(简单枯燥的重复劳动)当作IO操做的话,这就是一个IO密集型的任务。下面看看python是如何经过yield来实现协程完成真个工做的:

 

 1 def main():
 2     foreman(args_of_overall,worker_num)
 3 
 4 def foreman(args_of_overall,worker_num):
 5     pipeline = create_pipeline(args_of_pipeline,worker_num)
 6     for i,job in enumerate(get_jobs(args_of_ceate_jobs)):
 7         worker_id  = i % worker_num
 8         pipeline.send((job,worker_id))
 9 
10 @coroutine
11 def worker(pipeline,accepting,job,my_id):
12     while True:
13         args_of_job, worker_id = (yield )
14         if worker_id == my_id:
15             result = work(args_of_job)
16             accepting.send(result)
17         elif pipeline is not None:
18             pipeline.send((job,worker_id))
19 
20 @coroutine
21 def accept():
22     while True:
23         result = (yield )
24         #do_some_accepting
25 
26 def create_pipeline(args_of_pipeline,worker_num):
27     pipeline = None
28     accepting = accept()
29     for work_id in range(work_num):
30         pipeline = worker(pipeline,accepting,job,work_id)
31     return pipeline
32 
33 def get_jobs(args_of_ceate_jobs):
34     for job in job_source:
35         yield job
36 
37 def coroutine(func):
38     def warper(*args):
39         f = func(*args)
40         f.next()
41         return f
42     return warper
43 
44 def work(args_of_job):
45     pass
46     #do_some_work
47 
48 if __name__ == '__main__':
49     main()

 上述过程当中,工人和验收工做都是协程,而get_jobs()函数是一个生成器,当job是动态添加时,就能够改写成一个协程。

上述全部的工做都是串行完成,虽然有不少工人,工人之间的工做是并发的(IO等待时间),可是工做一直是从第一个开始一个一个分配任务。

相关文章
相关标签/搜索