RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。 前端
中间件是一类链接软件组件和应用的计算机软件,它包括一组服务。以便于运行在一台或多台机器上的多个软件经过网络进行交互。 中间件技术所提供的互操做性,推进了分布式体系架构的演进,该架构一般用于支持并简化那些复杂的分布式应用程序,它包括web服务器、事务监控器和消息队列软件。 中间件(middleware)是基础软件的一大类,属于可复用软件的范畴。顾名思义,中间件处于操做系统软件与用户的应用软件的中间。 中间件在操做系统、网络和数据库之上,应用软件的下层,总的做用是为处于本身上层的应用软件提供运行与开发的环境,帮助用户灵活、高效地开发和集成复杂的应用软件。
中间件是位于平台(硬件和操做系统)和应用之间的通用服务,这些服务具备标准的程序接口和协议。针对不一样的操做系统和硬件平台,中间件能够有符合接口和协议规范的多种实现:java
一.理论部分web
RocketMQ就是一款分布式消息中间件。那么,RocketMQ主要为了解决哪些问题呢?sql
(1)Publish/Subscribe
发布与订阅是消息中间件的最基本功能,也是相对于传统RPC通讯而言。数据库
(2)Message Priority
规范中描述的优先级是指在一个消息队列中,每条消息都有不一样的优先级,通常用整数来描述,优先级高的消息先投递,若是消息彻底在一个内存队列中,那么在投递前能够按照优先级排序,令优先级高的先投递。
因为RocketMQ全部消息都是持久化的,因此若是按照优先级来排序,开销会很是大,所以RocketMQ没有特地支持消息优先级,可是能够经过变通的方式实现相似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不一样优先级发送到不一样队列便可。apache
(3)Message Order
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了3条消息,分别是订单建立,订单付款,订单完成。消费时,要按照这个顺序消费才能有意义。可是同时订单之间是能够并行消费的。
RocketMQ能够严格的保证消息有序。后端
(4)Message Filter
①Broker端消息过滤
在Broker中,按照Consumer的要求作过滤,优势是减小了对于Consumer无用消息的网络传输。缺点是增长了Broker的负担,实现相对复杂。
②Consumer端消息过滤
这种过滤方式可由应用彻底自定义实现,可是缺点是不少无用的消息要传输到Consumer端。服务器
(5)Message Persistence
消息中间件一般采用的几种持久化方式:
①持久化到数据库,例如Mysql。
②持久化到KV存储,例如levelDB、伯克利DB等KV存储系统。
③文件记录形式持久化,例如Kafka,RocketMQ
④对内存数据作一个持久化镜像,例如beanstalkd,VisiNotify
⑤前三种持久化方式都具备将内存队列Buffer进行扩展的能力,第四种方式只是一个内存的镜像,做用是当Broker挂掉重启后仍然能将以前内存的数据恢复出来。网络
RocketMQ充分利用Linux文件系统内存cache来提升性能。架构
(6)Message Reliablity
影响消息可靠性的几种状况:
①Broker正常关闭;
②Broker异常Crash;
③OS Crash;
④机器掉电,可是能当即恢复供电状况。
⑤机器没法开机(多是cpu、主板、内存等关键设备损坏)
⑥磁盘设备损坏。
前四种状况都属于硬件资源可当即恢复状况,RocketMQ在这四种状况下能保证消息不丢,或者丢失少许数据(依赖刷盘方式是同步仍是异步)。
后两种状况属于单点故障,且没法恢复,一旦发生,在此单点上的消息所有丢失。RocketMQ在这两种状况下,经过异步复制,可保证99%的消息不丢,可是仍然会有极少许的消息可能丢失。经过同步双写技术能够彻底避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。
RocketMQ从3.0版本开始支持同步双写。
(7)Low Latency Messaging
在消息不堆积状况下,消息到达Broker后,能马上到达Consumer。RocketMQ使用长轮询Pull方式,可保证消息很是实时,消息实时性不低于Push。
(8)At least Once
是指每一个消息必须投递一次。RocketMQ Consumer先pull消息到本地,消费完成后,才向服务器返回ack,若是没有消费必定不会ack消息,因此RocketMQ能够很好的支持此特性。
(9)Exactly Only Once
①发送消息阶段,不容许发送重复的消息。
②消费消息阶段,不容许消费重复的消息。
只有以上两个条件都知足状况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。因此RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要作到幂等性。RocketMQ虽然不能严格保证不重复,可是正常状况下不多会出现重复发送、消费状况,只有网络异常,Consumer启停等异常状况下会出现消息重复。
(10)Broker的Buffer问题
Broker的Buffer一般指的是Broker中一个队列的内存Buffer大小,这类Buffer一般大小有限。
另外,RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据按期清除。RocketMQ同其余MQ有很是显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,无论有多少数据进来都能装得下,这个无限是有前提的,Broker会按期删除过时的数据,例如Broker只保存3天的消息,那么这个Buffer虽然长度无限,可是3天前的数据会被从队尾删除。
(11)回溯消费
回溯消费是指Consumer已经消费成功的消息,因为业务上的需求须要从新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然须要保留。而且从新消费通常是按照时间维度,例如因为Consumer系统故障,恢复后须要从新消费1小时前的数据,那么Broker要提供一种机制,能够按照时间维度来回退消费进度。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,能够向前回溯,也能够向后回溯。
(12)消息堆积
消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具备必定的消息堆积能力,消息堆积分如下两种状况:
①消息堆积在内存Buffer,一旦超过内存Buffer,能够根据必定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种状况消息的堆积能力主要在于内存Buffer大小,并且消息堆积后,性能降低不会太大,由于内存中数据多少对于对外提供的访问能力影响有限。
②消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
评估消息堆积能力主要有如下四点:
消息能堆积多少条,多少字节?即消息的堆积容量。
消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
消息堆积后,正常消费的Consumer是否会受影响?
消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?
(13)分布式事务
已知的几个分布式事务规范,如XA,JTA等。其中XA规范被各大数据库厂商普遍支持,如Oracle,Mysql等。其中XA的TM实现佼佼者如Oracle Tuxedo,在金融、电信等领域被普遍应用。
分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然须要KV存储的支持,由于第二阶段的提交回滚须要修改消息状态,必定涉及到根据Key去查找Message的动做。RocketMQ在第二阶段绕过了根据Key去查找Message的问题,采用第一阶段发送Prepared消息时,拿到了消息的Offset,第二阶段经过Offset去访问消息,并修改状态,Offset就是数据的地址。
RocketMQ这种实现事务的方式,没有经过KV存储作,而是经过Offset方式,存在一个显著缺陷,即经过Offset更改数据,会令系统的脏页过多,须要特别关注。
(14)定时消息
定时消息是指消息发到Broker后,不能马上被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
若是要支持任意的时间精度,在Broker层面,必需要作消息排序,若是再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
RocketMQ支持定时消息,可是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
(15)消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败一般能够认为有如下几种状况:
因为消息自己的缘由,例如反序列化失败,消息数据自己没法处理(例如话费充值,当前消息的手机号被注销,没法充值)等。这种错误一般须要跳过这条消息,再消费其余消息,而这条失败的消息即便马上重试消费,99%也不成功,因此最好提供一种定时重试机制,即过10s秒后再重试。
因为依赖的下游应用服务不可用,例如db链接不可用,外系统网络不可达等。遇到这种错误,即便跳过当前失败的消息,消费其余消息一样也会报错。这种状况建议应用sleep 30s,再消费下一条消息,这样能够减轻Broker重试消息的压力。
RocketMQ的设计模型:
简单说来,RocketMQ具备如下特色:
①是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特色。
②Producer、Consumer、队列均可以分布式。
③Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer若是作广播消费,则一个consumer实例消费这个Topic对应的全部队列,若是作集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
④可以保证严格的消息顺序。
⑤提供丰富的消息拉取模式。
⑥高效的订阅者水平扩展能力。
⑦实时的消息订阅机制。
⑧亿级消息堆积能力。
⑨较少的依赖。
RocketMQ 物理部署结构:
RocketMQ的部署结构有如下特色:
①Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
②Broker部署相对复杂,Broker分为Master与Slave,一个Master能够对应多个Slave,可是一个Slave只能对应一个Master,Master与Slave的对应关系经过指定相同的BrokerName,不一样的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也能够部署多个。每一个Broker与Name Server集群中的全部节点创建长链接,定时注册Topic信息到全部Name Server。
③Producer与Name Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic路由信息,并向提供Topic服务的Master创建长链接,且定时向Master发送心跳。Producer彻底无状态,可集群部署。
④Consumer与Name Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave创建长链接,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅消息,也能够从Slave订阅消息,订阅规则由Broker配置决定。
RocketMQ 逻辑部署结构:
RocketMQ的逻辑部署结构有Producer和Consumer两个特色。
(1)Producer Group
用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,能够是多台机器,也能够是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group能够发送多个Topic消息,Producer Group做用以下:
①标识一类Producer;
②能够经过运维工具查询这个发送消息应用下有多个Producer实例;
③发送分布式事务消息时,若是Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
(2)Consumer Group
用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,能够是多台机器,也能够是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,若是设置为广播方式,那么这个Consumer Group下的每一个实例都消费全量数据。
RocketMQ 数据存储结构:
RocketMQ采起了一种数据与索引分离的存储方法。有效下降文件资源、IO资源,内存资源的损耗。即使是阿里这种海量数据,高并发场景也可以有效下降端到端延迟,并具有较强的横向扩展能力。
二.实践部分
1.在服务器上安装RocketMQ
此处略。
2.程序中使用RocketMQ
建立一个maven项目,在pom文件中添加RocketMQ客户端jar包的依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency>
建立生产者:
//Producer.java
package itszt; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * 生产者 */ public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("127.0.0.1:9876"); try { producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } } }
建立消费者:
//Consumer.java package itszt; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 消费者 */ public class Consumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("PushTopic", "push"); //程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); // System.out.println(msg.toString()); String topic = msg.getTopic(); System.out.println("topic = " + topic); byte[] body = msg.getBody(); System.out.println("body: " + new String(body)); String keys = msg.getKeys(); System.out.println("keys = " + keys); String tags = msg.getTags(); System.out.println("tags = " + tags); System.out.println("-----------------------------------------------"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }