RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ

 

 

消息队列及常见消息队列介绍

1、消息队列(MQ)概述javascript

消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景能够简单地描述为:php

当不须要当即得到结果,可是并发量又须要进行控制的时候,差很少就是须要使用消息队列的时候。html

消息队列主要解决了应用耦合、异步处理、流量削锋等问题。前端

当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。java

2、消息队列使用场景node

消息队列在实际应用中包括以下四个场景:python

  • 应用耦合:多应用间经过消息队列对同一消息进行处理,避免调用接口失败致使整个过程失败;c++

  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减小处理时间;git

  • 限流削峰:普遍应用于秒杀或抢购活动中,避免流量过大致使应用系统挂掉的状况;github

  • 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用:

2.1 异步处理

具体场景:用户为了使用某个应用,进行注册,系统须要发送注册邮件并验证短信。对这两个操做的处理方式有两种:串行及并行。

(1)串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信;

在这种方式下,须要最终发送验证短信后再返回给客户端。

(2)并行处理:新注册信息写入后,由发短信和发邮件并行处理;

在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。

假设以上三个子系统处理的时间均为50ms,且不考虑网络延迟,则总的处理时间:

串行:50+50+50=150ms

并行:50+50 = 100ms

若使用消息队列:

并在写入消息队列后当即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间自己是能够很快的,基本能够忽略不计,所以总的处理时间相比串行提升了2倍,相比并行提升了一倍;

2.2 应用耦合

具体场景:用户使用QQ相册上传一张图片,人脸识别系统会对该图片进行人脸识别,通常的作法是,服务器接收到图片后,图片上传系统当即调用人脸识别系统,调用完成后再返回成功,以下图所示:

该方法有以下缺点:

  • 人脸识别系统被调失败,致使图片上传失败;

  • 延迟高,须要人脸识别系统处理完成后,再返回给客户端,即便用户并不须要当即知道结果;

  • 图片上传系统与人脸识别系统之间互相调用,须要作耦合;

若使用消息队列:

客户端上传图片后,图片上传系统将图片信息如uin、批次写入消息队列,直接返回成功;而人脸识别系统则定时从消息队列中取数据,完成对新增图片的识别。

此时图片上传系统并不须要关心人脸识别系统是否对这些图片信息的处理、以及什么时候对这些图片信息进行处理。事实上,因为用户并不须要当即知道人脸识别结果,人脸识别系统能够选择不一样的调度策略,按照闲时、忙时、正常时间,对队列中的图片信息进行处理。

2.3 限流削峰

具体场景:购物网站开展秒杀活动,通常因为瞬时访问量过大,服务器接收过大,会致使流量暴增,相关系统没法处理请求甚至崩溃。而加入消息队列后,系统能够从消息队列中取数据,至关于消息队列作了一次缓冲。

该方法有以下优势:

  1. 请求先入消息队列,而不是由业务处理系统直接处理,作了一次缓冲,极大地减小了业务处理系统的压力;

  2. 队列长度能够作限制,事实上,秒杀时,后入队列的用户没法秒杀到商品,这些请求能够直接被抛弃,返回活动已结束或商品已售完信息;

2.4 消息驱动的系统

具体场景:用户新上传了一批照片, 人脸识别系统须要对这个用户的全部照片进行聚类,聚类完成后由对帐系统从新生成用户的人脸索引(加快查询)。这三个子系统间由消息队列链接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。

该方法有以下优势:

  • 避免了直接调用下一个系统致使当前系统失败;

  • 每一个子系统对于消息的处理方式能够更为灵活,能够选择收到消息时就处理,能够选择定时处理,也能够划分时间段按不一样处理速度处理;

3、消息队列的两种模式

消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)。

3.1 点对点模式

点对点模式下包括三个角色:

  • 消息队列

  • 发送者 (生产者)

  • 接收者(消费者)

消息发送者生产消息发送到queue中,而后消息接收者从queue中取出而且消费消息。消息被消费之后,queue中再也不有存储,因此消息接收者不可能消费到已经被消费的消息。

点对点模式特色:

  • 每一个消息只有一个接收者(Consumer)(即一旦被消费,消息就再也不在消息队列中);

  • 发送者和接收者间没有依赖性,发送者发送消息以后,无论有没有接收者在运行,都不会影响到发送者下次发送消息;

  • 接收者在成功接收消息以后需向队列应答成功,以便消息队列删除当前接收的消息;

