Rocketmq原理&最佳实践

MQ背景&选型

消息队列做为高并发系统的核心组件之一,可以帮助业务系统解构提高开发效率和系统稳定性。主要具备如下优点:html

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力致使消息丢失、系统奔溃等问题)
  • 系统解耦(解决不一样重要程度、不一样能力级别系统之间依赖致使一死全死)
  • 提高性能(当存在一对多调用时,能够发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路很差压测,能够经过堆积必定量消息再放开来压测)

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具备主要优点特性有:
• 支持事务型消息(消息发送和DB操做保持两方的最终一致性,rabbitmq和kafka不支持)
• 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
• 支持18个级别的延迟消息(rabbitmq和kafka不支持)
• 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq须要手动确认)
• 支持consumer端tag过滤,减小没必要要的网络传输(rabbitmq和kafka不支持)
• 支持重复消费(rabbitmq不支持,kafka支持)linux

Rocketmq、kafka、Rabbitmq的详细对比,请参照下表格:sql

 

RocketMQ集群概述

RocketMQ集群部署结构

 
image.png

Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。数据库

 Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master能够对应多个Slave,可是一个Slave只能对应一个Master,Master与Slave的对应关系经过指定相同的Broker Name,不一样的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也能够部署多个。segmentfault

每一个Broker与Name Server集群中的全部节点创建长链接,定时(每隔30s)注册Topic信息到全部Name Server。Name Server定时(每隔10s)扫描全部存活broker的链接,若是Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的链接。服务器

Producer

Producer与Name Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic路由信息,并向提供Topic服务的Master创建长链接,且定时向Master发送心跳。Producer彻底无状态,可集群部署。网络

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取全部topic队列的最新状况,这意味着若是Broker不可用,Producer最多30s可以感知,在此期间内发往Broker的全部消息都会失败。并发

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向全部关联的broker发送心跳,Broker每隔10s中扫描全部存活的链接,若是Broker在2分钟内没有收到心跳数据,则关闭与Producer的链接。异步

Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave创建长链接,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅消息,也能够从Slave订阅消息,订阅规则由Broker配置决定。分布式

Consumer每隔30s从Name server获取topic的最新队列状况,这意味着Broker不可用时,Consumer最多最须要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向全部关联的broker发送心跳,Broker每隔10s扫描全部存活的链接,若某个链接2分钟内没有发送心跳数据,则关闭链接;并向该Consumer Group的全部Consumer发出通知,Group内的Consumer从新分配队列,而后继续消费。

当Consumer获得master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,所以会有少许的消息丢失。可是一旦master恢复,未同步过去的消息会被最终消费掉。

消费者对列是消费者链接以后(或者以前有链接过)才建立的。咱们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不一样,都认为是不一样的消费端,每一个消费端会拥有一份本身消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的全部数据。

若是有须要,能够查看Rocketmq更多源码解析

Rocketmq如何支持分布式事务消息

场景

A(存在DB操做)、B(存在DB操做)两方须要保证分布式事务一致性,经过引入中间层MQ,A和MQ保持事务一致性(异常状况下经过MQ反查A接口实现check),B和MQ保证事务一致(经过重试),从而达到最终事务一致性。

原理:大事务 = 小事务 + 异步

MQ与DB一致性原理(两方事务)

流程图

 
image.png

上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。

MQ消息、DB操做一致性方案:

1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。

2)执行DB操做;DB执行成功Commit DB操做,DB执行失败Rollback DB操做。

3)若是DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;若是DB执行失败,回复MQ服务器,将状态改成ROLLBACK_MESSAGE。注意此过程有可能失败。

4)MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,若是发现消息未COMMIT,则经过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,若是成功,则回复COMMIT_MESSAGE,不然回复ROLLBACK_MESSAGE。

说明:

上面以DB为例,其实此处能够是任何业务或者数据源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的状态,在MQ服务器内部是一个数字。

TransactionCheckListener 是在消息的commit或者rollback消息丢失的状况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者rocketmq集群挂了的状况下。当rocketmq集群挂了,若是采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。因此若是要核心业务用Rocketmq解决分布式事务问题,建议选择同步刷盘模式。

多系统之间数据一致性(多方事务)

 
image.png

当须要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(经过Rocketmq的事务性消息解决)已经没法支持。这个时候须要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。

以上图交易系统为例:

