参考 Blinker Documentationpython
Blinker 是一个基于Python的强大的信号库,它既支持简单的对象到对象通讯,也支持针对多个对象进行组播。Flask的信号机制就是基于它创建的。安全
Blinker的内核虽然小巧,可是功能却很是强大,它支持如下特性:函数
支持注册全局命名信号优化
支持匿名信号线程
支持自定义命名信号code
支持与接收者之间的持久链接与短暂链接对象
经过弱引用实现与接收者之间的自动断开链接内存
支持发送任意大小的数据get
支持收集信号接收者的返回值it
线程安全
信号经过signal()
方法进行建立:
>>> from blinker import signal >>> initialized = signal("initialized") >>> initialized is signal("initialized") True
每次调用signal('name')
都会返回同一个信号对象。所以这里signal()
方法使用了单例模式。
使用Signal.connect()
方法注册一个函数,每当触发信号的时候,就会调用该函数。该函数以触发信号的对象做为参数,这个函数其实就是信号订阅者。
>>> def subscriber(sender): ... print("Got a signal sent by %r" % sender) ... >>> ready = signal('ready') >>> ready.connect(subscriber) <function subscriber at 0x...>
使用Signal.send()
方法通知信号订阅者。
下面定义类Processor
,在它的go()
方法中触发前面声明的ready
信号,send()
方法以self
为参数,也就是说Processor
的实例是信号的发送者。
>>> class Processor: ... def __init__(self, name): ... self.name = name ... ... def go(self): ... ready = signal('ready') ... ready.send(self) ... print("Processing.") ... complete = signal('complete') ... complete.send(self) ... ... def __repr__(self): ... return '<Processor %s>' % self.name ... >>> processor_a = Processor('a') >>> processor_a.go() Got a signal sent by <Processor a> Processing.
注意到go()
方法中的complete
信号没?并无订阅者订阅该信号,可是依然能够触发该信号。若是没有任何订阅者的信号,结果是什么信号也不会发送,并且Blinker
内部对这种状况进行了优化,以尽量的减小内存开销。
默认状况下,任意发布者触发信号,都会通知订阅者。能够给Signal.connect()
传递一个可选的参数,以便限制订阅者只能订阅特定发送者。
>>> def b_subscriber(sender): ... print("Caught signal from processor_b.") ... assert sender.name == 'b' ... >>> processor_b = Processor('b') >>> ready.connect(b_subscriber, sender=processor_b) <function b_subscriber at 0x...>
如今订阅者只订阅了processor_b
发布的ready
信号:
>>> processor_a.go() Got a signal sent by <Processor a> Processing. >>> processor_b.go() Got a signal sent by <Processor b> Caught signal from processor_b. Processing.
能够给send()
方法传递额外的关键字参数,这些参数会传递给订阅者。
>>> send_data = signal('send-data') >>> @send_data.connect ... def receive_data(sender, **kw): ... print("Caught signal from %r, data %r" % (sender, kw)) ... return 'received!' ... >>> result = send_data.send('anonymous', abc=123) Caught signal from 'anonymous', data {'abc': 123}
send()
方法的返回值收集每一个订阅者的返回值,拼接成一个元组组成的列表。每一个元组的组成为(receiver function, return value)。
前面咱们建立的信号都是命名信号,每次调用Signal
构造器都会建立一个惟一的信号,,也就是说每次建立的信号是不同的。下面对前面的Processor
类进行改造,将signal
做为它的类属性。
>>> from blinker import Signal >>> class AltProcessor: ... on_ready = Signal() ... on_complete = Signal() ... ... def __init__(self, name): ... self.name = name ... ... def go(self): ... self.on_ready.send(self) ... print("Alternate processing.") ... self.on_complete.send(self) ... ... def __repr__(self): ... return '<AltProcessor %s>' % self.name ...
上面建立的就是匿名信号。on_ready与on_complete是两个不一样的信号。
除了使用connect()
方法订阅信号以外,使用@connect
修饰器能够达到一样的效果。
>>> apc = AltProcessor('c') >>> @apc.on_complete.connect ... def completed(sender): ... print "AltProcessor %s completed!" % sender.name ... >>> apc.go() Alternate processing. AltProcessor c completed!
尽管这样用起来很方便,可是这种形式不支持订阅指定的发送者。这时,能够使用connect_via()
:
>>> dice_roll = signal('dice_roll') >>> @dice_roll.connect_via(1) ... @dice_roll.connect_via(3) ... @dice_roll.connect_via(5) ... def odd_subscriber(sender): ... print("Observed dice roll %r." % sender) ... >>> result = dice_roll.send(3) Observed dice roll 3.
信号一般会进行优化,以便快速的发送。无论有没有订阅者,均可以发送信号。若是发送信号时须要传送的参数要计算很长时间,能够在发送以前使用receivers
属性先检查一下是否有订阅者。
>>> bool(signal('ready').receivers) True >>> bool(signal('complete').receivers) False >>> bool(AltProcessor.on_complete.receivers) True
还能够检查订阅者是否订阅了某个具体的信号发布者。
>>> signal('ready').has_receivers_for(processor_a) True