3.2 发布/订阅模式

发布/订阅模式下包括三个角色:

  • 角色主题(Topic)

  • 发布者(Publisher)

  • 订阅者(Subscriber)

发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

发布/订阅模式特色:

  • 每一个消息能够有多个订阅者;

  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须建立一个订阅者以后,才能消费发布者的消息。

  • 为了消费消息,订阅者须要提早订阅该角色主题,并保持在线运行;

4、经常使用消息队列介绍

本部分主要介绍四种经常使用的消息队列(RabbitMQ/ActiveMQ/RocketMQ/Kafka)的主要特性、优势、缺点。

4.1 RabbitMQ

RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

主要特性:

  1. 可靠性: 提供了多种技术可让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证明和高可用性机制;

  2. 灵活的路由: 消息在到达队列前是经过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。若是你有更复杂的路由需求,能够将这些交换机组合起来使用,你甚至能够实现本身的交换机类型,而且当作RabbitMQ的插件来使用;

  3. 消息集群:在相同局域网中的多个RabbitMQ服务器能够聚合在一块儿,做为一个独立的逻辑代理来使用;

  4. 队列高可用:队列能够在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;

  5. 多种协议的支持:支持多种消息队列协议;

  6. 服务器端用Erlang语言编写,支持只要是你能想到的全部编程语言;

  7. 管理界面: RabbitMQ有一个易用的用户界面,使得用户能够监控和管理消息Broker的许多方面;

  8. 跟踪机制:若是消息异常,RabbitMQ提供消息跟踪机制,使用者能够找出发生了什么;

  9. 插件机制:提供了许多插件,来从多方面进行扩展,也能够编写本身的插件;

使用RabbitMQ须要:

  • ErLang语言包

  • RabbitMQ安装包

RabbitMQ能够运行在Erlang语言所支持的平台之上:

Solaris

BSD

Linux

MacOSX

TRU64

Windows NT/2000/XP/Vista/Windows 7/Windows 8

Windows Server 2003/2008/2012

Windows 95, 98

VxWorks

优势:

  1. 因为erlang语言的特性,mq 性能较好,高并发;

  2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;

  3. 有消息确认机制和持久化机制,可靠性高;

  4. 高度可定制的路由;

  5. 管理界面较丰富,在互联网公司也有较大规模的应用;

  6. 社区活跃度高;

缺点:

  1. 尽管结合erlang语言自己的并发优点,性能较好,可是不利于作二次开发和维护;

  2. 实现了代理架构,意味着消息在发送到客户端以前能够在中央节点上排队。此特性使得RabbitMQ易于使用和部署,可是使得其运行速度较慢,由于中央节点增长了延迟,消息封装后也比较大;

  3. 须要学习比较复杂的接口和协议,学习和维护成本较高;

4.2 ActiveMQ

ActiveMQ是由Apache出品,ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它很是快速,支持多种语言的客户端和协议,并且能够很是容易的嵌入到企业的应用环境中,并有许多高级功能。

主要特性:

  1. 服从 JMS 规范:JMS 规范提供了良好的标准和保证,包括:同步或异步的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。听从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;

  2. 链接性:ActiveMQ 提供了普遍的链接选项,支持的协议有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性。

  3. 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP ;

  4. 持久化插件和安全插件:ActiveMQ 提供了多种持久化选择。并且,ActiveMQ 的安全性也能够彻底依据用户需求进行自定义鉴权和受权;

  5. 支持的客户端语言种类多:除了 Java 以外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;

  6. 代理集群:多个 ActiveMQ 代理能够组成一个集群来提供服务;

  7. 异常简单的管理:ActiveMQ 是以开发者思惟被设计的。因此,它并不须要专门的管理员,由于它提供了简单又使用的管理特性。有不少中方法能够监控 ActiveMQ 不一样层面的数据,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,经过处理 JMX 的告警消息,经过使用命令行脚本,甚至能够经过监控各类类型的日志。

使用ActiveMQ须要:

  • Java JDK

  • ActiveMQ安装包

ActiveMQ能够运行在Java语言所支持的平台之上。

