消息队列之RabbitMQ

  消息队列产品有不少,好比 ActiveMQ、RabbitMQ ,ZeroMQ ,RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,本文先讲 RabbitMQ ,在此以前先看下消息队列的相关概念。

什么是消息队列

  消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。html

  消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不论是谁发布的。这样发布者和使用者都不用知道对方的存在。redis

RabbitMQ特色

  RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特色包括:数据库

  1. 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。服务器

  2. 灵活的路由(Flexible Routing)
    在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。网络

  3. 消息集群(Clustering)
    多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。分布式

  4. 高可用(Highly Available Queues)
    队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。ide

  5. 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。fetch

  6. 多语言客户端(Many Clients)
    RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。加密

  7. 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。spa

  8. 跟踪机制(Tracing)
    若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。

  9. 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。

RabbitMQ的安装能够参考博客:

  https://www.cnblogs.com/ericli-ericli/p/5902270.html

RabbitMQ的概念模型

基本模型

全部 MQ 产品从模型抽象上来讲都是同样的过程:
  消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费者。

基本概念

  • Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。
  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。
  • Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
  • Connection
    网络链接,好比一个TCP链接。
  • Channel
    信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内地虚拟链接,AMQP 命令都是经过信道发出去的,不论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。
  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。
  • Broker
    表示消息队列服务器实体。

RabbitMQ实现消息传递实例

最简单的队列通讯
接收端
import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello1')

def callback(ch, method, properties, body):
    print('>>',ch,method,properties)
    time.sleep(30)
    print(" [x] Received %r" % body)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello1',
                      no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello1')

channel.basic_publish(exchange='',
                      routing_key='hello1',
                      body='Hello World!')
connection.close()

PS:远程链接rabbitmq server的话,须要配置权限

可参考博客:https://www.cnblogs.com/alex3714/articles/5248247.html

 

  在这种工做模式下,若是启动了多个接收端,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),至关于轮询。

  可是上面这样写的话,队列里面的消息若是没有被所有取出来,并且同时RabbitMQ又down机了,那这样的话队列里面的消息就都没有了,因此咱们须要作一个消息的持久化

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello',durable=True)

def callback(ch, method, properties, body):
    print('>>',ch,method,properties)
    time.sleep(30)
    print(" [x] Received %r" % body)


channel.basic_consume(callback,
                      queue='hello',
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive端
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 声明queue
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
send端
消息公平分发

  若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

channel.basic_qos(prefetch_count=1)
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(60)
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
接收端
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                      ))
connection.close()
发送端
Publish\Subscribe(消息发布\订阅) 

  以前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被全部的Queue收到,相似广播的效果,这时候就要用到exchange了。

fanout: 全部bind到此exchange的queue均可以接收消息

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
publisher
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',exchange_type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

direct: 经过routingKey和exchange决定的那个惟一的queue能够接收消息

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s info warning error\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

topic:全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息

 

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

PS:表达式符号说明:#表明收全部,*表明任何字符。(在终端运行,运行命令后面跟上符合条件的topic)

相关文章
相关标签/搜索