rabbitMQ是一款基于AMQP协议的消息中间件,它可以在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。并且使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。并且两端可使用不一样的语言编写,大大提供了灵活性。python
从以上能够看出Rabbitmq工做原理大体就是producer把一条消息发送给exchange。rabbitMQ根据routingKey负责将消息从exchange发送到对应绑定的queue中去,这是由rabbitMQ负责作的。而consumer只需从queue获取消息便可。基本效果图以下: 服务器
这里就会有一个问题,若是consumer在执行任务时须要花费一些时间,这个时候若是忽然挂了,消息尚未被完成,消息岂不是丢失了,为了避免让消息丢失,rabbitmq提供了消息确认机制,consumer在接收到,执行完消息后会发送一个ack给rabbitmq告诉它能够从queue中移除消息了。若是没收到ack。Rabbitmq会从新发送此条消息,若是有其余的consumer在线,将会接收并消费这条消息。消息确认机制是默认打开的。若是想关闭它只须要设置no_ack=true。在此处咱们不须要设置。默认以下就行。负载均衡
channel.queue_declare(queue= task_queue, durable=True)
消息的持久话则是经过delivery_mode属性,设置值为2便可。fetch
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
在rabbit MQ里消息永远不能被直接发送到queue。这里咱们经过提供一个空字符串来使用默认的exchange。这个exchange是特殊的,它能够根据routingKey把消息发送给指定的queue。因此咱们的设计看起来以下所示:ui
发送端:设计
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #声明queue channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
接收端:3d
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
一个生产者发送消息到队列中,有多个消费者共享一个队列,每一个消费者获取的消息是惟一的。code
若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。server
channel.basic_qos(prefetch_count=1)
生产者端: 中间件
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消费者端:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
在前面2个示例咱们都适用默认的exchange。这里咱们将本身定义一个exchange。并设置type为fanout。它能够将消息广播给绑定的每个queue。而再也不是某一个queue。
生产者端:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消费者端:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
生产者端:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者端:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
运行结果:
发送消息
只收到warning的消息
只收到error的消息
error和waring的都能收到
这种模型是最灵活的,相比较于direct的彻底匹配和fanout的广播。Topic能够用相似正则的手法更好的匹配来知足咱们的应用。下面咱们首先了解一下topic类型的exchange。
topic类型的routing_key不能够是随意的单词,它必须是一系列的单词组合,中间以点号隔开,譬如“quick.orange.rabbit”这个样子。发送消息的routing_key必须匹配上绑定到队列的routing_key。消息才会被发送。
此外还有个重要的地方要说明,在以下代码处绑定的routing_key种能够有*和#2种字符。它们表明的意义以下:
由图可知,Q1匹配3个单词中间为orange的routing_key ,而Q2能够匹配3个单词最后一个单词为rabbit和第一个单词为lazy后面能够有多个单词的routing_key。
生产者端:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消费者端:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
当咱们须要在远程服务器上执行一个方法并等待它的结果的时候,咱们将这种模式称为RPC。
在rabbit MQ中为了能让client收到server端的response message。须要定义一个callback queue,不过如今有一个问题,就是每次请求都会建立一个callback queue .这样的效率是极其低下的。幸运的是咱们能够经过correlation_id为每个client建立一个单独的callback queue。经过指定correlation_id咱们能够知道callback queue中的消息属于哪一个client。要作到这样只需client每次发送请求时带上这惟一的correlation_id。而后当咱们从callback queue中收到消息时,咱们能基于 correlation_id 匹配上咱们的消息。匹配不上的消息将被丢弃,看上去就像下图这样:
总结一下流程以下:
生产者端:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
消费者端:
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)