优势:

  1. 跨平台(JAVA编写与平台无关有,ActiveMQ几乎能够运行在任何的JVM上)

  2. 能够用JDBC:能够将数据持久化到数据库。虽然使用JDBC会下降ActiveMQ的性能,可是数据库一直都是开发人员最熟悉的存储介质。将消息存到数据库,看得见摸得着。并且公司有专门的DBA去对数据库进行调优,主从分离;

  3. 支持JMS :支持JMS的统一接口;

  4. 支持自动重连;

  5. 有安全机制:支持基于shiro,jaas等多种安全配置机制,能够对Queue/Topic进行认证和受权。

  6. 监控完善:拥有完善的监控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;

  7. 界面友善:提供的Web Console能够知足大部分状况,还有不少第三方的组件可使用,如hawtio;

    缺点:

  8. 社区活跃度不及RabbitMQ高;

  9. 根据其余用户反馈,会出莫名其妙的问题,会丢失消息;

  10. 目前重心放到activemq6.0产品-apollo,对5.x的维护较少;

  11. 不适合用于上千个队列的应用场景;

4.3 RocketMQ

RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并作出了本身的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被普遍应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

主要特性:

  1. 是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特色;

  2. Producer、Consumer、队列均可以分布式;

  3. Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer若是作广播消费,则一个consumer实例消费这个Topic对应的全部队列,若是作集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;

  4. 可以保证严格的消息顺序;

  5. 提供丰富的消息拉取模式;

  6. 高效的订阅者水平扩展能力;

  7. 实时的消息订阅机制;

  8. 亿级消息堆积能力;

  9. 较少的依赖;

使用RocketMQ须要:

  • Java JDK

  • 安装git、Maven

  • RocketMQ安装包

RocketMQ能够运行在Java语言所支持的平台之上。

优势:

  1. 单机支持 1 万以上持久化队列

  2. RocketMQ 的全部消息都是持久化的,先写入系统 PAGECACHE,而后刷盘,能够保证内存与磁盘都有一份数据,

    访问时,直接从内存读取。

  3. 模型简单,接口易用(JMS 的接口不少场合并不太实用);

  4. 性能很是好,能够大量堆积消息在broker中;

  5. 支持多种消费,包括集群消费、广播消费等。

  6. 各个环节分布式扩展设计,主从HA;

  7. 开发度较活跃,版本更新很快。

缺点:

支持的客户端语言很少,目前是java及c++,其中c++不成熟;

RocketMQ社区关注度及成熟度也不及前二者;

没有web管理界面,提供了一个CLI(命令行界面)管理工具带来查询、管理和诊断各类问题;

没有在 mq 核心中去实现JMS等接口;

4.4 Kafka

Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),,以后成为Apache项目的一部分。Kafka系统快速、可扩展而且可持久化。它的分区特性,可复制和可容错都是其不错的特性。

主要特性:

  1. 快速持久化,能够在O(1)的系统开销下进行消息持久化;

  2. 高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;

  3. .彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;

  4. 支持同步和异步复制两种HA;

  5. 支持数据批量发送和拉取;

  6. zero-copy:减小IO操做步骤;

  7. 数据迁移、扩容对用户透明;

  8. 无需停机便可扩展机器;

  9. 其余特性:严格的消息顺序、丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、按期删除机制;

使用Kafka须要:

  • Java JDK

  • Kafka安装包

优势:

  1. 客户端语言丰富,支持java、.net、php、ruby、python、go等多种语言;

  2. 性能卓越,单机写入TPS约在百万条/秒,消息大小10个字节;

  3. 提供彻底分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积;

  4. 支持批量操做;

  5. 消费者采用Pull方式获取消息, 消息有序, 经过控制可以保证全部消息被消费且仅被消费一次;

  6. 有优秀的第三方Kafka Web管理界面Kafka-Manager;

  7. 在日志领域比较成熟,被多家公司和多个开源项目使用;

缺点:

  1. Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长

  2. 使用短轮询方式,实时性取决于轮询间隔时间;

  3. 消费失败不支持重试;

  4. 支持消息顺序,可是一台代理宕机后,就会产生消息乱序;

  5. 社区更新较慢;

4.5 RabbitMQ/ActiveMQ/RocketMQ/Kafka对比

这里列举了上述四种消息队列的差别对比:

结论:

Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改为了主从结构,在事务性可靠性方面作了优化。普遍来讲,电商、金融等对事务性要求很高的,能够考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka。

