消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。html
消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不论是谁发布的。这样发布者和使用者都不用知道对方的存在。redis
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特色包括:数据库
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。服务器
灵活的路由(Flexible Routing)
在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。网络
消息集群(Clustering)
多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。分布式
高可用(Highly Available Queues)
队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。ide
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。fetch
多语言客户端(Many Clients)
RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。加密
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。spa
跟踪机制(Tracing)
若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。
RabbitMQ的安装能够参考博客:
https://www.cnblogs.com/ericli-ericli/p/5902270.html
全部 MQ 产品从模型抽象上来讲都是同样的过程:
消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费者。
import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1') def callback(ch, method, properties, body): print('>>',ch,method,properties) time.sleep(30) print(" [x] Received %r" % body) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello1', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
发送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1') channel.basic_publish(exchange='', routing_key='hello1', body='Hello World!') connection.close()
PS:远程链接rabbitmq server的话,须要配置权限
可参考博客:https://www.cnblogs.com/alex3714/articles/5248247.html
在这种工做模式下,若是启动了多个接收端,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),至关于轮询。
可是上面这样写的话,队列里面的消息若是没有被所有取出来,并且同时RabbitMQ又down机了,那这样的话队列里面的消息就都没有了,因此咱们须要作一个消息的持久化。
import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello',durable=True) def callback(ch, method, properties, body): print('>>',ch,method,properties) time.sleep(30) print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 声明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(60) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, )) connection.close()
以前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被全部的Queue收到,相似广播的效果,这时候就要用到exchange了。
fanout: 全部bind到此exchange的queue均可以接收消息
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
direct: 经过routingKey和exchange决定的那个惟一的queue能够接收消息
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s info warning error\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
topic:全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
PS:表达式符号说明:#表明收全部,*表明任何字符。(在终端运行,运行命令后面跟上符合条件的topic)