源码:https://github.com/ltoddy/rabbitmq-tutorialpython
在以前的教程中,咱们构建了一个简单的日志系统 咱们可以将日志消息广播给许多接收者。git
在本教程中,咱们将添加一个功能 - 咱们将只能订阅一部分消息。例如,咱们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然可以在控制台上打印全部日志消息。github
在前面的例子中,咱们已经建立了绑定。您可能会回想一下代码:算法
channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)
绑定是交换和队列之间的关系。这能够简单地理解为: the queue is interested in messages from this exchange.this
绑定可使用额外的routing_key参数。为了不与basic_publish参数混淆,咱们将其称为绑定键。这就是咱们如何使用一个键建立一个绑定:spa
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
绑定键的含义取决于交换类型。咱们以前使用的 fanout 交换简单地忽略了它的价值。rest
咱们以前教程的日志记录系统将全部消息广播给全部消费者。咱们但愿将其扩展为容许根据其进行严格的过滤消息。
例如,咱们可能但愿将严重错误的日志消息写入磁盘,而不会写入警告或信息日志消息。日志
咱们正在使用fanout交换,这不会给咱们太多的灵活性 - 它只能无心识地播放。code
咱们将使用direct交换。direct交换背后的路由算法很简单 - 消息进入队列,其绑定密钥与消息的路由密钥彻底匹配。教程
为了说明这一点,请考虑如下设置:
在这个设置中,咱们能够看到有两个队列绑定的直接交换机X. 第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定键为black,另外一个为green。
在这种设置中,使用路由键orange发布到交换机的消息 将被路由到队列Q1。带有black或gree路由键的消息将进入Q2。全部其余消息将被丢弃。
使用相同的绑定密钥绑定多个队列是彻底合法的。在咱们的例子中,咱们可使用绑定键black添加X和Q1之间的绑定。
在这种状况下,direct交换就像fanout同样,并将消息广播到全部匹配的队列。带有路由键black的消息将传送到Q1和Q2。
咱们将使用这个模型用于咱们的日志系统。取而代之的fanout,咱们将消息发送到direct交换。咱们将提供严格的日志做为路由键(routing key)。
这样接收脚本将可以选择想要接收的消息。咱们先关注发出日志的实现。
像往常同样,咱们须要首先建立一个交换:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
咱们准备发送一条消息:
channel.basic_publish(exchange='direct_logs', routing_key='', body=message)
为了简化事情,咱们将假设“severity”能够是'info','warning','error'之一。
接收邮件的方式与上一个教程中的同样,只有一个例外 - 咱们将为每一个咱们感兴趣的严重程度建立一个新绑定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
emit_log_direct.py的代码:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.args[1:] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
receive_logs_direct.py的代码:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(cb, method, properities, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
若是只想保存'warning'和'error'(而不是'info')将消息记录到文件中,只需打开一个控制台并输入:
python receive_logs_direct.py warning error > logs_from_rabbit.log
若是您但愿在屏幕上看到全部日志消息,请打开一个新终端并执行如下操做:
python receive_logs_direct.py info warning error
例如,要输出error日志消息,只需输入:
python emit_log_direct.py error "Run. Run. Or it will explode."