并发编程的方法:python
多线程、加载子进程、设计生成器函数的技巧。编程
1、启动和中止线程安全
threading库用来在单独的线程中执行任意的Python可调用对象。服务器
from threading import Thread t = Thread(target=func, args=(10,)) t.start()
当建立一个线程实例时,在调用它的start()方法以前,线程并不会马上开始执行。网络
线程实例会在他们本身所属的系统级线程中执行,这些线程彻底由操做系统来管理。一旦启动后,线程就开始独立运行、直到目标函数返回为止。多线程
能够查询线程实例来判断它是否还在运行。闭包
if t.is_alive(): print(' Still alive') else: print('Completed')
也能够请求链接(join)到某个线程上,这么作会等待该线程结束。架构
解释器会一直保持运行,知道全部的线程都终结为止。并发
对于须要长时间运行的线程或者一直不断运行的后台任务,应该考虑将这些线程设置为daemon(守护线程)app
>>> t = Thread(target=func, args=(10,), daemon=True)
daemon线程是没法被链接的。可是,当主线程结束后他们会自动销毁掉。
若是须要终止线程,必需要可以在某个指定的点上轮询退出状态,这就须要编程实现。
import time from threading import Thread class CountdownTask: def __init__(self): self._running = True def terminate(self): self._running = False def run(self, n): while self._running and n > 0: print('T-minus', n) n -= 1 time.sleep(2) print('Threading Done') c = CountdownTask() t = Thread(target=c.run, args=(10,), ) t.start() time.sleep(5) c.terminate() t.join()
若是线程执行阻塞性的I/O操做,须要为线程加上超时循环。
class IOTask: def __init__(self): self._running = True def run(self): sock.settimeout(5) while self._running: try: data = sock.recv(1024) break except socket.timeout: continue return
因为全局解释器锁GIL的存在,Python线程的执行模型被限制为在任意时刻只容许在解释器中运行一个线程。
2、判断线程是否已经启动
线程的核心特征:可以以非肯定性的方式独立执行。(即什么时候开始执行、什么时候被打断、什么时候恢复执行彻底由操做系统来调度管理)
若是线程须要判断某个线程是否已经到达执行过程当中的某个点。根据这个判断来执行后续的操做,产生了棘手的线程同步问题。
threading库中的Event对象。
若是事件没有被设置而线程正在等待该事件event.wait(),那么线程就会被阻塞(即,进入休眠状态),直到事件被设置为止event.set()。
当线程设置了这个事件时,这会唤醒全部正在等待该事件的线程。
若是线程等待的事件已经设置了,那么线程会继续执行。
from threading import Thread,Event import time def countdown(n,event): print(' countdown starting ') event.set() while n > 0: print('T-minus', n) n -= 1 time.sleep(2) if __name__ == '__main__': started_evet = Event() t = Thread(target=countdown, args=(10,started_evet)) t.start() started_evet.wait() print(' countdown is running ')
当运行这段代码时,字符串“countdown is running”,老是会在“countdown starting”以后显示。
使用了事件来同步线程,使得主线程等待,直到countdown()函数首先打印出启动信息以后才开始执行。
Event对象最好只用于一次性的事件。
若是线程打算一遍又一遍地重复通知某个事件,最好使用Condition对象来处理。
import threading import time class PeriodicTimer: def __init__(self, interval): self._interval = interval self._flag = 0 self._cv = threading.Condition() def start(self): t = threading.Thread(target=self.run) t.daemon = True t.start() def run(self): ''' Run the timer and notify waiting threads after each interval ''' while True: time.sleep(self._interval) with self._cv: self._cv.notify(1) print('time.sleep done 5') def wait_for_tick(self): ''' Wait for the next tick of the timer ''' with self._cv: self._cv.wait() # Example use of the timer ptimer = PeriodicTimer(3) ptimer.start() # Two threads that synchronize on the timer def countdown(nticks): while nticks > 0: ptimer.wait_for_tick() print('T-minus', nticks) nticks -= 1 def countup(last): n = 0 while n < last: ptimer.wait_for_tick() print('Counting', n) n += 1 threading.Thread(target=countup, args=(5,)).start() threading.Thread(target=countdown, args=(10,)).start()
Event对象的关键特性就是它会唤醒全部等待的线程。
若是只但愿唤醒一个单独的等待线程,那么最好使用Semaphore或者Condition对象。
# Worker thread def worker(n, sema): # Wait to be signaled sema.acquire() # Do some work print('Working', n) # Create some threads sema = threading.Semaphore(0) nworkers = 10 for n in range(nworkers): t = threading.Thread(target=worker, args=(n, sema,)) t.start()
运行上边的代码将会启动一个线程池,可是并无什么事情发生。
这是由于全部的线程都在等待获取信号量。每次信号量被释放,只有一个线程会被唤醒并执行,示例以下:
>>> sema.release() Working 0 >>> sema.release() Working 1
3、线程间通讯
程序中有多个线程,在线程之间实现安全的通讯或者交换数据。
将数据从一个线程发往另外一个线程最安全的作法就是使用queue模块中的Queue队列了。
建立一个Queue队列,使用put()或者get()操做来给队列添加或移除元素。
from threading import Thread from queue import Queue def producer(out_q): while True: out_q.put(data) def consumer(in_q): while True: data = in_q.get() q = Queue() t1 = Thread(target=producer, args=(q,)) t2 = Thread(target=consumer, args=(q,)) t1.start() t2.start()
Queue实例已经拥有了全部所需的锁,所以它能够安全地在任意多的线程之间共享。
生产者和消费者之间的协调同步,使用一个特殊的终止值。
_sentinel = object() def producer(out_q): while True: out_q.put(data) out_q.put(_sentinel) def consumer(in_q): while True: data = in_q.get() if data is _sentinel: in_q.put(_sentinel) break
建立一个线程安全的优先队列
import heapq import threading class PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0: self._cv.wait() return heapq.heappop(self._queue)[-1]
经过队列实现线程间通讯是一种单方向且不肯定的过程。
没法得知线程什么时候会实际接收到消息并开始工做。
Queue对象提供了事件完成功能。task_done()和join()方法。
from threading import Thread from queue import Queue def producer(out_q): while True: out_q.put(data) def consumer(in_q): while True: data = in_q.get() in_q.task_done() q = Queue() t1 = Thread(target=producer, args=(q,)) t2 = Thread(target=consumer, args=(q,)) t1.start() t2.start() q.join()
当消费者线程已经处理了某项特定的数据,而生产者须要对此马上感知的话,那么就应该将发送的数据和一个Event对象配对在一块儿。
def producer(out_q): while True: evt = Event() out_q.put((data, evt)) evt.wait() def consumer(in_q): while True: data,evt = in_q.get() evt.set()
在线程中使用队列时, 将某个数据放入队列并不会产生该数据的拷贝。
所以,通讯过程当中实际上涉及在不一样的线程间传递对象的引用。
Queue的get()和put()方法都支持非阻塞和超时机制
import queue q = queue.Queue() try: q.put(item, block=False) except queue.Full: pass try: q.get(block=False) except queue.Empty: pass try: q.put(item, timeout=5.0) except queue.Full: pass try: q.get(timeout = 5.0) except queue.Empty: pass
能够避免在特定的队列操做上无限期阻塞下去的问题。
还有q.qsize()、q.full()、q.empty(),查询队列的当前大小和状态。可是,这些方法在多线程环境中都是不可靠的。
4、对临界区加锁
让可变对象安全地用在多线程中,能够利用threading库中的Lock对象来解决
import threading class SharedCounter: def __init__(self, initial_value = 0): self._value = initial_value self._value_lock = threading.Lock() def incr(self, delta=1): with self._value_lock: self._value += delta def decr(self, delta=1): with self._value_lock: self._value -= delta
当使用with语句时,Lock对象可确保产生互斥的行为。
同一时间只容许一个线程执行with语句块中的代码。
在较老的代码中,咱们常会看到显式地获取和释放锁的动做。
def incr(self, delta=1): self._value_lock.acquire() self._valeu += delta self._value_lock.release() def decr(self, delta=1): self._value_lock.acquire() self._value -= delta self._value_lock.release()
采用with语句不容易出错,若是程序恰好在持有锁的时候抛出了异常,而with语句会确保老是释放锁。
RLock被称为可重入锁,他能够被同一个线程屡次获取,用来编写基于锁的代码或者基于监视器的同步处理。
当某个类持有这种类型的锁时,只有一个线程可使用类中的所有函数或者方法。
class SharedCounter: _lock = threading.RLock() def __init__(self, initial_value = 0): self._value = initial_value def incr(self, delta=1): with SharedCounter._lock: self._value += delta def decr(self, delta=1): with SharedCounter._lock: self.incr(-delta)
只有一个做用于整个类的锁,它被全部的类实例所共享。
这个锁能够确保每次只有一个线程可使用类中的方法。已经持有了锁的方法能够调用一样使用了这个锁的其余方法。
不管建立多少个counter实例,都只会有一个锁存在。
Semaphore对象是一种基于共享计数器的同步原语。
若是计数器非零,那么with语句会递减计数器而且容许线程继续执行,当with语句块结束时计数器会获得递增。
若是计数器为零,那么执行过程会被阻塞,直到由另外一个线程来递增计数器为止。
若是想在代码中限制并发的数量,可使用Semaphore来处理。
5、避免死锁
线程一次品牌须要获取不止一把锁,同时还要避免出现死锁。
出现死锁常见缘由是有一个线程获取到第一个锁,可是在尝试获取第二个锁时阻塞了,那么这个线程就有可能会阻塞住其余线程的执行,进而使得整个程序僵死。
解决方案:为每一个锁分配一个惟一的数字编号,而且在获取多个锁时只按照编号的升序方式来获取。
import threading from contextlib import contextmanager _local = threading.local() @contextmanager def acquire(*locks): locks = sorted(locks, key=lambda x:id(x)) acquired = getattr(_local, 'acquired', []) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError('Lock Order Violation') acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: for lock in reversed(locks): lock.release() del acquired[-len(locks):]
acquire()函数根据对象的数字编号对所锁进行排序。
经过对锁进行排序,不管用户按照什么顺序将锁提供给acquire()函数,他们老是会按照统一的顺序来获取。
运行代码:
x_lock = threading.Lock() y_lock = threading.Lock() def thread_1(): while True: with acquire(x_lock, y_lock): print('Thread 1.....') def thread_2(): while True: with acquire(y_lock, x_lock): print('Thread 2.....') t1 = threading.Thread(target=thread_1) t1.daemon = True t1.start() t2 = threading.Thread(target=thread_2) t2.daemon = True t2.start()
示例使用到了线程本地存储来解决一个小问题。即,若是有多个acquire()操做嵌套在一块儿时,能够检测可能存在的死锁状况。
def thread_1(): while True: with acquire(x_lock): with acquire(y_lock): print('Thread 1.....') def thread_2(): while True: with acquire(y_lock): with acquire(x_lock): print('Thread 2.....')
每一个线程会记住他们已经获取到的锁的顺序。
acquire()函数会检测以前获取到的锁的列表,并对锁的顺序作强制性的约束。
先获取到的锁的对象ID必须比后获取的锁的ID要小。
解决哲学家就餐问题:(5个科学家围在一块儿,拿筷子吃饭,每支筷子表明一把锁)
def philosopher(left, right): while True: with acquire(left, right): print(threading.current_thread(), 'eating...') NSTICKS = 5 chopsticks = [threading.Lock() for n in range(NSTICKS)] for n in range(NSTICKS): t = threading.Thread(target=philosopher, args=(chopsticks[n], chopsticks[(n+1) % NSTICKS])) t.start()
6、保存线程专有状态
在多线程程序中,须要保存专属于当前运行线程的状态。经过threading.local()来建立一个线程本地存储对象。
在这个对象上保存和读取的属性只对当前运行的线程可见,其余线程没法感知。
import threading from socket import socket,AF_INET,SOCK_STREAM class LazyConnection: def __init__(self, address, family=AF_INET, type=SOCK_STREAM): self.address = address self.family = family self.type = type self.local = threading.local() def __enter__(self): if hasattr(self.local, 'sock'): raise RuntimeError('Already Connected') self.local.sock = socket(self.family, self.type) self.local.sock.connect(self.address) return self.local.sock def __exit__(self, exc_type, exc_val, exc_tb): self.local.sock.close() del self.local.sock
每一个线程都建立了本身专属的socket链接。
当不一样线程在socket上执行操做时,它们并不会互相产生影响,由于它们都是在不一样的socket上完成操做的。
threading.local()实例为每一个线程维护者一个单独的实例字典。全部对实例的常见操做好比获取、设定以及删除都只是做用于每一个线程专有的字典上。
7、建立线程池
建立一个工做者线程池用来处理客户端链接。
concurrent.futures库中包含有一个ThreadPoolExecutor类可用来实现这个目的。
from concurrent.futures import ThreadPoolExecutor from socket import socket, AF_INET, SOCK_STREAM def echo_client(sock, client_addr): print('Got COnnection from ', client_addr) while True: msg = sock.recv(1024) if not msg: break sock.sendall(msg) print('Client closed connection') sock.close() def echo_server(addr): pool = ThreadPoolExecutor(128) sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() echo_client(echo_client, client_sock, client_addr) echo_server(('',18000))
使用Queue来手动建立一个线程池。
from socket import socket, AF_INET, SOCK_STREAM from threading import Thread from queue import Queue def echo_client(q): sock, client_addr = q.get() print('Got COnnection from ', client_addr) while True: msg = sock.recv(1024) if not msg: break sock.sendall(msg) print('Client closed connection') sock.close() def echo_server(addr, nworkers): q = Queue() for n in range(nworkers): t = Thread(target=echo_client, args=(q,)) t.daemon = True t.start() sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() q.put((client_sock, client_addr)) echo_server(('',18000), 128)
使用ThreadPoolExecutor类实现线程池,使得更容易从调用函数中取得结果。
from concurrent.futures import ThreadPoolExecutor import urllib.request def fetch_url(url): u = urllib.request.urlopen(url) data = u.read() return data pool = ThreadPoolExecutor(10) a = pool.submit(fetch_url, 'http://www.python.org') b = pool.submit(fetch_url, 'http://www.pypy.org') a.result() b.result()
a.result()操做会阻塞,直到对应的函数已经由线程池执行完毕并返回告终果为止。
当建立一个线程时,操做系统会占用一段虚拟内存来保存线程的执行栈(一般8M),这段内存只有一小部分会实际映射到物理内存上。
>>> threading.stack_size(65535) 调整线程栈的大小。
所以,Python进程占用的物理内存远比虚拟内存要小。
8、实现简单的并行编程
concurrent.futures库中提供了ProcessPoolExecutor类,用来在单独运行的Python解释器实例中执行计算密集型的函数。
import gzip import io import glob def find_robots(filename): ''' Find all of the hosts that access robots.txt in a single log file ''' robots = set() with gzip.open(filename) as f: for line in io.TextIOWrapper(f,encoding='ascii'): fields = line.split() if fields[6] == '/robots.txt': robots.add(fields[0]) return robots def find_all_robots(logdir): ''' Find all hosts across and entire sequence of files ''' files = glob.glob(logdir+'/*.log.gz') all_robots = set() for robots in map(find_robots, files): all_robots.update(robots) return all_robots if __name__ == '__main__': robots = find_all_robots('logs') for ipaddr in robots: print(ipaddr)
上面的程序以经常使用的map-reduce风格来编写。函数find_robots()被映射到一系列的文件名上
修改成利用多个CPU核心的程序。
import gzip import io import glob from concurrent import futures def find_robots(filename): ''' Find all of the hosts that access robots.txt in a single log file ''' robots = set() with gzip.open(filename) as f: for line in io.TextIOWrapper(f,encoding='ascii'): fields = line.split() if fields[6] == '/robots.txt': robots.add(fields[0]) return robots def find_all_robots(logdir): ''' Find all hosts across and entire sequence of files ''' files = glob.glob(logdir+'/*.log.gz') all_robots = set() with futures.ProcessPoolExecutor() as pool: for robots in pool.map(find_robots, files): all_robots.update(robots) return all_robots if __name__ == '__main__': robots = find_all_robots('logs') for ipaddr in robots: print(ipaddr)
ProcessPoolExecutor的典型用法:
from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor() as pool: ... do work in parallel using pool ...
在底层,ProcessPoolExecutor建立了N个独立运行的Python解释器,这里的N就是在系统上检测到的可用的CPU个数。
能够修改建立的Python进程格式,只要给ProcessPoolExecutor(N)提供参数便可。
进程池会一直运行,直到with语句块中的最后一条语句执行完毕为止,此时进程池就会关闭。
可是,程序会一直等待全部已经提交的任务都处理完毕为止。
提交到进程池中的任务必须定义成函数的形式。
(1)若是想并行处理一个列表推导式或者map()操做,可使用pool.map()
(2)经过pool.submit()方法来手动提交一个单独的任务。
def work(x): ... return result with ProcessPoolExecutor() as pool: ... # Example of submitting work to the pool future_result = pool.submit(work, arg) # Obtaining the result (blocks until done) r = future_result.result() ...
手动提交任务,获得一个Future实例。
要获取实际的结果还须要调用它的 result() 方法,这么作会阻塞进程,直到完成了计算并将结果返回给进程池为止。
或者提供一个回调函数,让它在任务完成时获得触发执行。
def when_done(r): print('Got:', r.result()) with ProcessPoolExecutor() as pool: future_result = pool.submit(work, arg) future_result.add_done_callback(when_done)
9、规避GIL带来的限制
解释器被一个称之为全局解释器锁(GIL)的东西保护着,在任意时刻只容许一个Python线程投入执行。
GIL带来的最明显影响是Python程序没法充分利用多个CPU核心带来的优点。
一个采用多线程的技术的计算密集型应用只能在一个CPU上运行。
对于I/O密集型的线程,每当阻塞等待I/O操做时解释器都会释放GIL。
对于历来不执行任何I/O操做的CPU密集型线程,Python解释器会在执行了必定数量的字节码后释放GIL,以便其余线程获得执行的机会。
可是C语言扩展模块不一样,调用C函数时GIL会被锁定,直到它返回为止。
因为C代码的执行是不受解释器控制的,这一期间不会执行任何Python字节码,所以解释器就无法释放GIL了。
若是编写的C语言扩展调用了会阻塞的C函数,执行耗时很长的操做等,那么必须等到C函数返回时才会释放GIL,这时其余的线程就僵死了。
规避GIL的限制主要有两种经常使用的策略。
(1)使用multiprocessing模块来建立进程池,把它当作协处理器来使用。
线程版本:
# Performs a large calculation (CPU bound) def some_work(args): ... return result # A thread that calls the above function def some_thread(): while True: ... r = some_work(args) ...
使用进程池的方式:
# Processing pool (see below for initiazation) pool = None # Performs a large calculation (CPU bound) def some_work(args): ... return result # A thread that calls the above function def some_thread(): while True: ... r = pool.apply(some_work, (args)) ... # Initiaze the pool if __name__ == '__main__': import multiprocessing pool = multiprocessing.Pool()
每当有线程要执行CPU密集型的任务时,它就把任务提交到池中,而后进程池将任务转交给运行在另外一个进程中的Python解释器。
当线程等待结果的时候就会释放GIL。
(2)将计算密集型的任务转移到C语言中,使其独立于Python,在C代码中释放GIL。
总结:
CPU密集型的处理才须要考虑GIL,I/O密集型的处理则没必要。
当使用进程池来规避GIL,涉及同另外一个Python解释器之间进行数据序列化和通讯的处理。
待执行的操做须要包含在以def语句定义的Python函数中(lambda、闭包、可调用实例都是不能够的!),并且函数参数和返回值必须兼容于pickle编码。
此外,要完成的工做规模必须足够大,这样能够弥补额外产生的通讯开销。
将线程和进程混在一块儿使用。最好在先建立任何线程以前将进程池做为单例(singleton)在程序启动的时候建立。
以后,线程就可使用相同的进程池来处理全部那些计算密集型的工做了。
对于C语言的扩展,最重要的是保持与Python解释器进程的隔离。确保C代码能够独立于Python执行。
10、定义一个Actor任务
actor模式是用来解决并发和分布式计算问题的方法之一。
actor就是一个并发执行的任务,他只是简单地对发送给它的消息进行处理。
做为对这些消息的响应,actor会决定是否要对其余的actor发送进一步的消息。
actor任务之间的通讯是单向且异步的,消息的发送者并不知道消息什么时候才会实际传递,当消息已经处理完毕时也不会接受到响应或者确认。
把线程和队列结合起来使用很容易定义出actor:
from queue import Queue from threading import Thread,Event class ActorExit(Exception): pass class Actor: def __init__(self): self._mailbox = Queue() def send(self, msg): self._mailbox.put(msg) def recv(self): msg = self._mailbox.get() if msg is ActorExit: raise ActorExit return msg def close(self): self.send(ActorExit) def start(self): self._terminated = Event() t = Thread(target=self._boostrap) t.daemon = True t.start() def _boostrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set() def join(self): self._terminated.wait() def run(self): while True: msg = self.recv() class PrintActor(Actor): def run(self): while True: msg = self.recv() print('Got', msg) p = PrintActor() p.start() p.send('HHHH') p.send('wwwwwwww') p.close() p.join()
使用actor实例的send()方法来发送消息。在底层,将消息放入到队列上,内部运行的线程会从中取出收到的消息处理。
close()方法经过在队列中放置一个特殊的终止值(ActorExit)开关闭Actor。
若是将并发和异步消息传递的需求去掉,那么彻底能够用生成器来定义个最简化的actor对象。
def print_actor(): while True: try: msg = yield print('Got',msg) except GeneratorExit: print('Actor terminating') p = print_actor() next(p) p.send('HHHHH') p.send('wwwwwwwww') p.close()
actor核心操做send(),在基于actor模式的系统中,消息的概念能够扩展到许多不一样的方向。
能够以元组的形式传递带标签的消息,让actor执行不一样的操做。
class TaggedActor(Actor): def run(self): while True: tag, *payload = self.recv() getattr(self,'do_'+tag)(*payload) def do_A(self, x): print('Running A', x) def do_B(self, x, y): print('Running B', x, y) a = TaggedActor() a.start() a.send(('A',3)) a.send(('B',3,888))
一个actor变形,容许在工做者线程中执行任意的函数,并经过特殊的Result对象将结果回传。
from threading import Event class Result: def __init__(self): self._evt = Event() self._result = None def set_result(self, value): self._result = value self._evt.set() def result(self): self._evt.wait() return self._result class Worker(Actor): def submit(self, func, *args, **kwargs): r = Result() self.send((func, args, kwargs, r)) return r def run(self): while True: func, args, kwargs, r = self.recv() r.set_result(func(*args, **kwargs)) worker = Worker() worker.start() r = worker.submit(pow, 2, 3) print(r.result())
能够给actor对象的send()方法实现为在socket链接上传输数据,或者经过某种消息传递的基础架构(AMQP、ZMQ)来完成传递。
11、实现发布者/订阅者消息模式
实现发布者/订阅者消息模式,通常来讲须要引入一个单独的“交换”或者“网关”这样的对象,做为全部消息的中介。
不是直接将消息从一个任务发往另外一个任务,而是将消息发往交换中介,由交换中介将消息转发给一个或多个相关联的任务。
from collections import defaultdict class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) def send(self, msg): for subscriber in self._subscribers: subscriber.send(msg) _exchanges = defaultdict(Exchange) def get_exchange(name): return _exchanges[name] class Task: def send(self, msg): pass task_a = Task() task_b = Task() exc = get_exchange('name') exc.attach(task_a) exc.attach(task_b) exc.send('msg1') exc.send('msg2') exc.detach(task_a) exc.detach(task_b)
交换中介其实就是一个对象,它保存了活跃的订阅者集合,并提供关联、取消关联以及发送消息的方法。
每一个交换中介都由一个名称来标识,get_exchange()函数简单地返回同给定的名称相关联的那个Exchange对象。
消息会先传递到一个中介,再由中介将消息传递给相关联的订阅者。
交换中介具备将消息广播发送给多个订阅者的能力,能够实现带有冗余任务、广播或者扇出的系统。
也能够构建调式以及诊断工具,将它们做为普通的订阅者关联到交换中介上。
class DisplayMessages: def __init__(self): self.count = 0 def send(self, msg): self.count += 1 print('msg[{}]: {!r}'.format(self.count, msg)) exc = get_exchange('name') d = DisplayMessages() exc.attach(d)
消息接收者能够是actor、协程、网络链接,甚至只要实现了合适的send()方法的对象均可以。
关于交换中介,要以适当的方式对订阅者进行关联和取消关联的处理。
这和使用文件、锁以及相似的资源对象很类似。使用上下文管理协议。
from contextlib import contextmanager from collections import defaultdict class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) @contextmanager def subscribe(self, *tasks): for task in tasks: self.attach(task) try: yield finally: for task in tasks: self.detach(task) def send(self, msg): for subscriber in self._subscribers: subscriber.send(msg) _exchanges = defaultdict(Exchange) def get_exchange(name): return _exchanges[name] # Example of using the subscribe() method exc = get_exchange('name') with exc.subscribe(task_a, task_b): exc.send('msg1') exc.send('msg2')
12、使用生成器做为线程的替代方案
用生成器做为系统线程的替代方案来实现并发。协程有时也称为用户级线程或绿色线程。
yield的基本行为,即,使得生成器暂停执行。
编写一个调度器将生成器函数当作一种“任务”来对待,并经过使用某种形式的任务切换来交替执行这写任务。
def countdown(n): while n > 0: print('T-minus', n) yield n -= 1 print('Blastoff') def countup(n): x = 0 while x < n: print('Counting up', x) yield x += 1 from collections import deque class TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): self._task_queue.append(task) def run(self): while self._task_queue: task = self._task_queue.popleft() try: next(task) self._task_queue.append(task) except StopIteration: pass sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countup(8)) sched.new_task(countdown(5)) sched.run()
TaskScheduler类以循环的方式运行了一系列的生成器函数,每一个都运行到yield语句就暂停。
生成器函数就是任务,而yield语句就是通知任务须要暂停挂起的信号。
调度器只是简单地轮流执行全部的任务,直到没有一个任务还能执行为止。
使用生成器来实现actor,彻底没有用到线程:
from collections import deque class ActorScheduler: def __init__(self): self._actors = { } # Mapping of names to actors self._msg_queue = deque() # Message queue def new_actor(self, name, actor): ''' Admit a newly started actor to the scheduler and give it a name ''' self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): ''' Send a message to a named actor ''' actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): ''' Run as long as there are pending messages. ''' while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass # Example use if __name__ == '__main__': def printer(): while True: msg = yield print('Got:', msg) def counter(sched): while True: # Receive the current count n = yield if n == 0: break # Send to the printer task sched.send('printer', n) # Send the next count to the counter task (recursive) sched.send('counter', n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor('printer', printer()) sched.new_actor('counter', counter(sched)) # Send an initial message to the counter to initiate sched.send('counter', 10000) sched.run()
只要有消息须要传递,调度器就会运行。这里有一个值得注意的特性:
counter生成器发送消息给本身并进入一个递归循环,但却并不会受到Python的递归限制。
十3、轮询多个线程队列
有一组线程队列,想轮询这些队列来获取数据。这个轮询一组网络链接来获取数据相似。
对于轮询问题,利用隐藏的环回(loopback)网络链接。
针对每一个想要轮询的队列(或任何对象),建立一对互联的socket。而后对其中一个socket执行写操做,以此来表示数据存在。
另外一个socket就传递给select()或者相似的函数来轮询数据。
import os import socket import queue class PollableQueue(queue.Queue): def __init__(self): super().__init__() if os.name == 'posix': self._putsocket, self._getsocket = socket.socketpair() else: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('127.0.0.1',0)) server.listen(1) self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._putsocket.connect(server.getsockname()) self._getsocket, _ = server.accept() server.close() def fileno(self): return self._getsocket.fileno() def put(self, item): super().put(item) self._putsocket.send(b'x') def get(self): self._getsocket.recv(1) return super().get()
定义了一种新的Queue实例,底层有一对互联的socket。
在UNIX上用socketpair()函数来创建这样的socket对是很是容易的。
在Windows上,咱们不得不使用示例中展现的方法来假装socket对。
首先建立一个服务器socket,以后马上建立客户端socket并链接到服务器上。
以后对get()和put()方法作重构,在这些socket上执行了少许的I/O操做。
put()方法在将数据放入队列以后,对其中一个socket写入了一个字节的数据。
当要把数据从队列中取出时,get()方法就从另外一个socket中把那个单独的字节读出。
定义一个消费者,用来在多个队列上监视是否有数据到来。
import select import threading def consumer(queues): while True: can_read, _, _ = select.select(queues,[],[]) for r in can_read: item = r.get() print('Got', item) q1 = PollableQueue() q2 = PollableQueue() q3 = PollableQueue() t = threading.Thread(target=consumer,args=([q1,q2,q3],)) t.daemon = True t.start() q1.put(1) q2.put(10) q3.put('Helloooo')
无论把数据放到哪一个队列中,消费者最后都能接收到全部的数据。
十4、在UNIX上加载守护进程
建立一个合适的守护进程须要以精确的顺序调用一系列的系统调用,并当心注意其中的细节。