消息队列_RabbitMQ-0002.深刻MQ生产者/信道/交换机/队列/消费者?

形象说明:python


比喻: RabbitMQ提供的消息投递服务相似于现实生活中的快递公司,双11咱们可能会买不少东西,天然会陆续收到不少寄自淘宝店主由快递公司发来的快件,可是可能不少时候买回来的东西并不合心意,天然会陆续经过快递公司退回快件,因此回归到架构,这里的快件就至关于消息,咱们至关于应用程序,淘宝店主至关于服务器,而快递公司至关于路由器,应用程序能够发送和接收消息,服务器也能够发送和接收消息,因此当应用程序链接到RabbitMQ时,就必须作一个决定:我是发送仍是接收哪?缓存

现实: 生产者(Producer)建立消息,而后发布(发送)到消息代理服务器(RabbitMQ),消息包含两部份内容:有效载荷(想要传输的数据,支持任何内容)和标签(描述有效载荷,最终由RabbitMQ来决定谁将得到消息的拷贝),消费者(Consumer)启动时链接消息代理服务器上,并订阅指定队列,每当消息达到此队列时,RabbitMQ会将其发送给订阅的消费者,当消费者接收到消息时,它只是获得了有效载荷,由于消息在路由的过程当中,消息的标签并无随着有效载荷一块儿传递,RabbitMQ甚至不会告诉你生产者是谁?固然若是以为有必要,也能够将身份信息加入有效载荷一块儿传递~安全

wKioL1g9FvqyZ9s3AAE0kFq2RfM767.png

信道链接:服务器


说明: 使用消息代理服务器RabbitMQ的前提是创建AMQP信道,应用程序能够基于一条TCP链接快速建立销毁无数信道来减小传统TCP链接消耗,每一个信道有惟一ID(由AMQP库维护),AMQP命令都是经过信道发送架构


消息路由:app

wKioL1g9F1LCA_7jAALTlrIRsus695.png

# 消费消息负载均衡

1. 消费者经过AMQP的basic.consume命令订阅,这样作会将信道置为接收模式,订阅消息后,消息一到达队列时就自动接收,直到取消队列的订阅为止异步

2. 消费者经过AMQP的basic.get命令订阅,这样作会将信道置为接收模式,订阅消息后,得到单条消息后,而后自动取消订阅,千万不要妄想放在循环里代替basic.consume,不然没法发挥其高吞吐量特性ide

3. 若是消息到达了无人订阅的队列,消息会在队列中等待,一旦有消费者订阅该队列,队列的消息会发送给消费者函数

4. 若是队列拥有多个消费者时,队列的消息以轮询的方式发送给消费者,每条消息只会发送给一个订阅的消费者,且每一个消费者接收到的每一条消息都必须进行确认,消费者必须经过AMQP的basic.ack命令显式地向RabbitMQ发送一个确认,或者在订阅到队列的时候将auto_ack参数设置为true,此时一旦消费者接收消息,RabbitMQ会自动认为其确认了消息,一旦消息被确认,RabbitMQ才会安全的把消息从队列中删除,主要是防止确认以前RabbitMQ断开链接或取消订阅或程序崩溃,RabbitMQ会认为这条消息没有分发,而后从新分发给下一个订阅的消费者,RabbitMQ会认为没有确认的消费者并无准备好接收下一条消息,因此能够好好利用这一点,若是处理消息内容很是耗时,则你的应用程序能够延迟确认消息,直到消息处理完成再确认,这样可防止RabbitMQ持续不断的消息致使过载

5. 若是收到消息后想要明确拒绝而不是确认收到消息的话,可以使用AMQP的basic.reject,当把其basic.reject参数设置为true时,RabbitMQ会将消息从新发送给下一个订阅的消费者,若是设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消费者,固然也能够经过对消息确认的方式来简单地忽略该消息,如当你检测到一条格式错误的消息而任何一个消费者都没法处理的时候,此时就很是有用了.

# 队列建立

1. 消费者和生产者都能使用AMQP的queue.declare命令来建立队列,可是若是消费者在同一条信道上订阅了另外一个队列的话,就没法再声明队列,必须首先取消订阅,将信道设置为"传输"模式,

2. 建立队列时,最好指定一个队列名称,消费者订阅队列时须要队列名称,并在建立绑定时也须要队列名称,若是不指定,RabbitMQ会随机分配一个名称做为queue.declare的返回值(经常使用于构建在AMQP上的RPC应用,此时零时匿名队列颇有用),建立队列时exclusive为true时,队列会变为私有,此时只有你的应用程序才能消费队列消息,当你想要限制一个队列只有一个消费者时颇有有,auto-delete为true时,当最后一个消费者取消订阅的时候,队列就会自动移除,当你须要零时队列只为一个消费者服务的话,可结合auto-delete和exclusive,当消费者断开链接时,队列就被移除了.