5、参考资料: 5.1 消息队列:

  1. 大型网站架构之分布式消息队列 http://blog.csdn.net/shaobingj126/article/details/50585035

  2. 消息队列的使用场景 https://www.zhihu.com/question/34243607/answer/127666030

  3. 浅谈异步消息队列模型 http://www.cnblogs.com/sunkeydev/p/5248855.html

  4. 消息队列的两种模式 http://blog.csdn.net/heyutao007/article/details/50131089

5.2 RabbitMQ

  1. RabbitMQ主页 https://www.rabbitmq.com/

  2. RabbitMQ学习教程 https://www.rabbitmq.com/getstarted.html

  3. 专栏:RabbitMQ从入门到精通 http://blog.csdn.net/column/details/rabbitmq.html

  4. RabbitMQ能为你作些什么 http://rabbitmq.mr-ping.com/deion.html

  5. RabbitMQ指南(1)-特性及功能 https://blog.zenfery.cc/archives/79.html

5.3 ActiveMQ

  1. ActiveMQ主页 http://activemq.apache.org/

  2. Apache ActiveMQ介绍 http://jfires.iteye.com/blog/1187688

  3. ActiveMQ的简介与安装 http://blog.csdn.net/sl1992/article/details/72824562

  4. ActiveMQ 和消息简介 http://www.cnblogs.com/craftsman-gao/p/7002605.html

5.4 RocketMQ

  1. 主页 https://github.com/alibaba/RocketMQ

  2. RocketMQ 原理简介 http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf

  3. RocketMQ与kafka对比(18项差别) http://jm.taobao.org/2016/03/24/rmq-vs-kafka/

5.5 Kafka

1.Kafka主页: http://kafka.apache.org/

  1. Kafka特性 http://www.cnblogs.com/lsx1993/p/4847719.html

  2. Kafka客户端支持语言 https://cwiki.apache.org/confluence/display/KAFKA/Clients

5.6 RabbitMQ/ActiveMQ/RocketMQ/Kafka对比

  1. RocketMQ,队列选型 http://www.zmannotes.com/index.php/2016/01/17/rocketmq/

  2. RabbitMQ和Kafka http://www.dongcoder.com/detail-416804.html

  3. 即时通讯RabbitMQ二-性能测试 http://www.jianshu.com/p/d31ae9e3bfb6

  4. RabbitMq、ActiveMq、ZeroMq、kafka之间的比较,资料汇总 http://blog.csdn.net/linsongbin1/article/details/47781187

  5. 消息队列软件产品大比拼 http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html

总结:

消息队列利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。目前业界有不少的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用数据库redis充当消息队列的案例。而这些消息队列产品,各有侧重,在实际选型时,须要结合自身需求及MQ产品特征,综合考虑。

 

 

 

 

RabbitMQ的应用场景以及基本原理介绍

1.背景

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。

2.应用场景

2.1异步处理

场景说明:用户注册后,须要发注册邮件和注册短信,传统的作法有两种1.串行的方式;2.并行的方式 
(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务所有完成后才返回给客户端。 这有一个问题是,邮件,短信并非必须的,它只是一个通知,而这种作法让客户端等待没有必要等待的东西. 
这里写图片描述
(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提升处理的时间。 
这里写图片描述 
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提升的处理时间,可是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回. 
(3)消息队列 
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理 
这里写图片描述 
由此能够看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(能够忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

2.2 应用解耦

场景:双11是购物狂节,用户下单后,订单系统须要通知库存系统,传统的作法就是订单系统调用库存系统的接口. 
这里写图片描述 
这种作法有一个缺点:

  • 当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)
  • 订单系统和库存系统高耦合. 
    引入消息队列 
    这里写图片描述

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统:订阅下单的消息,获取下单消息,进行库操做。 
    就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会致使消息丢失(马云这下高兴了).

流量削峰

流量削峰通常在秒杀活动中应用普遍 
场景:秒杀活动,通常会由于流量过大,致使应用挂掉,为了解决这个问题,通常在应用前端加入消息队列。 
做用: 
1.能够控制活动人数,超过此必定阀值的订单直接丢弃(我为何秒杀一次都没有成功过呢^^) 
2.能够缓解短期的高流量压垮应用(应用程序按本身的最大处理能力获取订单) 
这里写图片描述 
1.用户的请求,服务器收到以后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面. 
2.秒杀业务根据消息队列中的请求信息,再作后续处理.

3.系统架构

这里写图片描述 
几个概念说明: 
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, 
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。 
Queue:消息的载体,每一个消息都会被投到一个或多个队列。 
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来. 
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 
vhost:虚拟主机,一个broker里能够有多个vhost,用做不一样用户的权限分离。 
Producer:消息生产者,就是投递消息的程序. 
Consumer:消息消费者,就是接受消息的程序. 
Channel:消息通道,在客户端的每一个链接里,可创建多个channel.

4.任务分发机制

4.1Round-robin dispathching循环分发

RabbbitMQ的分发机制很是适合扩展,并且它是专门为并发程序设计的,若是如今load加剧,那么只须要建立更多的Consumer来进行任务处理。

4.2Message acknowledgment消息确认

为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理而不只仅是被Consumer收到,那么咱们不能采用no-ack,而应该是在处理完数据以后发送ack. 
在处理完数据以后发送ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够安全的删除它了. 
若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出状况下数据也不会丢失. 
RabbitMQ它没有用到超时机制.RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无正确处理,也就是说RabbitMQ给了Consumer足够长的时间作数据处理。 
若是忘记ack,那么当Consumer退出时,Mesage会从新分发,而后RabbitMQ会占用愈来愈多的内存.

5.Message durability消息持久化

要持久化队列queue的持久化须要在声明时指定durable=True; 
这里要注意,队列的名字必定要是Broker中不存在的,否则不能改变此队列的任何属性. 
队列和交换机有一个建立时候指定的标志durable,durable的惟一含义就是具备这个标志的队列和交换机会在重启以后从新创建,它不表示说在队列中的消息会在重启后恢复 
消息持久化包括3部分 
1. exchange持久化,在声明时指定durable => true

hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
  • 1

2.queue持久化,在声明时指定durable => true

channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
  • 1

3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化).

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); 
  • 1

若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的,若是exchange和queue二者之间有一个持久化,一个非持久化,则不容许创建绑定. 
注意:一旦建立了队列和交换机,就不能修改其标志了,例如,建立了一个non-durable的队列,而后想把它改变成durable的,惟一的办法就是删除这个队列而后重现建立。

6.Fair dispath 公平分发

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它无论Consumer是否还有unacked Message,只是按照这个默认的机制进行分发. 
那么若是有个Consumer工做比较重,那么就会致使有的Consumer基本没事可作,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢? 
这里写图片描述 
经过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每一个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它

channel.basic_qos(prefetch_count=1) 
  • 1

注意,这种方法可能会致使queue满。固然,这种状况下你可能须要添加更多的Consumer,或者建立更多的virtualHost来细化你的设计。

7.分发到多个Consumer

7.1Exchange

先来温习如下交换机路由的几种类型: 
Direct Exchange:直接匹配,经过Exchange名称+RountingKey来发送与接收消息. 
Fanout Exchange:广播订阅,向全部的消费者发布消息,可是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key. 
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey能够采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息; 
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,而后消费者接收消息同时须要定义相似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey. 
默认的exchange:若是用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,咱们建立一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

channel.BasicPublish("", "TaskQueue", properties, bytes);
  • 1

由于在第一个参数选择了默认的exchange,而咱们申明的队列叫TaskQueue,因此默认的,它在新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,致使了咱们能够在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue,并把消息放进去。 
若是有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。 
若是有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每一个程序都会收到这个消息的副本。行为至关于fanout类型的exchange。 
下面详细来讲:

7.2 Bindings 绑定

绑定其实就是关联了exchange和queue,或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver到queue。

7.3Direct exchange

Driect exchange的路由算法很是简单:经过bindingkey的彻底匹配,能够用下图来讲明. 
这里写图片描述 
Exchange和两个队列绑定在一块儿,Q1的bindingkey是orange,Q2的binding key是black和green. 
当Producer publish key是orange时,exchange会把它放到Q1上,若是是black或green就会到Q2上,其他的Message被丢弃.

7.4 Multiple bindings

多个queue绑定同一个key也是能够的,对于下图的例子,Q1和Q2都绑定了black,对于routing key是black的Message,会被deliver到Q1和Q2,其他的Message都会被丢弃. 
这里写图片描述

7.5 Topic exchange

