1、消息队列(MQ)概述javascript
消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景能够简单地描述为:php
当不须要当即得到结果,可是并发量又须要进行控制的时候,差很少就是须要使用消息队列的时候。html
消息队列主要解决了应用耦合、异步处理、流量削锋等问题。前端
当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。java
2、消息队列使用场景node
消息队列在实际应用中包括以下四个场景:python
应用耦合:多应用间经过消息队列对同一消息进行处理,避免调用接口失败致使整个过程失败;c++
异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减小处理时间;git
限流削峰:普遍应用于秒杀或抢购活动中,避免流量过大致使应用系统挂掉的状况;github
消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;
下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用:
2.1 异步处理
具体场景:用户为了使用某个应用,进行注册,系统须要发送注册邮件并验证短信。对这两个操做的处理方式有两种:串行及并行。
(1)串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信;
在这种方式下,须要最终发送验证短信后再返回给客户端。
(2)并行处理:新注册信息写入后,由发短信和发邮件并行处理;
在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。
假设以上三个子系统处理的时间均为50ms,且不考虑网络延迟,则总的处理时间:
串行:50+50+50=150ms
并行:50+50 = 100ms
若使用消息队列:
并在写入消息队列后当即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间自己是能够很快的,基本能够忽略不计,所以总的处理时间相比串行提升了2倍,相比并行提升了一倍;
2.2 应用耦合
具体场景:用户使用QQ相册上传一张图片,人脸识别系统会对该图片进行人脸识别,通常的作法是,服务器接收到图片后,图片上传系统当即调用人脸识别系统,调用完成后再返回成功,以下图所示:
该方法有以下缺点:
人脸识别系统被调失败,致使图片上传失败;
延迟高,须要人脸识别系统处理完成后,再返回给客户端,即便用户并不须要当即知道结果;
图片上传系统与人脸识别系统之间互相调用,须要作耦合;
若使用消息队列:
客户端上传图片后,图片上传系统将图片信息如uin、批次写入消息队列,直接返回成功;而人脸识别系统则定时从消息队列中取数据,完成对新增图片的识别。
此时图片上传系统并不须要关心人脸识别系统是否对这些图片信息的处理、以及什么时候对这些图片信息进行处理。事实上,因为用户并不须要当即知道人脸识别结果,人脸识别系统能够选择不一样的调度策略,按照闲时、忙时、正常时间,对队列中的图片信息进行处理。
2.3 限流削峰
具体场景:购物网站开展秒杀活动,通常因为瞬时访问量过大,服务器接收过大,会致使流量暴增,相关系统没法处理请求甚至崩溃。而加入消息队列后,系统能够从消息队列中取数据,至关于消息队列作了一次缓冲。
该方法有以下优势:
请求先入消息队列,而不是由业务处理系统直接处理,作了一次缓冲,极大地减小了业务处理系统的压力;
队列长度能够作限制,事实上,秒杀时,后入队列的用户没法秒杀到商品,这些请求能够直接被抛弃,返回活动已结束或商品已售完信息;
2.4 消息驱动的系统
具体场景:用户新上传了一批照片, 人脸识别系统须要对这个用户的全部照片进行聚类,聚类完成后由对帐系统从新生成用户的人脸索引(加快查询)。这三个子系统间由消息队列链接起来,前一个阶段的处理结果放入队列中,后一个阶段从队列中获取消息继续处理。
该方法有以下优势:
避免了直接调用下一个系统致使当前系统失败;
每一个子系统对于消息的处理方式能够更为灵活,能够选择收到消息时就处理,能够选择定时处理,也能够划分时间段按不一样处理速度处理;
3、消息队列的两种模式
消息队列包括两种模式,点对点模式(point to point, queue)和发布/订阅模式(publish/subscribe,topic)。
3.1 点对点模式
点对点模式下包括三个角色:
消息队列
发送者 (生产者)
接收者(消费者)
消息发送者生产消息发送到queue中,而后消息接收者从queue中取出而且消费消息。消息被消费之后,queue中再也不有存储,因此消息接收者不可能消费到已经被消费的消息。
点对点模式特色:
每一个消息只有一个接收者(Consumer)(即一旦被消费,消息就再也不在消息队列中);
发送者和接收者间没有依赖性,发送者发送消息以后,无论有没有接收者在运行,都不会影响到发送者下次发送消息;
接收者在成功接收消息以后需向队列应答成功,以便消息队列删除当前接收的消息;
3.2 发布/订阅模式
发布/订阅模式下包括三个角色:
角色主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
发布/订阅模式特色:
每一个消息能够有多个订阅者;
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须建立一个订阅者以后,才能消费发布者的消息。
为了消费消息,订阅者须要提早订阅该角色主题,并保持在线运行;
4、经常使用消息队列介绍
本部分主要介绍四种经常使用的消息队列(RabbitMQ/ActiveMQ/RocketMQ/Kafka)的主要特性、优势、缺点。
4.1 RabbitMQ
RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
主要特性:
可靠性: 提供了多种技术可让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证明和高可用性机制;
灵活的路由: 消息在到达队列前是经过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。若是你有更复杂的路由需求,能够将这些交换机组合起来使用,你甚至能够实现本身的交换机类型,而且当作RabbitMQ的插件来使用;
消息集群:在相同局域网中的多个RabbitMQ服务器能够聚合在一块儿,做为一个独立的逻辑代理来使用;
队列高可用:队列能够在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
多种协议的支持:支持多种消息队列协议;
服务器端用Erlang语言编写,支持只要是你能想到的全部编程语言;
管理界面: RabbitMQ有一个易用的用户界面,使得用户能够监控和管理消息Broker的许多方面;
跟踪机制:若是消息异常,RabbitMQ提供消息跟踪机制,使用者能够找出发生了什么;
插件机制:提供了许多插件,来从多方面进行扩展,也能够编写本身的插件;
使用RabbitMQ须要:
ErLang语言包
RabbitMQ安装包
RabbitMQ能够运行在Erlang语言所支持的平台之上:
Solaris
BSD
Linux
MacOSX
TRU64
Windows NT/2000/XP/Vista/Windows 7/Windows 8
Windows Server 2003/2008/2012
Windows 95, 98
VxWorks
优势:
因为erlang语言的特性,mq 性能较好,高并发;
健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
有消息确认机制和持久化机制,可靠性高;
高度可定制的路由;
管理界面较丰富,在互联网公司也有较大规模的应用;
社区活跃度高;
缺点:
尽管结合erlang语言自己的并发优点,性能较好,可是不利于作二次开发和维护;
实现了代理架构,意味着消息在发送到客户端以前能够在中央节点上排队。此特性使得RabbitMQ易于使用和部署,可是使得其运行速度较慢,由于中央节点增长了延迟,消息封装后也比较大;
须要学习比较复杂的接口和协议,学习和维护成本较高;
4.2 ActiveMQ
ActiveMQ是由Apache出品,ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它很是快速,支持多种语言的客户端和协议,并且能够很是容易的嵌入到企业的应用环境中,并有许多高级功能。
主要特性:
服从 JMS 规范:JMS 规范提供了良好的标准和保证,包括:同步或异步的消息分发,一次和仅一次的消息分发,消息接收和订阅等等。听从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
链接性:ActiveMQ 提供了普遍的链接选项,支持的协议有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性。
支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP ;
持久化插件和安全插件:ActiveMQ 提供了多种持久化选择。并且,ActiveMQ 的安全性也能够彻底依据用户需求进行自定义鉴权和受权;
支持的客户端语言种类多:除了 Java 以外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
代理集群:多个 ActiveMQ 代理能够组成一个集群来提供服务;
异常简单的管理:ActiveMQ 是以开发者思惟被设计的。因此,它并不须要专门的管理员,由于它提供了简单又使用的管理特性。有不少中方法能够监控 ActiveMQ 不一样层面的数据,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,经过处理 JMX 的告警消息,经过使用命令行脚本,甚至能够经过监控各类类型的日志。
使用ActiveMQ须要:
Java JDK
ActiveMQ安装包
ActiveMQ能够运行在Java语言所支持的平台之上。
优势:
跨平台(JAVA编写与平台无关有,ActiveMQ几乎能够运行在任何的JVM上)
能够用JDBC:能够将数据持久化到数据库。虽然使用JDBC会下降ActiveMQ的性能,可是数据库一直都是开发人员最熟悉的存储介质。将消息存到数据库,看得见摸得着。并且公司有专门的DBA去对数据库进行调优,主从分离;
支持JMS :支持JMS的统一接口;
支持自动重连;
有安全机制:支持基于shiro,jaas等多种安全配置机制,能够对Queue/Topic进行认证和受权。
监控完善:拥有完善的监控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;
界面友善:提供的Web Console能够知足大部分状况,还有不少第三方的组件可使用,如hawtio;
缺点:
社区活跃度不及RabbitMQ高;
根据其余用户反馈,会出莫名其妙的问题,会丢失消息;
目前重心放到activemq6.0产品-apollo,对5.x的维护较少;
不适合用于上千个队列的应用场景;
4.3 RocketMQ
RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并作出了本身的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被普遍应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
主要特性:
是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特色;
Producer、Consumer、队列均可以分布式;
Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer若是作广播消费,则一个consumer实例消费这个Topic对应的全部队列,若是作集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
可以保证严格的消息顺序;
提供丰富的消息拉取模式;
高效的订阅者水平扩展能力;
实时的消息订阅机制;
亿级消息堆积能力;
较少的依赖;
使用RocketMQ须要:
Java JDK
安装git、Maven
RocketMQ安装包
RocketMQ能够运行在Java语言所支持的平台之上。
优势:
单机支持 1 万以上持久化队列
RocketMQ 的全部消息都是持久化的,先写入系统 PAGECACHE,而后刷盘,能够保证内存与磁盘都有一份数据,
访问时,直接从内存读取。
模型简单,接口易用(JMS 的接口不少场合并不太实用);
性能很是好,能够大量堆积消息在broker中;
支持多种消费,包括集群消费、广播消费等。
各个环节分布式扩展设计,主从HA;
开发度较活跃,版本更新很快。
缺点:
支持的客户端语言很少,目前是java及c++,其中c++不成熟;
RocketMQ社区关注度及成熟度也不及前二者;
没有web管理界面,提供了一个CLI(命令行界面)管理工具带来查询、管理和诊断各类问题;
没有在 mq 核心中去实现JMS等接口;
4.4 Kafka
Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),,以后成为Apache项目的一部分。Kafka系统快速、可扩展而且可持久化。它的分区特性,可复制和可容错都是其不错的特性。
主要特性:
快速持久化,能够在O(1)的系统开销下进行消息持久化;
高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;
.彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
支持同步和异步复制两种HA;
支持数据批量发送和拉取;
zero-copy:减小IO操做步骤;
数据迁移、扩容对用户透明;
无需停机便可扩展机器;
其余特性:严格的消息顺序、丰富的消息拉取模型、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、按期删除机制;
使用Kafka须要:
Java JDK
Kafka安装包
优势:
客户端语言丰富,支持java、.net、php、ruby、python、go等多种语言;
性能卓越,单机写入TPS约在百万条/秒,消息大小10个字节;
提供彻底分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积;
支持批量操做;
消费者采用Pull方式获取消息, 消息有序, 经过控制可以保证全部消息被消费且仅被消费一次;
有优秀的第三方Kafka Web管理界面Kafka-Manager;
在日志领域比较成熟,被多家公司和多个开源项目使用;
缺点:
Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
使用短轮询方式,实时性取决于轮询间隔时间;
消费失败不支持重试;
支持消息顺序,可是一台代理宕机后,就会产生消息乱序;
社区更新较慢;
4.5 RabbitMQ/ActiveMQ/RocketMQ/Kafka对比
这里列举了上述四种消息队列的差别对比:
结论:
Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改为了主从结构,在事务性可靠性方面作了优化。普遍来讲,电商、金融等对事务性要求很高的,能够考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka。
5、参考资料: 5.1 消息队列:
大型网站架构之分布式消息队列 http://blog.csdn.net/shaobingj126/article/details/50585035
消息队列的使用场景 https://www.zhihu.com/question/34243607/answer/127666030
浅谈异步消息队列模型 http://www.cnblogs.com/sunkeydev/p/5248855.html
消息队列的两种模式 http://blog.csdn.net/heyutao007/article/details/50131089
5.2 RabbitMQ
RabbitMQ主页 https://www.rabbitmq.com/
RabbitMQ学习教程 https://www.rabbitmq.com/getstarted.html
专栏:RabbitMQ从入门到精通 http://blog.csdn.net/column/details/rabbitmq.html
RabbitMQ能为你作些什么 http://rabbitmq.mr-ping.com/deion.html
RabbitMQ指南(1)-特性及功能 https://blog.zenfery.cc/archives/79.html
5.3 ActiveMQ
ActiveMQ主页 http://activemq.apache.org/
Apache ActiveMQ介绍 http://jfires.iteye.com/blog/1187688
ActiveMQ的简介与安装 http://blog.csdn.net/sl1992/article/details/72824562
ActiveMQ 和消息简介 http://www.cnblogs.com/craftsman-gao/p/7002605.html
5.4 RocketMQ
主页 https://github.com/alibaba/RocketMQ
RocketMQ 原理简介 http://alibaba.github.io/RocketMQ-docs/document/design/RocketMQ_design.pdf
RocketMQ与kafka对比(18项差别) http://jm.taobao.org/2016/03/24/rmq-vs-kafka/
5.5 Kafka
1.Kafka主页: http://kafka.apache.org/
Kafka特性 http://www.cnblogs.com/lsx1993/p/4847719.html
Kafka客户端支持语言 https://cwiki.apache.org/confluence/display/KAFKA/Clients
5.6 RabbitMQ/ActiveMQ/RocketMQ/Kafka对比
RocketMQ,队列选型 http://www.zmannotes.com/index.php/2016/01/17/rocketmq/
RabbitMQ和Kafka http://www.dongcoder.com/detail-416804.html
即时通讯RabbitMQ二-性能测试 http://www.jianshu.com/p/d31ae9e3bfb6
RabbitMq、ActiveMq、ZeroMq、kafka之间的比较,资料汇总 http://blog.csdn.net/linsongbin1/article/details/47781187
消息队列软件产品大比拼 http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html
总结:
消息队列利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。目前业界有不少的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用数据库redis充当消息队列的案例。而这些消息队列产品,各有侧重,在实际选型时,须要结合自身需求及MQ产品特征,综合考虑。
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。
场景说明:用户注册后,须要发注册邮件和注册短信,传统的作法有两种1.串行的方式;2.并行的方式
(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务所有完成后才返回给客户端。 这有一个问题是,邮件,短信并非必须的,它只是一个通知,而这种作法让客户端等待没有必要等待的东西.
(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提升处理的时间。
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提升的处理时间,可是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
(3)消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
由此能够看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(能够忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
场景:双11是购物狂节,用户下单后,订单系统须要通知库存系统,传统的作法就是订单系统调用库存系统的接口.
这种作法有一个缺点:
订单系统和库存系统高耦合.
引入消息队列
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
流量削峰通常在秒杀活动中应用普遍
场景:秒杀活动,通常会由于流量过大,致使应用挂掉,为了解决这个问题,通常在应用前端加入消息队列。
做用:
1.能够控制活动人数,超过此必定阀值的订单直接丢弃(我为何秒杀一次都没有成功过呢^^)
2.能够缓解短期的高流量压垮应用(应用程序按本身的最大处理能力获取订单)
1.用户的请求,服务器收到以后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再作后续处理.
几个概念说明:
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
Queue:消息的载体,每一个消息都会被投到一个或多个队列。
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里能够有多个vhost,用做不一样用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每一个链接里,可创建多个channel.
RabbbitMQ的分发机制很是适合扩展,并且它是专门为并发程序设计的,若是如今load加剧,那么只须要建立更多的Consumer来进行任务处理。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理而不只仅是被Consumer收到,那么咱们不能采用no-ack,而应该是在处理完数据以后发送ack.
在处理完数据以后发送ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够安全的删除它了.
若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出状况下数据也不会丢失.
RabbitMQ它没有用到超时机制.RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无正确处理,也就是说RabbitMQ给了Consumer足够长的时间作数据处理。
若是忘记ack,那么当Consumer退出时,Mesage会从新分发,而后RabbitMQ会占用愈来愈多的内存.
要持久化队列queue的持久化须要在声明时指定durable=True;
这里要注意,队列的名字必定要是Broker中不存在的,否则不能改变此队列的任何属性.
队列和交换机有一个建立时候指定的标志durable,durable的惟一含义就是具备这个标志的队列和交换机会在重启以后从新创建,它不表示说在队列中的消息会在重启后恢复
消息持久化包括3部分
1. exchange持久化,在声明时指定durable => true
hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
2.queue持久化,在声明时指定durable => true
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化).
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的,若是exchange和queue二者之间有一个持久化,一个非持久化,则不容许创建绑定.
注意:一旦建立了队列和交换机,就不能修改其标志了,例如,建立了一个non-durable的队列,而后想把它改变成durable的,惟一的办法就是删除这个队列而后重现建立。
你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它无论Consumer是否还有unacked Message,只是按照这个默认的机制进行分发.
那么若是有个Consumer工做比较重,那么就会致使有的Consumer基本没事可作,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢?
经过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每一个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它
channel.basic_qos(prefetch_count=1)
注意,这种方法可能会致使queue满。固然,这种状况下你可能须要添加更多的Consumer,或者建立更多的virtualHost来细化你的设计。
先来温习如下交换机路由的几种类型:
Direct Exchange:直接匹配,经过Exchange名称+RountingKey来发送与接收消息.
Fanout Exchange:广播订阅,向全部的消费者发布消息,可是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key.
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey能够采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,而后消费者接收消息同时须要定义相似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
默认的exchange:若是用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,咱们建立一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去
channel.BasicPublish("", "TaskQueue", properties, bytes);
由于在第一个参数选择了默认的exchange,而咱们申明的队列叫TaskQueue,因此默认的,它在新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,致使了咱们能够在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue,并把消息放进去。
若是有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。
若是有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每一个程序都会收到这个消息的副本。行为至关于fanout类型的exchange。
下面详细来讲:
绑定其实就是关联了exchange和queue,或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver到queue。
Driect exchange的路由算法很是简单:经过bindingkey的彻底匹配,能够用下图来讲明.
Exchange和两个队列绑定在一块儿,Q1的bindingkey是orange,Q2的binding key是black和green.
当Producer publish key是orange时,exchange会把它放到Q1上,若是是black或green就会到Q2上,其他的Message被丢弃.
多个queue绑定同一个key也是能够的,对于下图的例子,Q1和Q2都绑定了black,对于routing key是black的Message,会被deliver到Q1和Q2,其他的Message都会被丢弃.
对于Message的routing_key是有限制的,不能使任意的。格式是以点号“.”分割的字符表。好比:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你能够听任意的key在routing_key中,固然最长不能超过255 bytes。
对于routing_key,有两个特殊字符
#(hash)0个或多个单词
Producer发送消息时须要设置routing_key,routing_key包含三个单词和连个点号o,第一个key描述了celerity(灵巧),第二个是color(色彩),第三个是物种:
在这里咱们建立了两个绑定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:
RabbitMQ使用ProtoBuf序列化消息,它可做为RabbitMQ的Message的数据格式进行传输,因为是结构化的数据,这样就极大的方便了Consumer的数据高效处理,固然也可使用XML,与XML相比,ProtoBuf有如下优点:
1.简单
2.size小了3-10倍
3.速度快了20-100倍
4.易于编程
6.减小了语义的歧义.
,ProtoBuf具备速度和空间的优点,使得它如今应用很是普遍
什么是MQ?
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。MQ是消费-生产者模型的一个典型的表明,一端往消息队列中不断写入消息,而另外一端则能够读取队列中的消息。
RabbitMQ是MQ的一种。下面详细介绍一下RabbitMQ的基本概念。
一、队列、生产者、消费者
队列是RabbitMQ的内部对象,用于存储消息。生产者(下图中的P)生产消息并投递到队列中,消费者(下图中的C)能够从队列中获取消息并消费。
多个消费者能够订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。
二、Exchange、Binding
刚才咱们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),再经过Binding将Exchange与Queue关联起来。
三、Exchange Type、Bingding key、routing key
在绑定(Binding)Exchange与Queue的同时,通常会指定一个binding key。在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的binding key。
生产者在将消息发送给Exchange的时候,通常会指定一个routing key,来指定这个消息的路由规则,生产者就能够在发送消息给Exchange时,经过指定routing key来决定消息流向哪里。
RabbitMQ经常使用的Exchange Type有三种:fanout、direct、topic。
fanout:把全部发送到该Exchange的消息投递到全部与它绑定的队列中。
direct:把消息投递到那些binding key与routing key彻底匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。
附上一张RabbitMQ的结构图:
最后来具体解析一下几个问题:
一、能够自动建立队列,也能够手动建立队列,若是自动建立队列,那么是谁负责建立队列呢?是生产者?仍是消费者?
若是队列不存在,固然消费者不会收到任何的消息。可是若是队列不存在,那么生产者发送的消息就会丢失。因此,为了数据不丢失,消费者和生产者均可以建立队列。那么若是建立一个已经存在的队列呢?那么不会有任何的影响。须要注意的是没有任何的影响,也就是说第二次建立若是参数和第一次不同,那么该操做虽然成功,可是队列属性并不会改变。
队列对于负载均衡的处理是完美的。对于多个消费者来讲,RabbitMQ使用轮询的方式均衡的发送给不一样的消费者。
二、RabbitMQ的消息确认机制
默认状况下,若是消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。固然也可让同一个消息发送到不少的消费者。
若是一个队列没有消费者,那么,若是这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被当即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。
那么什么是正确收到呢?经过ack。每一个消息都要被acknowledged(确认,ack)。咱们能够显示的在程序中去ack,也能够自动的ack。若是有数据没有被ack,那么:
RabbitMQ Server会把这个信息发送到下一个消费者。
若是这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,由于Server认为这个消费者处理能力有限。
并且ack的机制能够起到限流的做用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。
关于消息队列,从前年开始断断续续看了些资料,想写好久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。
市面上的消息队列产品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年末阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我本身的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此以前先看下消息队列的相关概念。
消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而无论是谁发布的。这样发布者和使用者都不用知道对方的存在。
从上面的描述中能够看出消息队列是一种应用间的异步协做机制,那何时须要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮以后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一块儿同步执行,随着业务的发展订单量增加,须要提高系统服务的性能,这时能够将一些不须要当即生效的操做拆分出来异步执行,好比发放红包、发短信通知等。这种场景下就能够用 MQ ,在下单的主流程(好比扣减库存、生成相应单据)完成以后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的状况,其它常见场景包括最终一致性、广播、错峰流控等等。
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特色包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。
多语言客户端(Many Clients)
RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。
全部 MQ 产品从模型抽象上来讲都是同样的过程:
消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费者。
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念须要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,因此其内部实际上也是 AMQP 中的基本概念:
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差异,AMQP 中增长了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了,因此直接看另外三种类型:
direct
消息中的路由键(routing key)若是和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是彻底匹配、单播的模式。
fanout
每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。fanout 类型转发消息是最快的。
通常来讲安装 RabbitMQ 以前要安装 Erlang ,能够去Erlang官网下载。接着去RabbitMQ官网下载安装包,以后解压缩便可。根据操做系统不一样官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
若是是Mac 用户,我的推荐使用 HomeBrew 来安装,安装前要先更新 brew:
brew update
接着安装 rabbitmq 服务器:
brew install rabbitmq
这样 RabbitMQ 就安装好了,安装过程当中会自动其所依赖的 Erlang 。
./sbin/rabbitmq-server
启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。
./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl status
该命令将输出服务器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等
./sbin/rabbitmqctl stop
它会和本地节点通讯并指示其干净的关闭,也能够指定关闭不一样的节点,包括远程节点,只须要传入参数 -n :
./sbin/rabbitmqctl -n rabbit@server.example.com stop
-n node 默认 node 名称是 rabbit@server ,若是你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。
./sbin/rabbitmqctl stop_app
这个命令在后面要讲的集群模式中将会颇有用。
./sbin/rabbitmqctl start_app
./sbin/rabbitmqctl reset
该命令将清除全部的队列。
./sbin/rabbitmqctl list_queues
./sbin/rabbitmqctl list_exchanges
该命令还能够附加参数,好比列出交换器的名称、类型、是否持久化、是否自动删除:
./sbin/rabbitmqctl list_exchanges name type durable auto_delete
./sbin/rabbitmqctl list_bindings
RabbitMQ 支持多种语言访问,以 Java 为例看下通常使用 RabbitMQ 的步骤。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,经过键 hola 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }
./sbin/rabbitmq-server
运行 Producer
接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:
RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是容许消费者和生产者在节点崩溃的状况下继续运行,以及经过添加更多的节点来线性扩展消息通讯吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通讯框架 OTP 来知足上述需求,使客户端在失去一个 RabbitMQ 节点链接的状况下,仍是可以从新链接到集群中的任何其余节点继续生产、消费消息。
RabbitMQ 会始终记录如下四种类型的内部元数据:
在单一节点中,RabbitMQ 会将全部这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上能够确保队列和交换器在节点重启后可以重建。而在集群模式下一样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。
若是在集群中建立队列,集群只会在单个节点而不是全部节点上建立完整的队列信息(元数据、状态、内容)。结果是只有队列的全部者节点知道有关队列的全部信息,所以当集群节点崩溃时,该节点的队列和绑定就消失了,而且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0以后提供了镜像队列以免集群节点故障致使的队列内容不可用。
RabbitMQ 集群中能够共享 user、vhost、exchange等,全部的数据和状态都是必须在全部节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点能够动态的加入到集群中。
当在集群中声明队列、交换器、绑定的时候,这些操做会直到全部集群节点都成功提交元数据变动后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,可是它的执行比磁盘节点要好。内存节点能够提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这二者呢?
RabbitMQ 只要求集群中至少有一个磁盘节点,全部其余节点能够是内存节点,当节点加入火离开集群时,它们必需要将该变动通知到至少一个磁盘节点。若是只有一个磁盘节点,恰好又是该节点崩溃了,那么集群能够继续路由消息,但不能建立队列、建立交换器、建立绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的惟一磁盘节点崩溃的话,集群仍然能够运行,但知道该节点恢复,不然没法更改任何东西。
若是是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第2、第三个节点将会由于节点名称和端口冲突致使启动失败。因此在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 来明确指定惟一的节点名称和端口。下面的例子端口号从5672开始,每一个新启动的节点都加1,节点也分别命名为test_rabbit_一、test_rabbit_二、test_rabbit_3。
启动第1个节点:
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
启动第2个节点:
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,不然会存在使用了某个插件的端口号冲突,致使节点启动不成功。
如今第2个节点和第1个节点都是独立节点,它们并不知道其余节点的存在。集群中除第一个节点外后加入的节点须要获取集群中的元数据,因此要先中止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入而且获取集群的元数据,最后从新启动 RabbitMQ 应用程序。
中止第2个节点的应用程序:
./sbin/rabbitmqctl -n test_rabbit_2 stop_app
重置第2个节点元数据:
./sbin/rabbitmqctl -n test_rabbit_2 reset
第2节点加入第1个节点组成的集群:
./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
启动第2个节点的应用程序
./sbin/rabbitmqctl -n test_rabbit_2 start_app
第3个节点的配置过程和第2个节点相似:
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached ./sbin/rabbitmqctl -n test_rabbit_3 stop_app ./sbin/rabbitmqctl -n test_rabbit_3 reset ./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost ./sbin/rabbitmqctl -n test_rabbit_3 start_app
中止某个指定的节点,好比中止第2个节点:
RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
查看节点3的集群状态:
./sbin/rabbitmqctl -n test_rabbit_3 cluster_status