rabbitmq高级消息队列

rabbitmq使用

什么是消息队列

消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够很复杂,能够包含嵌入对象。node

消息队列是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中不用考虑哪一个消费者来取数据,消息使用者只管从MQ 中取消息而不用管是谁发布的。这样发布者和使用者都不用知道对方的存在。python

为什么用消息队列

消息队列是一种应用间的异步协做机制,那么何时使用 MQ呢后端

一常见的订单系统为例,用户点击下单按钮以后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展出力这些逻辑可能放在一块儿同步执行,随着业务的发展订单量增加,须要提高系统服务的性能,这时能够将一些不须要当即生效的操做拆分出来异步执行,好比发红包、发短信、邮件通知等。这种场景下能够用 MQ,在下单的主流程(好比扣减库存、生成相应单据)完成以后发送一条消息到 MQ 让主流程快速完成,而由另外的单独线程拉取 MQ 的消息(或者由 MQ 推送消息),当发现 MQ中有发红包或其余的消息时,执行相应的业务逻辑。安全

RabbitMQ 的特色

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。服务器

AMQP:Advanced Message Queue Protocol,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。网络

RabbitMQ 最初起源于金融系统,用与在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现很好。具体特色以下:并发

  1. 可靠性(Reliability)app

    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。框架

  2. 灵活的路由(Flexible Routing)运维

    在消息进入队列以前,经过 Exchange 来路由消息。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也能够经过插件实现本身的 Exchange。

  3. 消息集群(Clustering)

    多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker。

  4. 高可用(Highly Available Queues)

    队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍可使用。

  5. 多种协议(Multi-Protocol)

    RabbitMQ 支持多种消息队列协议,如 STOMP、MQTT 等。

  6. 多语言客户端(Many Clients)

    RabbitMQ 几乎支持全部经常使用语言,好比 Python、Java、.NET等。

  7. 管理界面(Management UI)

    RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。

  8. 跟踪机制(Tracing)

    如消息异常,RabbitMQ 提供了消息追踪机制,使用者能够找出发生了什么。

  9. 插件机制(Plugin System)

    RabbitMQ 提供了许多插件,能够扩展插件,也能够本身编写插件。

RabbitMQ 中的概念模型

消息模型

全部 MQ 产品从模型抽象上来讲都是同样的过程:

消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费之。

消息流

RabbitMQ 基本概念

上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念须要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,因此其内部实际上也是 AMQP 中的基本概念:

RabbitMQ 内部结构
  1. Message

    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。。

  2. Publisher

    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  3. Exchange

    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  4. Binding

    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。

  5. Queue

    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息能够投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。

  6. Connection

    网络链接,好比一个 TCP 链接。

  7. Channel

    信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的 TCP 链接内的虚拟链接,AMQP 命令都是经过信道发出去的,无论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。

  8. Consumer

    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  9. Virtual Host

    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 是默认的 vhost。

  10. Broker

    表示消息队列服务器实体。

AMQP 中的消息路由

AMQP 中消息的路由过程增长了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接受,而 Binding 决定交换器的消息应该发送到哪一个队列中。

AMQP 的消息路由过程

Exchange 类型

Exchange 分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了。

  1. direct

direct 交换器

消息中的路由键(routing key)若是和 Binding 中的 bind key 一致,交换器就将消息发送到相应的队列中。路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为‘’dog‘’,则只转发 routing key 标记为‘’dog‘’的消息,不会转发‘’dog.puppy‘’等。这是彻底匹配、单薄的模式。

  1. fanout

fanout 交换器

每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去。fanout 交换器不处理路由键,只是简单地将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。很像子网广播,每台子网内的主机都得到一份复制的消息。fanout 类型转发消息是最快的。

  1. topic

topic 交换器

topic 交换器经过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列须要绑定到一个模式上。它将路由键和绑定键的字符串分红单词,这些单词之间用点隔开。它一样会识别两个通配符:符号#和符号*#匹配0个或多个单词,*匹配一个单词。

RabbitMQ 运行和管理

  1. 启动

找到安装后的 RabbitMQ 所在目录下的 sbin 目录,能够看到该目录下有7个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 便可,下面

