若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。python
消费者端添加服务器
channel.basic_qos(prefetch_count=1)
带消息持久化+公平分发的完整代码socket
生产者函数
import pika #至关于声明一个socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道 channel = conn.channel() #声明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, ) ) # routing_key 消息的key body 消息的内容 print('Sent "hello world"') conn.close()
消费者fetch
import pika #至关于声明一个socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道 channel = conn.channel() #声明queue 这里能够不用声明,可是若是消费者先运行,又不但愿出错,就要消费者先运行 channel.queue_declare(queue='hello',durable=True) def callback(ch,method,properties,body): print('[x] Received %r' % body ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello' ) #消费消息 若是收到消息就调用CALLBACK函数处理 print('[*] Waiting for message.To exit press CTRL+C') channel.start_consuming() #开始收消息
消费者2blog
import pika,time #至关于声明一个socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道 channel = conn.channel() #声明queue 这里能够不用声明,可是若是消费者先运行,又不但愿出错,就要消费者先运行 channel.queue_declare(queue='hello1',durable=True) def callback(ch,method,properties,body): time.sleep(30) print('[x] Received %r' % body ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello1' ) #消费消息 若是收到消息就调用CALLBACK函数处理 print('[*] Waiting for message.To exit press CTRL+C') channel.start_consuming() #开始收消息
消费者2 CALLBACK中sleep30s 在这个过程当中 将不会接收到消息,那么消息会发送到消费者1队列
以前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被全部的Queue收到,相似广播的效果,这时候就要用到exchange了,it
exchange是一个简单的东西,一端接收消息,另外一端将消息推送到队列。exchange类型将决定这条消息是放到一个队列,仍是不少队列,仍是被删除。exchange 就像转发器。io
exchange的类型:class
fanout: 全部bind到此exchange的queue均可以就收消息。 纯广播 ,只要绑定exchange就能够接收到。
direct: 经过routingKey和exchange决定的那个惟一的queue能够接收消息。
topic 全部符合routingkey(此时能够是一个表达式)的routingkey所bind的queue能够接收消息。
headers 经过headers 来决定把消息发给哪些queue。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #发布方不须要声明queue 只须要有个exchange就能够了 , exchange_type 类型是 fanout channel.exchange_declare(exchange='logs',exchange_type='fanout') #message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!" channel.basic_publish(exchange='logs', # 发布广播的时候 exchange定义要一致 routing_key='', #必需要写成空 body=message) #发送的消息主体 print(" [x] Sent %r" % message) connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') #exchange 转发器,exchange_type=fanout 绑定到这个转发器上的消费者都能接收到消息 result = channel.queue_declare(exclusive=True) # 随机queue 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue #获取queueu的名字 channel.queue_bind(exchange='logs', #绑定到这个转发器上,只能从这个转发器上接收 queue=queue_name) #指定queue的名字 转发器把消息发送到这个queue上,消费者从这个queue上接收消息 print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
发送接收过程:
消费者要先在线,生产者发送消息,消费者才能接收到。
生产者
消费者1在线
消费者2在线
消费者3中断从新链接
广播并不会存下来,不在线就接收不到了。
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
服务器端
消费者端
绑定哪一个就能接收的哪一个的消息。
上面的过滤条件是写死的,更细致的过滤条件就是在上面的基础上,对过滤参数的匹配。相似与正则匹配。
publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()