对于Message的routing_key是有限制的,不能使任意的。格式是以点号“.”分割的字符表。好比:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你能够听任意的key在routing_key中,固然最长不能超过255 bytes。 
对于routing_key,有两个特殊字符

  • *(星号)表明任意一个单词
  • #(hash)0个或多个单词 
    这里写图片描述 
    Producer发送消息时须要设置routing_key,routing_key包含三个单词和连个点号o,第一个key描述了celerity(灵巧),第二个是color(色彩),第三个是物种: 
    在这里咱们建立了两个绑定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:

    • Q1感兴趣全部orange颜色的动物
    • Q2感兴趣全部rabbits和全部的lazy的. 
      例子:rounting_key 为 “quick.orange.rabbit”将会发送到Q1和Q2中 
      rounting_key 为”lazy.orange.rabbit.hujj.ddd”会被投递到Q2中,#匹配0个或多个单词。

8.消息序列化

RabbitMQ使用ProtoBuf序列化消息,它可做为RabbitMQ的Message的数据格式进行传输,因为是结构化的数据,这样就极大的方便了Consumer的数据高效处理,固然也可使用XML,与XML相比,ProtoBuf有如下优点: 
1.简单 
2.size小了3-10倍 
3.速度快了20-100倍 
4.易于编程 
6.减小了语义的歧义. 
,ProtoBuf具备速度和空间的优点,使得它如今应用很是普遍

 

 

 

 

 

RabbitMQ基础知识详解

什么是MQ?

       MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。MQ是消费-生产者模型的一个典型的表明,一端往消息队列中不断写入消息,而另外一端则能够读取队列中的消息。

      RabbitMQ是MQ的一种。下面详细介绍一下RabbitMQ的基本概念。

一、队列、生产者、消费者

      队列是RabbitMQ的内部对象,用于存储消息。生产者(下图中的P)生产消息并投递到队列中,消费者(下图中的C)能够从队列中获取消息并消费。

      

      多个消费者能够订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。

      

二、Exchange、Binding

      刚才咱们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),再经过Binding将Exchange与Queue关联起来。

      

三、Exchange Type、Bingding key、routing key

      在绑定(Binding)Exchange与Queue的同时,通常会指定一个binding key。在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的binding key。

      生产者在将消息发送给Exchange的时候,通常会指定一个routing key,来指定这个消息的路由规则,生产者就能够在发送消息给Exchange时,经过指定routing key来决定消息流向哪里。

      RabbitMQ经常使用的Exchange Type有三种:fanout、direct、topic。

      fanout:把全部发送到该Exchange的消息投递到全部与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key彻底匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。

      附上一张RabbitMQ的结构图:

      

    

最后来具体解析一下几个问题:

一、能够自动建立队列,也能够手动建立队列,若是自动建立队列,那么是谁负责建立队列呢?是生产者?仍是消费者? 

      若是队列不存在,固然消费者不会收到任何的消息。可是若是队列不存在,那么生产者发送的消息就会丢失。因此,为了数据不丢失,消费者和生产者均可以建立队列。那么若是建立一个已经存在的队列呢?那么不会有任何的影响。须要注意的是没有任何的影响,也就是说第二次建立若是参数和第一次不同,那么该操做虽然成功,可是队列属性并不会改变。

      队列对于负载均衡的处理是完美的。对于多个消费者来讲,RabbitMQ使用轮询的方式均衡的发送给不一样的消费者。

二、RabbitMQ的消息确认机制

      默认状况下,若是消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。固然也可让同一个消息发送到不少的消费者。

      若是一个队列没有消费者,那么,若是这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被当即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。

     那么什么是正确收到呢?经过ack。每一个消息都要被acknowledged(确认,ack)。咱们能够显示的在程序中去ack,也能够自动的ack。若是有数据没有被ack,那么:

     RabbitMQ Server会把这个信息发送到下一个消费者。

     若是这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,由于Server认为这个消费者处理能力有限。

    并且ack的机制能够起到限流的做用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。

 

 

 

 

 

RabbitMQ布曙

2017.05.06 16:03* 字数 4884 阅读 34889评论 13

关于消息队列,从前年开始断断续续看了些资料,想写好久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。

市面上的消息队列产品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年末阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我本身的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此以前先看下消息队列的相关概念。

什么叫消息队列

消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而无论是谁发布的。这样发布者和使用者都不用知道对方的存在。

为什么用消息队列

从上面的描述中能够看出消息队列是一种应用间的异步协做机制,那何时须要使用 MQ 呢?

