当单线程性能不足时,咱们一般会使用多线程/多进程去加速运行。而这些代码每每多得使人绝望,须要考虑:git
- 如何建立线程执行的函数?
- 如何收集结果?若但愿结果从子线程返回主线程,则还要使用队列
- 如何取消执行? 直接kill掉全部线程?信号如何传递?
- 是否须要线程池? 不然反复建立线程的成本太高了
不只如此,若改成多进程或协程,代码还要继续修改。若多处使用并行,则这些代码还会重复不少遍,很是痛苦。github
因而,咱们考虑将并行的全部逻辑封装到一个模块以内,向外部提供像串行执行同样的编程体验,还能完全解决上面所述的疑难问题。全部代码不足180行。编程
GitHub地址:多线程
使用时很是简洁:异步
def xprint(x): time.sleep(1) # mock a long time task yield x*x i=0 for item in multi_yield(xrange(100)),xprint, process_mode,3: i+=1 print(item) if i>10: break
上面的代码会使用三个进程,并行地打印1-10的平方。当打印完10以后,进程自动回收释放。就像串行程序同样简单。async
1. 先实现串行任务
咱们一般会将任务分割为不少个子块,从而方便并行。所以能够将任务抽象为生成器。相似下面的操做,每一个seed都是任务的种子。编程语言
def get_generator(): for seed in 100: yield seed
任务自己的定义,则能够经过一个接受种子的函数来实现:分布式
def worker(seed): # some long time task return seed*seed # just example
那么实现串行任务就像这样:函数式编程
for seed in get_generator(n): print worker(seed)
进一步地,能够将其抽象为下面的函数:
def serial_yield(genenator,worker): for seed in generator(): yield worker(seed)
该函数经过传入生成器函数(generator)和任务的定义(worker函数),便可再返回一个生成器。消费时:
for result in serial_yield(your_genenator, your_worker): print(result)
咱们看到,经过定义高阶函数,serial_yield就像map函数,对seed进行加工后输出。
2. 定义并行任务
考虑以下场景: boss负责分发任务到任务队列,多个worker从任务队列捞数据,处理完以后,再写入结果队列。主线程从结果队列中取结果便可。
咱们定义以下几种执行模式:
- async: 异步/多协程
- thread: 多线程
- process: 多进程
使用Python建立worker的代码以下,func是任务的定义(是个函数)
def factory(func, args=None, name='task'): if args is None: args = () if mode == process_mode: return multiprocessing.Process(name=name, target=func, args=args) if mode == thread_mode: import threading t = threading.Thread(name=name, target=func, args=args) t.daemon = True return t if mode == async_mode: import gevent return gevent.spawn(func, *args)
建立队列的代码以下,注意seeds多是无穷流,所以须要限定队列的长度,当入队列发现队列已满时,则任务须要阻塞。
def queue_factory(size): if mode == process_mode: return multiprocessing.Queue(size) elif mode == thread_mode: return Queue(size) elif mode == async_mode: from gevent import queue return queue.Queue(size)
何时任务能够终止? 咱们罗列以下几种状况:
- 全部的seed都已经被消费完了
- 外部传入告终束请求
对第一种状况,咱们让boss在seed消费完以后,在队列里放入多个Empty标志,worker收到Empty以后,就会自动退出,下面是boss的实现逻辑:
def _boss(task_generator, task_queue, worker_count): for task in task_generator: task_queue.put(task) for i in range(worker_count): task_queue.put(Empty) print('worker boss finished')
再定义worker的逻辑:
def _worker(task_queue, result_queue, gene_func): import time try: while not stop_wrapper.is_stop(): if task_queue.empty(): time.sleep(0.01) continue task = task.get() if task == Empty: result_queue.put(Empty) break if task == Stop: break for item in gene_func(task): result_queue.put(item) print ('worker worker is stop') except Exception as e: logging.exception(e) print ('worker exception, quit')
简单吧?可是这样会有问题,这个后面再说,咱们把剩余的代码写完。
再定义multi_yield的主要代码。 代码很是好理解,建立任务和结果队列,再建立boss和worker线程(或进程/协程)并启动,以后不停地从结果队列里取数据就能够了。
def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10): workers = [] result_queue = queue_factory(queue_size) task_queue = queue_factory(queue_size) main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss') for process_id in range(0, worker_count): name = 'worker_%s' % (process_id) p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name) workers.append(p) main.start() for r in workers: r.start() count = 0 while not should_stop(): data = result_queue.get() if data is Empty: count += 1 if count == worker_count: break continue if data is Stop: break else: yield data
这样从外部消费时,便可:
def xprint(x): time.sleep(1) yield x i=0 for item in multi_yield(xprint, process_mode,3,xrange(100)): i+=1 print(item) if i>10: break
这样咱们就实现了一个与serial_yield
功能相似的multi_yield
。能够定义多个worker,从队列中领任务,而不需重复地建立和销毁,更不须要线程池。固然,代码不彻底,运行时可能出问题。但以上代码已经说明了核心的功能。完整的代码能够在文末找到。
可是你也会发现很严重的问题:
- 当从外部break时,内部的线程并不会自动中止
- 咱们没法判断队列的长度,若队列满,那么put操做会永远卡死在那里,任务都不会结束。
3. 改进任务中止逻辑
最开始想到的,是经过在multi_yield
函数参数中添加一个返回bool的函数,这样当外部break时,同时将该函数的返回值置为True,内部检测到该标志位后强制退出。伪代码以下:
_stop=False def can_stop(): return _stop for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop): i+=1 print(item) if i>10: _stop=True break
但这样并不优雅,引入了更多的函数做为参数,还必须手工控制变量值,很是繁琐。在多进程模式下,stop标志位还如何解决?
咱们但愿外部在循环时执行了break后,会自动通知内部的生成器。实现方法彷佛就是with语句,即contextmanager.
咱们实现如下的包装类:
class Yielder(object): def __init__(self, dispose): self.dispose = dispose def __enter__(self): pass def __exit__(self, exc_type, exc_val, exc_tb): self.dispose()
它实现了with的原语,参数是dispose函数,做用是退出with代码块后的回收逻辑。
因为值类型的标志位没法在多进程环境中传递,咱们再建立StopWrapper类,用于管理中止标志和回收资源:
class Stop_Wrapper(): def __init__(self): self.stop_flag = False self.workers=[] def is_stop(self): return self.stop_flag def stop(self): self.stop_flag = True for process in self.workers: if isinstance(process,multiprocessing.Process): process.terminate()
最后的问题是,如何解决队列满或空时,put/get的无限等待问题呢?考虑包装一下put/get:包装在while True
之中,每隔两秒get/put,这样即便阻塞时,也能保证能够检查退出标志位。全部线程在主线程结束后,最迟也能在2s内自动退出。
def safe_queue_get(queue, is_stop_func=None, timeout=2): while True: if is_stop_func is not None and is_stop_func(): return Stop try: data = queue.get(timeout=timeout) return data except: continue def safe_queue_put(queue, item, is_stop_func=None, timeout=2): while True: if is_stop_func is not None and is_stop_func(): return Stop try: queue.put(item, timeout=timeout) return item except: continue
如何使用呢?咱们只需在multi_yield的yield语句以外加上一行就能够了:
with Yielder(stop_wrapper.stop): # create queue,boss,worker, then start all # ignore repeat code while not should_stop(): data = safe_queue_get(result_queue, should_stop) if data is Empty: count += 1 if count == worker_count: break continue if data is Stop: break else: yield data
仔细阅读上面的代码, 外部循环时退出循环,则会自动触发stop_wrapper的stop操做,回收所有资源,而不需经过外部的标志位传递!这样调用方在心智彻底不需有额外的负担。
实现生成器和上下文管理器的编程语言,均可以经过上述方式实现自动协程资源回收。笔者也实现了一个C#版本的,有兴趣欢迎交流。
这样,咱们就能像文章开头那样,实现并行的迭代器操做了。
4. 结语
完整代码在:
https://github.com/ferventdesert/multi_yielder/blob/master/src/multi_yielder.py
一些实现的细节颇有趣,咱们借助在函数中定义函数,能够不用复杂的类去承担职责,而仅仅只需函数。而相似的思想,在函数式编程中很是常见。
该工具已经被笔者的流式语言etlpy
所集成。可是依然有较多改进的空间,如没有集成分布式执行模式。
欢迎留言交流。