【总结】RocketMq高级特性

1、消息存储

分布式队列由于有高可靠性,保证消息不会丢失的要求,因此数据要进行持久化存储。.java

1. 为何要存储到文件系统?如何保证性能?

持久化方式能够分红两大类web

  • 关系型数据库:ActiveMQ默认采用的KahaDB作消息存储,因为,普通关系型数据库(如Mysql)在单表数据量达到千万级别的状况下,其IO读写性能每每会出现瓶颈。
  • 文件系统:RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来作持久化,刷盘通常能够分为异步刷盘和同步刷盘两种模式

通常来说性能对比上:文件系统>关系型数据库DB算法

RocketMq的文件存储系统有两点优化以保证性能sql

  • 消息存储(顺序写):RocketMQ的消息用顺序写,保证了消息存储的速度。目前的高性能磁盘,顺序写速度能够达到600MB/s, 超过了通常网卡的传输速度,可是磁盘随机写的速度只有大概100KB/s
  • 消息发送(零拷贝):将本机磁盘文件的内容发送到客户端须要进行屡次复制,好比从磁盘复制数据到内核态内存;从内核态内存复制到用户态内存;从用户态内存复制到网络驱动,最后从网络驱动复制到网卡中。RocketMq采用Java中零拷贝的技术,让从内核态内存复制到用户态内存这一步省略,直接赋值到网络驱动中

零拷贝技术有个限制是不能超过2G,因此RocketMQ默认设置单个CommitLog日志数据文件为1G数据库

2. 加入持久化后RocketMq的架构是什么样的?

  1. 消息生成者发送消息
  2. MQ收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回ACK给生产者
  4. MQ push 消息给对应的消费者,而后等待消费者返回ACK
  5. 若是消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;若是MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试从新push消息,重复执行四、五、6步骤
  6. MQ删除消息

3. 存储结构是什么样的?

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的安全

  • CommitLog:消息真正的物理存储文件是CommitLog,默认一个文件一个G,存储的是Topic,QueueId和Message,一个存储满了会自动建立一个新的。
  • ConsumeQueue:是消息的逻辑队列,相似数据库的索引文件,存储的是指向物理存储的地址,为了加快消息的读取速度。消费者消费某条消息时,先查询索引获取CommitLog的对应的物理地址。每一个Topic下的每一个Message Queue都有一个对应的ConsumeQueue文件,文件很小,一般会加载到内存中。若是该文件丢失或者损坏,能够经过CommitLog恢复
  • IndexFile:也是个索引文件,为了消息查询提供了一种经过key或时间区间来查询消息的方法,这种经过IndexFile来查找消息的方法不影响发送与消费消息的主流程

4. 刷盘机制有哪些?

  • 同步刷盘(数据必定保存成功,可是速度慢):在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,马上通知刷盘线程刷盘, 而后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
  • 异步刷盘(速度快,数据不必定保存成功):在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操做的返回快,吞吐量大;当内存里的消息量积累到必定程度时,统一触发写磁盘动做,快速写入。

5. 如何保证消息不丢失?

  • RocketMq提供消息持久化机制,消息的刷盘策略分为同步刷盘和异步刷盘。同步刷盘即刷盘成功后再返回一个成功信息,可以保证数据必定保存成功,可是会下降系统吞吐量,异步刷盘与同步刷盘相反,我通常会采用同步刷盘的策略来保证消息不会丢失。
  • RocketMq采用的文件系统存储而不是关系型数据库存储,由于在通常状况下文件系统的性能是比数据库性能高的
  • 而RocketMq为了提升文件系统的读写的高性能,作了两点优化。第一点是采用顺序写的方式,这样能够大大提升磁盘写的性能。第二点采用了零拷贝,原来的文件读取流程是:从磁盘复制数据到内核态内存;从内核态内存复制到用户态内存;从用户态内存复制到网络驱动,最后从网络驱动复制到网卡中发送,零拷贝则省去了从内核态内存复制到用户态内存的这一过程,提升了读取的性能,可是零拷贝对文件大小有要求,因此RocketMq的持久化文件commitlog默认为1G。
  • commitlog是存储了RocketMq的消息等核心信息,除此以外,还提供可一个ConsumeQueue做为持久化文件的索引,提升查询的效率,通常文件比较小,都是加载在内存中。除了ConsumeQueue以外,还会存储一个IndexFile文件,用来提供针对某一个key或者时间区间的查询。

2、高可用机制

RocketMq是天生支持分布式的,能够配置主从以及水平扩展网络

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker链接写入消息;Consumer能够链接 Master角色的Broker,也能够链接Slave角色的Broker来读取消息。架构