3. 若是尝试声明一个已经存在的队列时,RabbitMQ就什么都不作,并成功返回,若是你只是为了检测队列是否存在,可设置queue.declare的passive为true,若是存在会成功返回,不然会直接返回一个错误

4. 因为生产者和消费者均可以经过queue.declare建立队列,可是因为若是消息路由到了不存在的队列RabbitMQ会直接忽略它们,因此最好是生产者和消费者都建队列

#交换绑定

1. 若是你想要将消息投递到队列时,首先得把消息发送给交换机,而后根据肯定的规则,RabbitMQ会将决定消息该投递到哪一个队列,这些规则被称为路由键(Routing Key),队列经过路由键绑定到交换机,当你把消息发送到消息代理服务器时,消息将拥有一个路由键,即使为空,RabbitMQ也会将其和绑定使用的路由键进行匹配,若是匹配成功,消息会被投递到该队列,若是不匹配将进入"黑洞"

wKiom1g9F3HCPie-AACJHeYI8wM145.png

2. Direct直接交换机(channel->basic_publish(message, exchange, routingkey)),很是简单,若是路由键匹配的话,消息就被投递到对应的队列,当声明队列时,会自动绑定到默认交换机,并以队列名称做为路由键,因此发送消息时exchange为空则会发送到默认交换机,routingkey直接填写对应的队列名便可,若是默认交换机没法知足应用程序需求时,可经过exchange.declare建立其它交换机

wKioL1g9GDzjYG3IAAB6vj0PX48379.png

3. Fanout扇形交换机,很是简单,当你发送一条消息到fanout交换机时,它会把消息投递给全部附加在此交换机上的队列,这容许你对单条消息作不一样方式的反应,如一个WEB应用程序可能须要在用户上传新的图片时,用户相册必须清除缓存,同时用户应该获得些积分奖励,你能够将两个队列绑定到图片上传交换机上,一个用于清除缓存,另外一个用于增长用户积分,后期若是有其它需求只须要为新的消费者写段代码,而后声明新的队列并将其绑定到fanout交换机上,这样就能够实现生产者和消费者彻底解耦,容许你垂手可得的添加应用程序的功能.

wKioL1g9GF6CR2mTAAB5dxEM-KA426.png

4. Topic主题交换机,很是简单,当你发送一条消息到topic交换机时,它会把消息投递给以点号分割的路由键,匹配模式中*匹配特定位置的任意文本,"#"匹配全部的规则,是没有相似"*"以点号特定块儿匹配的概念的,它匹配包括点号在内的全部规则.


总结: 从上面几种模式能够看出其实RabbitMQ在开发中的角色能够很是灵活,既能够做为队列服务器使用,也能够做为RPC服务器使用,彻底取决于你如何组织这些功能.


虚机隔离:

说明: RabbitMQ还支持Vhost"虚拟主机",每一个Vhost本质上是一个迷你版拥有本身的队列/交换机/绑定以及权限机制的RabbitMQ服务器,这样就能够经过一个RabbitMQ服务众多应用程序,Vhost之间相互隔离,有效的避免了队列/交换机的命名冲突,不然你不得不运行多个RabbitMQ,默认Vhost为vhost: "/"可经过guest/guest访问,可是为了安全起见,应该及时更改


添加虚机: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl add_vhost xmzoomeye

查看虚机: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl list_vhosts

删除虚机: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl delete_vhost xmzoomeye


说明: 一旦Vhost建立成功以后,就能够链接上去开始添加队列和交换机,若是想链接远程RabbitMQ节点可经过rabbitmqctl -n rabbit@hostname list_vhosts,须要注意的是rabbit@hostname中rabbit@是固定的,而hostname必须正确的是远程主机名


持久存储:


1. 默认重启RabbitMQ后,以前定义的交换机/队列都会消失,可是若是设置队列和交换机的durable属性为true,则在崩溃重启以后会重建队列和交换机,可是消息并不会重建,若是要实现持久化消息,则须要首先将"投递模式"设置为2将消息标记成持久化,而后发布到持久化的交换机并到达持久化的队列,这样才能够保证消息的持久化.