启动会看到一些启动过程和最后的 completed with 6 plugins,这说明启动的时候默认加载了6个插件。

  1. 后台启动

    若是想让RabbitMQ 以守护程序的方式在后台运行,能够在启动的时候加上-detached 参数:

    rabbitmq-server -detached
  2. 查询服务器状态

    sbin 目录下有个很重要的文件叫 rabbitmqctl,它提供了 RabbitMQ 管理须要的几乎一站式解决方案,绝大部分的运维命令它均可以提供。

    查询 RabbitMQ 服务器的状态信息能够用参数 status

    rabbitmqctl status

    该命令将输出服务器不少信息,好比 RabbitMQ 和 Erlang的版本、os 名称、内存等信息。

  3. 关闭 RabbitMQ 节点

    在 Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每一个实例,而多个 Erlang 应用程序能够运行在同一个节点之上。节点之间能够进行本地通讯(无论他们是否是运行在同一台服务器之上)。好比一个运行在节点 A 上的应用程序能够调用节点 B 上应用程序的方法,就好像调用本地函数同样。若是应用程序因为某些缘由崩溃,Erlang 节点会自动尝试重启应用程序。

    若是要关闭整个 RabbitMQ 节点可使用参数 stop

    rabbitmqctl stop

    它会和本地节点通讯并指示其干净的关闭,也能够指定关闭不一样的节点,包括远程节点,只须要传入产生参数-n

    rabbitmqctl -n rabbit@server.example.com stop

    -n node默认 node 名称是 rabbit@server,若是你的主机名是 server.example.com,那么 node 名称就是 rabbit@server.example.com。

  4. 关闭 RabbitMQ 应用程序

    若是只想关闭应用程序,同时保持 Erlang 节点运行则可使用stop_app

    rabbitmqctl stop_app
  5. 启动 RabbitMQ 节点

    rabbitmqctl start_app
  6. 重置RabbitMQ 节点

    rabbitmqctl reset
  7. 查看已声明的队列

    rabbitmqctl list_queues
  8. 查看交换器

    rabbitmqctl list_exchanges

    该命令还能够附加参数,好比列出交换器的名称、类型、是否持久化、是否自动删除:

    rabbitmqctl list_exchanges name type durable auto_delete
  9. 查看绑定

    rabbitmqctl list_bindings

队列的做用

应用解耦

实际案例:在用户进行注册的时候须要进行邮件发送和验证码发送,在没有使用消息队列以前,这些操做是同步进行的,在用户量较少的状况下,是不会影响用户体验的,可是一旦用户量大了起来,仍是使用同步的话会极大地影响用户体验,在这种状况下就可使用消息队列。

当用户注册的时候先直接返回注册成功页面,以后将须要发送邮件和验证码的任务存在队列里面,以后由服务器来消费队列里面的任务已达到应用解耦的目的。

流量削峰

在淘宝双十一或者春运时的12306的用户访问量是特别大的,这种状况下若是全部的请求直接打到服务器上,服务器会直接挂掉。在这种状况下,可使用动态扩容服务器的数量来处理高并发,可是并非全部时间都须要不少服务器进行处理,为了成本的考虑能够采用消息队列进行处理并发,就是全部的请求并非直接请求服务器,而是先把用户的请求存储在消息队列里面,以后服务器在慢慢地进行处理请求。

消息队列数据不丢失

  1. 在产生队列时,设置durable=True,表明队列持久化
  2. 在生产端,设置properties=pika.BasicProperties(delivery_mode=2)
  3. 在消费端,设置auto_ack=False, ch.basic_ack(delivery_tag=method.delivery_tag)

消息产生与消费

简单消息发送

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 16:56

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

# 声明队列
channel.queue_declare(queue='test', durable=True)

channel.basic_publish(exchange='',
                      routing_key='test',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=1,  # make message persistent
                      ))

print('[x] sent "hello world!"')
connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 16:56

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.queue_declare(queue='test', durable=True)


def callback(ch, method, properties, body):
    '''

    :param ch: 信道
    :param method:
    :param properties:
    :param body: message
    :return:
    '''
    print(ch, method, properties, body)

    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(
    queue='test', on_message_callback=callback, auto_ack=False
)

channel.start_consuming()

fanout 模式

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 18:36

import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ''.join(sys.argv[1:]) or 'info: hello world!'

channel.basic_publish(exchange='logs', routing_key='', body=message)

print('[x] sent %r' % message)

connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 18:41

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare('', exclusive=True)  # exclusive 排他性,惟一性

queue_name = result.method.queue

print('queue_name:', queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print('[*] waiting for logs. To exit press CTRL+c')


def callback(ch, method, properties, body):
    print('[x] %r' % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True
)

channel.start_consuming()

direct 模式

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 18:49

import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

log_levels = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='direct_logs',
    routing_key=log_levels,
    body=message
)

print('[x] Sent %r:%r' % (log_levels, message))

connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 18:53

import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare('', exclusive=True)

queue_name = result.method.queue

log_levels = sys.argv[1:]

if not log_levels:
    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0])

for severity in log_levels:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

print('[*] Waiting for logs. To exit press CTRL+c')


def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

topic 模式

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 19:07

import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message
)

print('[x] Sent %r:%r' % (routing_key, message))

connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 19:11

import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print('[*] Waiting for logs. To exit press CTRL+c')


def callback(ch, method, properties, body):
    print('[x %r:%r' % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

进行消息传递的时候先启动 consumer,这样当生产者发送消息的时候能再消费者后端看到消息记录。接着运行 producer,发布一条消息,在 consumer 的控制台能看到接收的消息。

RabbitMQ 集群

RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是容许消费者和生产者在节点崩溃的状况下继续运行,以及经过添加更多的节点来线性扩展消息通讯吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通讯框架 OTP 来知足上述需求,使客户端在失去一个 RabbitMQ 节点链接的状况下,依旧可以从新链接到集群中的任何其余节点继续生产、消费。

RabbitMQ 集群中的一些概念

RabbitMQ 会始终记录如下四种类型的内部元数据:

  1. 队列元数据

    包括队列名称和它们的属性,好比是否可持久化、是否自动删除

  2. 交换器元数据

    交换器名称、类型、属性

  3. 绑定元数据

    内部是一张表格记录如何将消息路由到队列

  4. vhost 元数据

    为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性。

在单一节点中,RabbitMQ 会将全部这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储在硬盘上。存到硬盘上能够确保队列和交换器在节点重启后可以重建。而在集群模式下一样也提供两种选择:存到硬盘上(独立节点的默认设置),存到内存中。

若是在集群中建立队列,集群只会在单个节点而不是全部节点上建立完整的队列信息(元数据、状态、内容)。结果是只有队列的全部者节点知道有关队列的全部信息,所以当集群节点崩溃时,该节点的的队列和绑定就消失了,而且任何匹配该队列的绑定的新消息也会丢失。

RabbitMQ 集群中能够共享 user、vhost、exchange等,全部的数据和状态都是必须在全部节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点能够动态的加入到集群中。

当在集群中声明队列、交换器、绑定的时候,这些操做会直到全部集群节点都成功提交元数据变动后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,可是它的执行比磁盘节点要好。内存节点能够提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这二者呢?

RabbitMQ 只要求集群中至少有一个磁盘节点,全部其余节点能够是内存节点,当节点加入火离开集群时,它们必需要将该变动通知到至少一个磁盘节点。若是只有一个磁盘节点,恰好又是该节点崩溃了,那么集群能够继续路由消息,但不能建立队列、建立交换器、建立绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的惟一磁盘节点崩溃的话,集群仍然能够运行,但知道该节点恢复,不然没法更改任何东西。

RabbitMQ 集群配置和启动

若是是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第2、第三个节点将会由于节点名称和端口冲突致使启动失败。因此在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 来明确指定惟一的节点名称和端口。下面的例子端口号从5672开始,每一个新启动的节点都加1,节点也分别命名为test_rabbit_一、test_rabbit_二、test_rabbit_3。

  1. 启动第一个节点:

    RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
  2. 启动第二个节点:

    RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,不然会存在使用了某个插件的端口号冲突,致使节点启动不成功。

如今第2个节点和第1个节点都是独立节点,它们并不知道其余节点的存在。集群中除第一个节点外后加入的节点须要获取集群中的元数据,因此要先中止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入而且获取集群的元数据,最后从新启动 RabbitMQ 应用程序。

  1. 中止第二个节点的应用程序:

    rabbitmqctl -n test_rabbit_2 stop_app
  2. 重置第二个节点元数据:

    rabbitmqctl -n test_rabbit_2 reset
  3. 第二个节点加入到第一个节点组成的集群:

    rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
  4. 启动第二个节点的应用程序

    rabbitmqctl -n test_rabbit_2 start_app
  5. 第三个节点的配置相似

    RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 rabbitmq-server -detached
    rabbitmqctl -n test_rabbit_3 stop_app
    rabbitmqctl -n test_rabbit_3 reset
    rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost
    rabbitmqctl -n test_rabbit_3 start_app

RabbitMQ 集群运维

中止某个指定的节点,好比中止第二个节点

RABBITMQ_NODENAME=test_rabbit_2 rabbitmqctl stop

查看节点三的集群状态

rabbitmqctl -n test_rabbit_3 cluster_status
相关文章
相关标签/搜索