1. 消息消费的高可用(主从)

  • 在Consumer的配置文件中,并不须要设置是从Master读仍是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然能够从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
  • RocketMQ目前还不支持把Slave自动转成Master,若是机器资源不足,须要把Slave转成Master,则要手动中止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

2. 消息发送高可用(配置多个主节点)

  • 在建立Topic的时候,把Topic的多个Message Queue建立在多个Broker组上(相同Broker名称,不一样 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其余组的Master仍然可用,Producer仍然能够发送消息。

3. 主从复制

若是一个Broker组有Master和Slave,消息须要从Master复制到Slave 上,有同步和异步两种复制方式。负载均衡

  • 同步复制:同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态。若是Master出故障, Slave上有所有的备份数据,容易恢复同步复制会增大数据写入延迟,下降系统吞吐量。
  • 异步复制:异步复制方式是只要Master写成功 便可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,可是若是Master出了故障,有些数据由于没有被写 入Slave,有可能会丢失
  • 一般状况下,应该把Master和Save配置成同步刷盘方式,主从之间配置成异步的复制方式,这样即便有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

3、负载均衡

1. Producer负载均衡

Producer端,每一个实例在发消息的时候,默认会轮询全部的message queue发送,以达到让消息平均落在不一样的queue上。而因为queue能够散落在不一样的broker,因此消息就发送到不一样的broker下,以下图:异步

2. Consumer负载均衡

若是consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将没法分到queue,也就没法消费到消息,也就没法起到分摊负载的做用了。因此须要控制让queue的总数量大于等于consumer的数量。

  • 消费者的集群模式–启动多个消费者就能够保证消费者的负载均衡(均摊队列)

  • 默认使用的是均摊队列:会按照queue的数量和实例的数量平均分配queue给每一个实例,这样每一个消费者能够均摊消费的队列,以下图所示6个队列和三个生产者。

  • 另一种平均的算法环状轮流分queue的形式,每一个消费者,均摊不一样主节点的一个消息队列,以下图所示:

对于广播模式并非负载均衡的,要求一条消息须要投递到一个消费组下面全部的消费者实例,因此也就没有消息被分摊消费的说法。

4、消息重试机制

1. 顺序消息的重试

  • 对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的状况。

  • 所以,在使用顺序消息时,务必保证应用可以及时监控并处理消费失败的状况,避免阻塞现象的发生。

2. 无序消息的重试

  • 对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您能够经过设置返回状态达到消息重试的结果。

  • 无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息再也不重试,继续消费新的消息。

  • 消息队列 RocketMQ 默认容许每条消息最多重试 16 次将会在接下来的 4 小时 46 分钟以内进行 16 次重试,若是依然失败就会进入死信队列。

  • 一条消息不管重试多少次,这些重试消息的 Message ID 不会改变。

  • 也能够经过配置,让其再也不重试,可是不建议这样

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕获消费逻辑中的全部异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

5、死信队列

死信消息具备如下特性:

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。所以,请在死信消息产生后的 3 天内及时处理。

死信队列具备如下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 若是一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其建立相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的全部死信消息,不论该消息属于哪一个 Topic。

查看死信队列

  1. 在控制台查询出现死信队列的主题信息

  1. 在消息界面根据主题查询死信消息

  1. 选择从新发送消息

一条消息进入死信队列,意味着某些因素致使消费者没法正常消费该消息,所以,一般须要您对其进行特殊处理。排查可疑因素并解决问题后,能够在消息队列 RocketMQ 控制台从新发送该消息,让消费者从新消费一次。

6、消费幂等

消息队列 RocketMQ 消费者在接收到消息之后,有必要根据业务上的惟一 Key 对消息作幂等处理的必要性。

1. 何时产生重复消息?

在互联网应用中,尤为在网络不稳定的状况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单能够归纳为如下状况:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,致使服务端对客户端应答失败。 若是此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同而且 Message ID 也相同的消息。

  • 消费时消息重复

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递以前已被处理过的消息,消费者后续会收到两条内容相同而且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

    当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

2. 处理方式

由于 Message ID 有可能出现冲突(重复)的状况,因此真正安全的幂等处理,不建议以 Message ID 做为处理依据。 最好的方式是以业务惟一标识做为幂等处理的关键依据,而业务的惟一标识能够经过消息 Key 进行设置:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

订阅方收到消息时能够根据消息的 Key 进行幂等处理:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务惟一标识的 key 作幂等处理
    }
});