近期在作告警集成平台,其中须要告警消息发送,类型须要涵盖目前市场主流的消息接受端,例如微信/企业微信/钉钉/邮件/短信/电话等等,这势必要利用到MQ,在众多的消息中间件中,通过调研此场景并不象大数据处理场景须要kafka,同时须要较高性能和确认机制,数据的可靠性和活跃的社区,支持消息的持久化于中间件的高可用部署,最终选型了RabbitMQ来做为应用的中间件。html
MQ全称为Message Queue, 即消息队列。MQ是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。RabbitMQ则是一个在AMQP基础上完整的,可复用的企业消息系统。node
也叫Broker Server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQ isn't a food truck, it's a delivery service. 它的角色就是维护一条从Producer到Consumer的路线,保证数据可以按照指定的方式进行传输。虽然这个保证也不是100%的保证,可是对于普通的应用来讲这已经足够了。固然对于商业系统来讲,能够再作一层数据一致性的guard,就能够完全保证系统的一致性了。python
也叫Producer,数据的发送方。Create messages and publish (send) them to a Broker Server (RabbitMQ)。一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,并且RabbitMQ也是经过这个label来决定把这个Message发给哪一个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。git
也叫Consumer,数据的接收方。Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue。把queue比做是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。固然可能会把同一个Message发送给不少的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来讲,它是不知道谁发送的这个信息的,就是协议自己不支持。固然了,若是Producer发送的payload包含了Producer的信息就另当别论了。github
就是一个TCP的链接。Producer和Consumer都是经过TCP链接到RabbitMQ Server的。之后咱们能够看到,程序的起始处就是创建这个TCP链接。算法
虚拟链接。它创建在上述的TCP链接中。数据流动都是在Channel中进行的。也就是说,通常状况是程序起始创建TCP链接,第二步就是创建这个Channel。shell
那么,为何使用Channel,而不是直接使用TCP链接?vim
对于OS来讲,创建和关闭TCP链接是有代价的,频繁的创建关闭TCP链接对于系统的性能有很大的影响,并且TCP的链接数也有限制,这也限制了系统处理高并发的能力。可是,在TCP链接中创建Channel是没有上述代价的。对于Producer或者Consumer来讲,能够并发的使用多个Channel进行Publish或者Receive。有实验代表,1s的数据能够Publish10K的数据包。固然对于不一样的硬件环境,不一样的数据包大小这个数据确定不同,可是我只想说明,对于普通的Consumer或者Producer来讲,这已经足够了。若是不够用,你考虑的应该是如何细化SPLIT你的设计。centos
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的惟一的线路。安全
Connection Factory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket连接,它封装了socket协议相关部分逻辑。Connection Factory则是Connection的制造工厂。
Channel是咱们与RabbitMQ打交道的最重要的一个接口,咱们大部分的业务操做是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue(队列)是RabbitMQ的内部对象,用于存储消息,以下图表示。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)能够从Queue中获取消息并消费。
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其余意外)的状况,这种状况下就可能会致使消息丢失。为了不这种状况发生,咱们能够要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。
若是RabbitMQ没有收到回执并检测到消费者的RabbitMQ链接断开,则RabbitMQ会将该消息发送给其余消费者(若是存在多个消费者)进行处理。这里不存在timeout,一个消费者处理消息时间再长也不会致使该消息被发送给其余消费者,除非它的RabbitMQ链接断开。
这里会产生另一个问题,若是咱们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会致使严重的bug——Queue中堆积的消息会愈来愈多。消费者重启后会重复消费这些消息并重复执行业务逻辑。
另外publish message 是没有ACK的。
若是咱们但愿即便在RabbitMQ服务重启的状况下,也不会丢失消息,咱们能够将Queue与Message都设置为可持久化的(durable),这样能够保证绝大部分状况下咱们的RabbitMQ消息不会丢失。但依然解决不了小几率丢失事件的发生(好比RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),若是咱们须要对这种小几率事件也要管理起来,那么咱们要用到事务。因为这里仅为RabbitMQ的简单介绍,因此这里将不讲解RabbitMQ相关的事务。
前面咱们讲到若是有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时若是每一个消息的处理时间不一样,就有可能会致使某些消费者一直在忙,而另一些消费者很快就处理完手头工做并一直空闲的状况。咱们能够经过设置Prefetch count来限制Queue每次发送给每一个消费者的消息数,好比咱们设置prefetchCount=1,则Queue每次给每一个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
在上一节咱们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节中介绍。
RabbitMQ中的Exchange有四种类型,不一样的类型有着不一样的路由策略,这将在Exchange Types一节介绍。
生产者在将消息发送给Exchange的时候,通常会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key须要与Exchange Type及Binding key联合使用才能最终生效。
在Exchange Type与Binding key固定的状况下(在正常使用时通常这些内容都是固定配置好的),咱们的生产者就能够在发送消息给Exchange时,经过指定Routing Key来决定消息流向哪里。
RabbitMQ为Routing Key设定的长度限制为255 bytes。
RabbitMQ中经过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
在绑定(Binding)Exchange与Queue的同时,通常会指定一个Binding key。消费者将消息发送给Exchange时,通常会指定一个Routing Key。当 Binding key与Routing Key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的Binding key。
Binding key并非在全部状况下都生效,它依赖于Exchange Type,好比fanout类型的Exchange就会无视Binding key,而是将消息路由到全部绑定到该Exchange的Queue。
RabbitMQ经常使用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。
fanout类型的Exchange路由规则很是简单,它会把全部发送到该Exchange的消息路由到全部与它绑定的Queue中。
上图中,生产者(P)发送到Exchange(X)的全部消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。
direct类型的Exchange路由规则也很简单,它会把消息路由到那些Binding key与Routing key彻底匹配的Queue中。
以上图的配置为例,咱们以routingKey="error"发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);若是咱们以Routing Key="info"或routingKey="warning"来发送消息,则消息只会路由到Queue2。若是咱们以其余Routing Key发送消息,则消息不会路由到这两个Queue中。
前面讲到direct类型的Exchange路由规则是彻底匹配Binding Key与Routing Key,但这种严格的匹配方式在不少状况下不能知足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage类似,也是将消息路由到Binding Key与Routing Key相匹配的Queue中,但这里的匹配规则有些不一样,它约定:
Routing Key为一个句点号“.”分隔的字符串(咱们将被句点号". "分隔开的每一段独立的字符串称为一个单词),如"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"。Binding Key与Routing Key同样也是句点号“. ”分隔的字符串。
Binding Key中能够存在两种特殊字符""与"#",用于作模糊匹配,其中""用于匹配一个单词,"#"用于匹配多个单词(能够是零个)。
headers类型的Exchange不依赖于Routing Key与Binding Key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对。若是彻底匹配则消息会路由到该Queue,不然不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该颇有用武之地),因此不作介绍。
MQ自己是基于异步的消息处理,前面的示例中全部的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,咱们极可能须要一些同步处理,须要同步等待服务端将个人消息处理完成后再进行下一步处理。这至关于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
客户端发送请求(消息)时,在消息的属性(Message Properties,在AMQP协议中定义了14种properties,这些属性会随着消息一块儿发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知个人消息发送到这个Queue中)和correlationId(这次请求的标识号,服务器处理完成后须要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)。服务器端收到消息处理完后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。客户端以前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
默认状况下,若是Message 已经被某个Consumer正确的接收到了,那么该Message就会被从Queue中移除。固然也可让同一个Message发送到不少的Consumer。
若是一个Queue没被任何的Consumer Subscribe(订阅),当有数据到达时,这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被当即发送到这个Consumer。这个数据被Consumer正确收到时,这个数据就被从Queue中删除。
那么什么是正确收到呢?经过ACK。每一个Message都要被acknowledged(确认,ACK)。咱们能够显示的在程序中去ACK,也能够自动的ACK。若是有数据没有被ACK,那么RabbitMQ Server会把这个信息发送到下一个Consumer。
若是这个APP有bug,忘记了ACK,那么RabbitMQ Server不会再发送数据给它,由于Server认为这个Consumer处理能力有限。并且ACK的机制能够起到限流的做用(Benefitto throttling):在Consumer处理完成数据后发送ACK,甚至在额外的延时后发送ACK,将有效的balance Consumer的load。
固然对于实际的例子,好比咱们可能会对某些数据进行merge,好比merge 4s内的数据,而后sleep 4s后再获取数据。特别是在监听系统的state,咱们不但愿全部的state实时的传递上去,而是但愿有必定的延时。这样能够减小某些IO,并且终端用户也不会感受到。
有两种方式,第一种的Reject可让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从Queue中当即删除该Message。
Consumer和Procuder均可以经过 queue.declare 建立queue。对于某个Channel来讲,Consumer不能declare一个queue,却订阅其余的queue。固然也能够建立私有的queue。这样只有APP自己才可使用这个queue。queue也能够自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么若是是建立一个已经存在的queue呢?那么不会有任何的影响。须要注意的是没有任何的影响,也就是说第二次建立若是参数和第一次不同,那么该操做虽然成功,可是queue的属性并不会被修改。
那么谁应该负责建立这个queue呢?是Consumer,仍是Producer?
若是queue不存在,固然Consumer不会获得任何的Message。那么Producer Publish的Message会被丢弃。因此,仍是为了数据不丢失,Consumer和Producer都try to create the queue!反正无论怎么样,这个接口都不会出问题。
queue对load balance的处理是完美的。对于多个Consumer来讲,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不一样的Consumer。
从架构图能够看出,Procuder Publish的Message进入了Exchange。接着经过"routing keys”, RabbitMQ会找到应该把这个Message放到哪一个queue里。queue也是经过这个routing keys来作的绑定。有三种类型的Exchanges:direct, fanout,topic。 每一个实现了不一样的路由算法(routing algorithm)。
每一个virtual host本质上都是一个RabbitMQ Server,拥有它本身的queue,exchagne,和bings rule等等。这保证了你能够在多个不一样的Application中使用RabbitMQ。
此文档为centos7 安装部署
# 配置yum源
cat > /etc/yum.repos.d/erlang.repo << EOF
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOF
复制代码
cat > /etc/yum.repos.d/rabbitmq.repo <<EOF
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
复制代码
yum -y install rabbitmq-server
chkconfig rabbitmq-server on
# 更改rabbitmq数据和日志存储目录
# 建立数据和日志目录
mkdir -pv /data/rabbitmq/mnesia
mkdir -pv /data/rabbitmq/log
chown rabbitmq.rabbitmq /data/rabbitmq/* -R
# 建立配置文件
cat >/etc/rabbitmq/rabbitmq-env.conf <<EOF
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
RABBITMQ_LOG_BASE=/data/rabbitmq/log
EOF
systemctl status rabbitmq-server
# 检查本地cli工具是否定证成功
sudo rabbitmq-diagnostics ping
# 打印应用启用的组件,tcp反省,内存使用,告警等等。
sudo rabbitmq-diagnostics status
# 打印节点有效的配置
sudo rabbitmq-diagnostics environment
# 本地节点监控检查
sudo rabbitmq-diagnostics node_health_check
# 添加用户
rabbitmqctl add_user xuel xuelpwd
rabbitmqctl list_users
Listing users ...
user tags
xuel [xuel]
guest [administrator]
# 角色定义
none 最小权限角色
management 管理员角色
policymaker 决策者
monitoring 监控
administrator 超级管理员
[root@VM_0_12_centos ~]# rabbitmqctl set_user_tags xuel administrator
Setting tags for user "xuel" to [administrator] ...
[root@VM_0_12_centos ~]# rabbitmqctl list_users
Listing users ...
user tags
xuel [administrator]
guest [administrator]
#查看全部的队列:
rabbitmqctl list_queues
# 新增虚拟主机:
rabbitmqctl add_vhost vhost_name
# 将新虚拟主机受权给新用户:
rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'
复制代码
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app
复制代码
rabbitmq-plugins enable rabbitmq_management
# rabbitmq 为了安全guest用户只能localhost访问,开启guest/guest登录
cat > /etc/rabbitmq/rabbitmq.config <<EOF
[{rabbit, [{loopback_users, []}]}].
EOF
systemctl restart rabbitmq-server
# 页面访问http://ip:15672
复制代码
因为技术栈为python,此处简单举例python中rabbitmq的使用
因为AMQP是双向RPC协议,客户端能够向服务器发送请求,服务器能够向客户端发送请求,所以Pika在其每一个异步链接适配器中实现或扩展IO循环。这些IO循环是阻塞循环和侦听事件的方法。每一个异步适配器都遵循相同的标准来调用IO循环。建立链接适配器时会建立IO循环。要为任何给定的适配器启动IO循环,请调用connection.ioloop.start()方法。
pip install pika
复制代码
channel.basic_consume
telling it to call the handle_delivery for each message RabbitMQ delivers to us.import pika
# Create a global channel variable to hold our channel object in
channel = None
# Step #2
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
# Open a channel
connection.channel(on_open_callback=on_channel_open)
# Step #3
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)
# Step #4
def on_queue_declared(frame):
"""Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
channel.basic_consume('test', handle_delivery)
# Step #5
def handle_delivery(channel, method, header, body):
"""Called when we receive a message from RabbitMQ"""
print(body)
# Step #1: Connect to RabbitMQ using the default parameters
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_open_callback=on_connected)
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
复制代码
最总消费者和生产者整体均跑在k8s集群总,对于消息发送服务生产者发送消息携带routing_key,使用confirm确认,exchange使用direct模式,对应bind_key发送到对应queue中,在对英queue的connection中启动多个channel,每一个对应本身多个consumer来提升并发。