1)交易系统建立订单(往DB插入一条记录),同时发送订单建立消息。经过RocketMq事务性消息保证一致性

2)接着执行完成订单所需的同步核心RPC服务(非核心的系统经过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。

3)交易系统接受本身发送的订单建立消息,经过定时调度系统建立延时回滚任务(或者使用RocketMq的重试功能,设置第二次发送时间为定时任务的延迟建立时间。在非消息堵塞的状况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间能够指定)。延迟任务先经过查询订单状态判断订单是否完成,完成则不建立回滚任务,不然建立。 PS:多个RPC能够建立一个回滚任务,经过一个消费组接受一次消息就能够;也能够经过建立多个消费组,一个消息消费屡次,每次消费建立一个RPC的回滚任务。 回滚任务失败,经过MQ的重发来重试。

以上是交易系统和其余系统之间保持最终一致性的解决方案。

案例分析

单机环境下的事务示意图

以下为A给B转帐的例子。

步骤 动做
1 锁定A的帐户
2 锁定B的帐户
3 检查A帐户是否有1元
4 A的帐户扣减1元
5 给B的帐户加1元
6 解锁B的帐户
7 解锁A的帐户

以上过程在代码层面甚至能够简化到在一个事物中执行两条sql语句。

分布式环境下事务

和单机事务不一样,A、B帐户可能不在同一个DB中,此时没法像在单机状况下使用事物来实现。此时能够经过一下方式实现,将转帐操做分红两个操做。

a) A帐户

步骤 动做
1 锁定A的帐户
2 检查A帐户是否有1元
3 A的帐户扣减1元
4 解锁A的帐户

b) MQ消息
A帐户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转帐系统,转帐系统来给B帐号加钱。

c) B帐户

步骤 动做
1 锁定B的帐户
2 给B的帐户加1元
3 解锁B的帐户

 顺序消息

顺序消息缺陷

发送顺序消息没法利用集群Fail Over特性消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列因为哈希不均致使消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,没法跳过,当前队列消费暂停。

原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就能够保证消费端只有一个线程去消费消息。

注意:把消息发到同一个队列(queue),不是同一个topic,默认状况下一个topic包括4个queue

扩展

能够经过实现发送消息的对列选择器方法,实现部分顺序消息。

举例:好比一个数据库经过MQ来同步,只须要保证每一个表的数据是同步的就能够。解析binlog,将表名做为对列选择器的参数,这样就能够保证每一个表的数据到同一个对列里面,从而保证表数据的顺序消费

最佳实践

Producer

Topic

一个应用尽量用一个Topic,消息子类型用tags来标识,tags能够由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才能够利用tags 在broker作消息过滤。

key

每一个消息在业务层面的惟一标识码,要设置到 keys 字段,方便未来定位消息丢失问题。服务器会为每一个消息建立索引(哈希索引),应用能够经过 topic,key来查询这条消息内容,以及消息被谁消费。因为是哈希索引,请务必保证key 尽量惟一,这样能够避免潜在的哈希冲突。

//订单Id

String orderId= "20034568923546";

message.setKeys(orderId);

日志

消息发送成功或者失败,要打印消息日志,务必要打印 send result 和key 字段。

send

send消息方法,只要不抛异常,就表明发送成功。可是发送成功会有多个状态,在sendResult里定义。

SEND_OK:消息发送成功

FLUSH_DISK_TIMEOUT:消息发送成功,可是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

FLUSH_SLAVE_TIMEOUT:消息发送成功,可是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

SLAVE_NOT_AVAILABLE:消息发送成功,可是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

Consumer

幂等

RocketMQ使用的消息原语是At Least Once,因此consumer可能屡次收到同一个消息,此时务必作好幂等。

日志

消费时记录日志,以便后续定位问题。

批量消费

尽可能使用批量方式消费方式,能够很大程度上提升消费吞吐量。

参考资料

文档

RocketMQ_design.pdf
RocketMQ_experience.pdf

博客

分布式开放消息系统(RocketMQ)的原理与实践

http://www.jianshu.com/p/453c6e7ff81c

RocketMQ事务消费和顺序消费详解

http://www.cnblogs.com/520playboy/p/6750023.html

ZeroCopy

http://www.linuxjournal.com/article/6345

IO方式的性能数据

http://stblog.baidu-tech.com/?p=851

原文

https://www.jianshu.com/p/2838890f3284