2. RabbitMQ确保持久化消息能从服务器重启中恢复实际上是将它们写入磁盘上的一个持久化日志文件,当发布一条持久化消息到持久化交换机时,RabbitMQ会在消息提交到日志文件后才发送响应,若是消息后来被路由到非持久化队列,它会自动从持久化日志中删除,而且没法从服务器重启中恢复,若是消息后来被路由到持久化队列且被消费者消费并确认,则RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集,可是并非全部的消息都须要启用持久化,否则会严重影响RabbitMQ每秒处理的消息总数

3. 从业务分析性能需求,若是要单台RabbitMQ服务器每秒处理10万条消息则[能够考虑更快的存储系统]或[经过在生产者单独信道上监听应答队列,发送消息时有效载荷带上此队列名,消费者就能够回答应答确认接收返回给生产者]或[分开创建持久化热备非集群负载均衡和非持久化集群],这样持久化消息通讯负载不会减慢非持久化消息的处理.

4. AMQP中,一旦把信道设置成事务模式后,经过信道发送须要确认的消息,若是第一个消息失败则后续命令会忽略,虽然能够借助它确认消息是否持久化到磁盘,可是事务不但会下降消息吞吐量,并且会使生产者应用程序产生同步,而你使用消息通讯就是想要避免同步,其实还有另外一种发送确认模式和事务相仿,只须要将信道设置为confirm模式,全部信道上发布的消息都会被指派一个惟一的ID,一旦消息被投递给匹配的队列后,信道会发送一个发送方确认模式给生产者应用程序(包含惟一ID),使得生产者知道消息已经安全到达目的队列,若是消息和队列是可持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出,相比于事务来讲,最大的好处在于都是异步的,一旦发布了一条消息,生产者应用程序就能够在等待确认的同时继续发送下一条,当确认消息最终收到的时候,生产者应用的回调方法就会触发来处理该确认消息,若是RabbitMQ发生内部错误而致使消息丢失,会发送一条nack未确认消息,只是此次说明消息确实丢失了,此方式更加轻量级对于RabbitMQ消息代理服务器的性能影响几乎不记.


贯穿实例:

wKioL1g9GIqS9VB6AAFC8FC6gQ8839.png

说明: 如上讲述了RabbitMQ的全部组件以及架构,但要结合起来理解一条真实消息的生命周期的最好方法是实践出真知,下面会使用PY的pika模块来演示Hello Word消息传递过程.

发布: 链接RabbitMQ->获取信道->声明交换机->建立消息->发布消息->关闭信道->关闭链接

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
import pika
# 说明: 导入其它模块


if __name__ == '__main__':
    # 建立凭证对象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 建立参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host='127.0.0.1',
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登陆凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host='/'
    )
    # 建立链接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 建立交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="salt-exchange",
        # 交换机类型
        type="direct",
        # 若是同名交换机已存在依然返回成功,不然建立
        passive=False,
        # 声明为非持久化交换机
        durable=False,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    msg = sys.argv[1]
    # 建立配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = 'text/plain'
    # 尝试发布消息
    channel.basic_publish(
        # 发布消息内容
        body=msg,
        # 发布到交换机
        exchange='salt-exchange',
        # 发布信息属性
        properties=msg_props,
        # 发布信息时携带的路由键
        routing_key='salt'
    )

说明: 首先用使用默认账号密码guest,默认端口5672,默认虚拟主机/链接RabbitMQ Vhost,而后创建信道,利用信道和rabbitMQ进行通讯,而后声明交换机,须要指定交换机名称,交换机类型,是否passive模式,若是非passive模式则表示想要声明交换机而非获取交换机信息,还能够指定是否持久化以及是否删除,最后经过命令行建立一条携带salt路由键类型为text/plain的消息经过basic_publish发送到salt-exchange交换机,可是此时因为并无任何队列绑定在此交换机,因此消息必然会进入"黑洞"丢失掉.~

接收: 链接RabbitMQ->得到信道->声明交换机->声明队列->绑定队列到交换机->消费信息->关闭信道->关闭链接

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import pika
# 说明: 导入其它模块


if __name__ == '__main__':
    # 建立凭证对象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 建立参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host='127.0.0.1',
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ服务凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host='/'
    )
    # 建立链接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    # 建立交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="salt-exchange",
        # 交换机类型
        type="direct",
        # 若是同名交换机已存在依然返回成功
        passive=False,
        # 声明为持久化交换机
        durable=False,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    # 建立队列
    channel.queue_declare(queue="salt")
    # 绑定队列
    channel.queue_bind(
        # 队列名称
        queue="salt",
        # 交换机名称
        exchange="salt-exchange",
        # 路由键名称
        routing_key="salt"
    )

    # 消息回调处理函数
    def msg_consumer(channel, method, header, body):
        # 发送消息确认
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # 退出监听循环
        if body == 'exit':
            channel.basic_cancel(consumer_tag="salt-consumer")
            channel.stop_consuming()
        else:
            print 'found notice: recive queue message {0}'.format(body)
        return

    # 做为指定队列消费者
    channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer")
    # 循环调用回调函数接收处理消息
    channel.start_consuming()

