分布式队列由于有高可靠性,保证消息不会丢失的要求,因此数据要进行持久化存储。.java
持久化方式能够分红两大类web
通常来说性能对比上:文件系统>关系型数据库DB算法
RocketMq的文件存储系统有两点优化以保证性能:sql
顺序写
,保证了消息存储的速度。目前的高性能磁盘,顺序写速度能够达到600MB/s, 超过了通常网卡的传输速度,可是磁盘随机写的速度只有大概100KB/s零拷贝技术有个限制是不能超过2G,因此RocketMQ默认设置单个CommitLog日志数据文件为1G数据库
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的安全
CommitLog
:消息真正的物理存储文件是CommitLog,默认一个文件一个G,存储的是Topic,QueueId和Message,一个存储满了会自动建立一个新的。ConsumeQueue
:是消息的逻辑队列,相似数据库的索引文件,存储的是指向物理存储的地址,为了加快消息的读取速度。消费者消费某条消息时,先查询索引获取CommitLog的对应的物理地址。每一个Topic下的每一个Message Queue都有一个对应的ConsumeQueue文件,文件很小,一般会加载到内存中。若是该文件丢失或者损坏,能够经过CommitLog恢复IndexFile
:也是个索引文件,为了消息查询提供了一种经过key或时间区间来查询消息的方法,这种经过IndexFile来查找消息的方法不影响发送与消费消息的主流程RocketMq是天生支持分布式的,能够配置主从以及水平扩展网络
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker链接写入消息;Consumer能够链接 Master角色的Broker,也能够链接Slave角色的Broker来读取消息。架构
若是一个Broker组有Master和Slave,消息须要从Master复制到Slave 上,有同步和异步两种复制方式。负载均衡
Producer端,每一个实例在发消息的时候,默认会轮询全部的message queue发送,以达到让消息平均落在不一样的queue上。而因为queue能够散落在不一样的broker,因此消息就发送到不一样的broker下,以下图:异步
若是consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将没法分到queue,也就没法消费到消息,也就没法起到分摊负载的做用了。因此须要控制让queue的总数量大于等于consumer的数量。
消费者的集群模式–启动多个消费者就能够保证消费者的负载均衡(均摊队列)
默认使用的是均摊队列:会按照queue的数量和实例的数量平均分配queue给每一个实例,这样每一个消费者能够均摊消费的队列,以下图所示6个队列和三个生产者。
对于广播模式并非负载均衡的,要求一条消息须要投递到一个消费组下面全部的消费者实例,因此也就没有消息被分摊消费的说法。
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的状况。
所以,在使用顺序消息时,务必保证应用可以及时监控并处理消费失败的状况,避免阻塞现象的发生。
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您能够经过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息再也不重试,继续消费新的消息。
消息队列 RocketMQ 默认容许每条消息最多重试 16 次
,将会在接下来的 4 小时 46 分钟以内进行 16 次重试,若是依然失败就会进入死信队列。
一条消息不管重试多少次,这些重试消息的 Message ID 不会改变。
也能够经过配置,让其再也不重试,可是不建议这样
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕获消费逻辑中的全部异常,并返回 Action.CommitMessage; return Action.CommitMessage; } //消息处理正常,直接返回 Action.CommitMessage; return Action.CommitMessage; } }
死信消息具备如下特性:
死信队列具备如下特性:
查看死信队列
一条消息进入死信队列,意味着某些因素致使消费者没法正常消费该消息,所以,一般须要您对其进行特殊处理。排查可疑因素并解决问题后,能够在消息队列 RocketMQ 控制台从新发送该消息,让消费者从新消费一次。
消息队列 RocketMQ 消费者在接收到消息之后,有必要根据业务上的惟一 Key 对消息作幂等处理的必要性。
在互联网应用中,尤为在网络不稳定的状况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单能够归纳为如下状况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,致使服务端对客户端应答失败。 若是此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同而且 Message ID 也相同的消息。
消费时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递以前已被处理过的消息,消费者后续会收到两条内容相同而且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
由于 Message ID 有可能出现冲突(重复)的状况,因此真正安全的幂等处理,不建议以 Message ID 做为处理依据。 最好的方式是以业务惟一标识做为幂等处理的关键依据,而业务的惟一标识能够经过消息 Key 进行设置:
Message message = new Message(); message.setKey("ORDERID_100"); SendResult sendResult = producer.send(message);
订阅方收到消息时能够根据消息的 Key 进行幂等处理:
consumer.subscribe("ons_test", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { String key = message.getKey() // 根据业务惟一标识的 key 作幂等处理 } });