以常见的订单系统为例,用户点击【下单】按钮以后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一块儿同步执行,随着业务的发展订单量增加,须要提高系统服务的性能,这时能够将一些不须要当即生效的操做拆分出来异步执行,好比发放红包、发短信通知等。这种场景下就能够用 MQ ,在下单的主流程(好比扣减库存、生成相应单据)完成以后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

以上是用于业务解耦的状况,其它常见场景包括最终一致性、广播、错峰流控等等。

RabbitMQ 特色

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特色包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  2. 灵活的路由(Flexible Routing)
    在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。

  3. 消息集群(Clustering)
    多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。

  4. 高可用(Highly Available Queues)
    队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。

  5. 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。

  6. 多语言客户端(Many Clients)
    RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。

  8. 跟踪机制(Tracing)
    若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。

  9. 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。

RabbitMQ 中的概念模型

消息模型

全部 MQ 产品从模型抽象上来讲都是同样的过程:
消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费者。

 
消息流
RabbitMQ 基本概念

上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念须要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,因此其内部实际上也是 AMQP 中的基本概念:

 
RabbitMQ 内部结构
  1. Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。
  2. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  3. Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  4. Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。
  5. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
  6. Connection
    网络链接,好比一个TCP链接。
  7. Channel
    信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内地虚拟链接,AMQP 命令都是经过信道发出去的,无论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。
  8. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  9. Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。
  10. Broker
    表示消息队列服务器实体。
AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差异,AMQP 中增长了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

 
AMQP 的消息路由过程
Exchange 类型

Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了,因此直接看另外三种类型:

  1. direct


     
    direct 交换器

    消息中的路由键(routing key)若是和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是彻底匹配、单播的模式。

  2. fanout


     
    fanout 交换器

    每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。fanout 类型转发消息是最快的。

  3. topic
     
    topic 交换器

    topic 交换器经过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列须要绑定到一个模式上。它将路由键和绑定键的字符串切分红单词,这些单词之间用点隔开。它一样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配很少很多一个单词。

RabbitMQ 安装

通常来讲安装 RabbitMQ 以前要安装 Erlang ,能够去Erlang官网下载。接着去RabbitMQ官网下载安装包,以后解压缩便可。根据操做系统不一样官网提供了相应的安装说明:WindowsDebian / UbuntuRPM-based LinuxMac

若是是Mac 用户,我的推荐使用 HomeBrew 来安装,安装前要先更新 brew:

brew update

接着安装 rabbitmq 服务器:

brew install rabbitmq

这样 RabbitMQ 就安装好了,安装过程当中会自动其所依赖的 Erlang 。

RabbitMQ 运行和管理

  1. 启动
    启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,能够看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 便可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:
./sbin/rabbitmq-server

启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。


 
正常启动
  1. 后台启动
    若是想让 RabbitMQ 以守护程序的方式在后台运行,能够在启动的时候加上 -detached 参数:
./sbin/rabbitmq-server -detached
  1. 查询服务器状态
    sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理须要的几乎一站式解决方案,绝大部分的运维命令它均可以提供。
    查询 RabbitMQ 服务器的状态信息能够用参数 status :
./sbin/rabbitmqctl status

该命令将输出服务器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等

  1. 关闭 RabbitMQ 节点
    咱们知道 RabbitMQ 是用 Erlang 语言写的,在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每一个实例,而多个 Erlang 应用程序能够运行在同一个节点之上。节点之间能够进行本地通讯(无论他们是否是运行在同一台服务器之上)。好比一个运行在节点A上的应用程序能够调用节点B上应用程序的方法,就好像调用本地函数同样。若是应用程序因为某些缘由奔溃,Erlang 节点会自动尝试重启应用程序。
    若是要关闭整个 RabbitMQ 节点能够用参数 stop :
./sbin/rabbitmqctl stop

它会和本地节点通讯并指示其干净的关闭,也能够指定关闭不一样的节点,包括远程节点,只须要传入参数 -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 

-n node 默认 node 名称是 rabbit@server ,若是你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。

  1. 关闭 RabbitMQ 应用程序
    若是只想关闭应用程序,同时保持 Erlang 节点运行则能够用 stop_app:
./sbin/rabbitmqctl stop_app

这个命令在后面要讲的集群模式中将会颇有用。

  1. 启动 RabbitMQ 应用程序
./sbin/rabbitmqctl start_app
  1. 重置 RabbitMQ 节点
./sbin/rabbitmqctl reset

该命令将清除全部的队列。

  1. 查看已声明的队列
./sbin/rabbitmqctl list_queues
  1. 查看交换器
./sbin/rabbitmqctl list_exchanges

该命令还能够附加参数,好比列出交换器的名称、类型、是否持久化、是否自动删除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete 
  1. 查看绑定
./sbin/rabbitmqctl list_bindings

Java 客户端访问

RabbitMQ 支持多种语言访问,以 Java 为例看下通常使用 RabbitMQ 的步骤。

  1. maven工程的pom文件中添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> 
  1. 消息生产者
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } 
  1. 消息消费者
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,经过键 hola 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } 
  1. 启动 RabbitMQ 服务器
./sbin/rabbitmq-server
  1. 运行 Consumer
    先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
  2. 运行 Producer
    接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:


     
    Consumer 控制台

RabbitMQ 集群

RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是容许消费者和生产者在节点崩溃的状况下继续运行,以及经过添加更多的节点来线性扩展消息通讯吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通讯框架 OTP 来知足上述需求,使客户端在失去一个 RabbitMQ 节点链接的状况下,仍是可以从新链接到集群中的任何其余节点继续生产、消费消息。

RabbitMQ 集群中的一些概念

RabbitMQ 会始终记录如下四种类型的内部元数据:

  1. 队列元数据
    包括队列名称和它们的属性,好比是否可持久化,是否自动删除
  2. 交换器元数据
    交换器名称、类型、属性
  3. 绑定元数据
    内部是一张表格记录如何将消息路由到队列
  4. vhost 元数据
    为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性

在单一节点中,RabbitMQ 会将全部这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上能够确保队列和交换器在节点重启后可以重建。而在集群模式下一样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。

若是在集群中建立队列,集群只会在单个节点而不是全部节点上建立完整的队列信息(元数据、状态、内容)。结果是只有队列的全部者节点知道有关队列的全部信息,所以当集群节点崩溃时,该节点的队列和绑定就消失了,而且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0以后提供了镜像队列以免集群节点故障致使的队列内容不可用。

RabbitMQ 集群中能够共享 user、vhost、exchange等,全部的数据和状态都是必须在全部节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点能够动态的加入到集群中。

当在集群中声明队列、交换器、绑定的时候,这些操做会直到全部集群节点都成功提交元数据变动后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,可是它的执行比磁盘节点要好。内存节点能够提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这二者呢?

RabbitMQ 只要求集群中至少有一个磁盘节点,全部其余节点能够是内存节点,当节点加入火离开集群时,它们必需要将该变动通知到至少一个磁盘节点。若是只有一个磁盘节点,恰好又是该节点崩溃了,那么集群能够继续路由消息,但不能建立队列、建立交换器、建立绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的惟一磁盘节点崩溃的话,集群仍然能够运行,但知道该节点恢复,不然没法更改任何东西。

RabbitMQ 集群配置和启动

若是是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第2、第三个节点将会由于节点名称和端口冲突致使启动失败。因此在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 来明确指定惟一的节点名称和端口。下面的例子端口号从5672开始,每一个新启动的节点都加1,节点也分别命名为test_rabbit_一、test_rabbit_二、test_rabbit_3。

启动第1个节点:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached

启动第2个节点:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,不然会存在使用了某个插件的端口号冲突,致使节点启动不成功。

如今第2个节点和第1个节点都是独立节点,它们并不知道其余节点的存在。集群中除第一个节点外后加入的节点须要获取集群中的元数据,因此要先中止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入而且获取集群的元数据,最后从新启动 RabbitMQ 应用程序。

中止第2个节点的应用程序:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app

重置第2个节点元数据:

./sbin/rabbitmqctl -n test_rabbit_2 reset

第2节点加入第1个节点组成的集群:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost 

启动第2个节点的应用程序

./sbin/rabbitmqctl -n test_rabbit_2 start_app

第3个节点的配置过程和第2个节点相似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached ./sbin/rabbitmqctl -n test_rabbit_3 stop_app ./sbin/rabbitmqctl -n test_rabbit_3 reset ./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost ./sbin/rabbitmqctl -n test_rabbit_3 start_app 
RabbitMQ 集群运维

中止某个指定的节点,好比中止第2个节点:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop

查看节点3的集群状态:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
相关文章
相关标签/搜索