说明: 首先用使用默认账号密码guest,默认端口5672,默认虚拟主机/链接RabbitMQ Vhost,而后创建信道,利用信道和rabbitMQ进行通讯,而后再次声明交换机,防止因为生产者没有声明交换机致使后面绑定队列失败,而后就是建立队列,建立队列时须要指定队列名称,而后就是绑定交换机,绑定的时候须要指定队列名称,交换机名称,绑定路由键,最后就是订阅指定队列,订阅时须要传递一个回调函数来处理消息,一个队列名称来指明要订阅的队列,一个标识进程的消费者标记,一旦开始读取消息则会开始一个阻塞的循环等待从信道进来的数据,若是要中止,则须要先使用basic_cancel结束消费(关闭信道和链接),注意须要提供进程标识,而后再stop_consuming中止消费者

确认: 链接RabbitMQ->获取信道->设置确认模式->声明交换机->建立消息->发布消息->关闭信道->关闭链接

wKioL1g9GSbjoUozAACEXLxs2hg797.png

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import sys
import pika
# 说明: 导入其它模块


if __name__ == '__main__':
    # 建立凭证对象
    credentials = pika.PlainCredentials('guest', 'guest')
    # 建立参数对象
    conn_params = pika.ConnectionParameters(
        # RabbitMQ服务地址
        host='127.0.0.1',
        # RabbitMQ服务端口
        port=5672,
        # RabbitMQ登陆凭证
        credentials=credentials,
        # RabbitMQ虚拟主机
        virtual_host='/'
    )
    # 建立链接对象
    conn_broker = pika.BlockingConnection(conn_params)
    # 获取信道对象
    channel = conn_broker.channel()
    msg_ids = []
    msg_ids.append(len(msg_ids)+1)
    # 确认模式回调函数
    def confirm_handler(frame):
        # 第一次信道被设置为确认模式时会触发一次确认回调
        if type(frame.method) == pika.spec.Confirm.SelectOk:
            print 'found notice: channel in confirm mode'
        # 若是发送的消息达到队列后没有回应则说明消息丢失,须要重发
        elif type(frame.method) == pika.spec.Basic.Nack:
            # 若是丢的消息确实是msg_ids里面的,则说明刚刚发的消息确实是丢失了~
            if frame.method.delivery_tag in msg_ids:
                print 'found errors: message may be lost'
        # 若是发送的消息到达队列后发回响应
        elif type(frame.method) == pika.spec.Basic.Ack:
            # 若是确认消息id确实是msg_ids里面的,则从msg_ids里面删除
            if frame.method.delivery_tag in msg_ids:
                print 'found notice: message confirm received'
                # 删除已经确认的消息
                msg_ids.remove(frame.method.delivery_tag)
    # 设置信道为确认模式
    channel.confirm_delivery(callback=confirm_handler)
    # 建立交换机
    channel.exchange_declare(
        # 交换机名称
        exchange="salt-exchange",
        # 交换机类型
        type="direct",
        # 若是同名交换机已存在依然返回成功,不然建立
        passive=False,
        # 声明为非持久化交换机
        durable=False,
        # 交换机闲置也不会自动删除
        auto_delete=False
    )
    msg = sys.argv[1]
    # 建立配置对象
    msg_props = pika.BasicProperties()
    # 设置内容类型
    msg_props.content_type = 'text/plain'
    # 尝试发布消息
    channel.basic_publish(
        # 发布消息内容
        body=msg,
        # 发布到交换机
        exchange='salt-exchange',
        # 发布信息属性
        properties=msg_props,
        # 发布信息时携带的路由键
        routing_key='salt'
    )
    channel.close()

说明: RabbitMQ任何一个信道上发布的第一条消息都将得到ID1,而且信道上接下来的每一条消息的ID都会步进1,对于信道来讲,消息ID是惟一的,因此一旦信道关闭,你将没法追踪发布在该信道上任何未完成的发送方确认消息状态,因此RabbitMQ并不会在发布消息时返回消息对应的ID,而须要咱们本身为每一个信道单独维护一个消息计数器,在几乎不影响RabbitMQ性能的前提下在生产者端用回调来处理消息确认.

相关文章
相关标签/搜索