一个应用尽量用一个Topic,而消息子类型则能够用tags来标识。java
tags可由应用自行设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才能够利用tags经过broker作消息过滤:算法
message.setTags("TagA");
每一个消息在业务层面的惟一标识码要设置到keys字段,方便往后定位消息丢失问题。spring
服务器会为每一个消息建立哈希索引,应用能够经过topic、key来查询这条消息内容,以及消息被谁消费。数据库
哈希索引,请保证key尽量惟一,避免潜在的哈希冲突。后端
// 订单Id String orderId = "20034568923546"; message.setKeys(orderId);
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就表明发送成功。发送成功会有多个状态,在sendResult里定义。如下对每一个状态进行说明:缓存
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。安全
消息发送成功可是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中能够设置刷盘方式和同步刷盘时间长度,若是Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。服务器
消息发送成功,可是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。若是Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),而且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。网络
消息发送成功,可是此时Slave不可用。若是Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。数据结构
Producer的send方法自己支持内部重试,重试逻辑以下:
必定程度上保证了消息能够发送成功。若是业务对消息可靠性要求比较高,建议应用增长相应的重试逻辑:好比调用send同步方法发送失败时,尝试将消息存储到DB,而后由后台线程定时重试,确保消息必定到达Broker。
那么DB重试方案为何没有集成到MQ客户端内部,而要求应用本身完成?
消息发送过程:
因此,一次消息发送的耗时时间是上述三个步骤的总和,而某些场景要求耗时很是短,可是对可靠性要求并不高,例如日志收集类应用,此类应用能够采用oneway形式调用,oneway形式只发送请求不等待应答,而发送请求在客户端实现层面仅仅是一个os系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时一般在微秒级。
RocketMQ没法避免消息重复(Exactly-Once),因此若是业务对消费重复很是敏感,务必要在业务层面进行去重处理。能够借助关系数据库进行去重。首先须要肯定消息的惟一键,能够是msgId,也能够是消息内容中的惟一标识字段,例如订单Id等。在消费以前判断惟一键是否在关系数据库中存在。若是不存在则插入,并消费,不然跳过。(实际过程要考虑原子性问题,判断是否存在能够尝试插入,若是报主键冲突,则插入失败,直接跳过)
msgId必定是全局惟一标识符,可是实际使用中,可能会存在相同的消息有两个不一样msgId的状况(消费者主动重发、因客户端重投机制致使的重复等),这种状况就须要使业务字段进行重复消费。
绝大部分消息消费行为都属于 IO 密集型,便可能是操做数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,经过增长消费并行度,能够提升总的消费吞吐量,可是并行度增长到必定程度,反而会降低。因此,应用必需要设置合理的并行度。 以下有几种修改消费并行度的方法:
某些业务流程若是支持批量方式消费,则能够很大程度上提升消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样便可大幅度提升消费的吞吐量,经过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
发生消息堆积时,若是消费速度一直追不上发送速度,若是业务对数据要求不高的话,能够选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或所有消息,这样就能够快速追上发送消息的速度。示例代码以下:
public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { long offset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); long diff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO 消息堆积状况的特殊处理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO 正常消费过程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
举例以下,某条消息的消费过程以下:
这条消息的消费过程当中有4次与 DB的 交互,若是按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,因此若是能把 4 次 DB 交互优化为 2 次,那么总耗时就能够优化到 15ms,即整体性能提升了 40%。因此应用若是对时延敏感的话,能够把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小不少。
若是消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("RECEIVE_MSG_BEGIN: " + msgs.toString()); // TODO 正常消费过程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
若是能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。
第一件须要注意的事情是,不一样的消费者组能够独立的消费一些 topic,而且每一个消费者组都有本身的消费偏移量,请确保同一组内的每一个消费者订阅信息保持一致。
消费者将锁定每一个消息队列,以确保他们被逐个消费,虽然这将会致使性能降低,可是当你关心消息顺序的时候会颇有用。咱们不建议抛出异常,你能够返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 做为替代。
顾名思义,消费者将并发消费这些消息,建议你使用它来得到良好性能,咱们不建议抛出异常,你能够返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 做为替代。
对于并发的消费监听器,你能够返回 RECONSUME_LATER 来通知消费者如今不能消费这条消息,而且但愿能够稍后从新消费它。而后,你能够继续消费其余消息。对于有序的消息监听器,由于你关心它的顺序,因此不能跳过消息,可是你能够返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。
不建议阻塞监听器,由于它会阻塞线程池,并最终可能会终止消费进程
消费者使用 ThreadPoolExecutor 在内部对消息进行消费,因此你能够经过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。
当创建一个新的消费者组时,须要决定是否须要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费以后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每一个存在于 Broker 中的信息。你也可使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。
Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。若是对消息的可靠性要求比较严格,能够采用 SYNC_MASTER加SLAVE的部署方式。若是对消息可靠性要求不高,能够采用ASYNC_MASTER加SLAVE的部署方式。若是只是测试方便,则能够选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。
SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失不少性能,可是也更可靠,因此须要根据实际的业务场景作好权衡。
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端链接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 同样 | 存在主从 broker 时,若是在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会链接主节点配置的 brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其余的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在天天的什么时间删除已经超过文件保留时间的 commit log |
fileReservedTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者以前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,能够取得更好的性能。 |
RocketMQ 中,Name Servers 被设计用来作简单的路由管理。其职责包括:
相对于RocketMQ的Broker集群,生产者和消费者都是客户端。本小节主要描述生产者和消费者公共的行为配置。
RocketMQ能够令客户端找到Name Server, 而后经过Name Server再找到Broker。以下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
客户端启动后,会定时访问一个静态HTTP服务器,地址以下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容以下:
192.168.0.1:9876;192.168.0.2:9876
客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可经过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增长以下配置:
10.232.22.67 jmenv.taobao.net
推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群能够热升级。
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每一个参数均可以用spring来配置,也能够在代码中配置,例如namesrvAddr这个参数能够这样配置,producer.setNamesrvAddr(“192.168.0.1:9876”),其余参数同理。
参数名 | 默认值 | 说明 |
---|---|---|
namesrvAddr | Name Server地址列表,多个NameServer地址用分号隔开 | |
clientIP | 本机IP | 客户端本机IP地址,某些机器会发生没法识别客户端IP地址状况,须要应用在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端建立的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络链接、线程资源等) |
clientCallbackExecutorThreads | 4 | 通讯层异步回调线程数 |
pollNameServerInteval | 30000 | 轮询Name Server间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
参数名 | 默认值 | 说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer若是属于一个应用,发送一样的消息,则应该将它们归为同一组 |
createTopicKey | TBW102 | 在发送消息时,自动建立服务器不存在的topic,须要指定Key,该Key可用于配置发送消息所在topic的默认路由。 |
defaultTopicQueueNums | 4 | 在发送消息,自动建立服务器不存在的topic时,默认建立的队列数 |
sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 若是发送消息返回sendResult,可是sendStatus!=SEND_OK,是否重试发送 |
retryTimesWhenSendFailed | 2 | 若是消息发送失败,最大重试次数,该参数只对同步发送模式起做用 |
maxMessageSize | 4MB | 客户端限制的消息大小,超过报错,同时服务端也会限制,因此须要跟服务端配合使用。 |
transactionCheckListener | 事务消息回查监听器,若是发送事务消息,必须设置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池最小线程数 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池最大线程数 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
RPCHook | null | 该参数是在Producer建立时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户能够在第一个接口中作一些安全控制或者其余操做。 |
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer若是属于一个应用,订阅一样的消息,且消费逻辑一致,则应该将它们归为同一组 |
messageModel | CLUSTERING | 消费模型支持集群消费和广播消费两种 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从上次消费的位置开始消费,这包含两种状况:一种是上次消费的位置未过时,则消费从上次停止的位置进行;一种是上次消费位置已通过期,则从当前队列第一条消息开始消费 |
consumeTimestamp | 半个小时前 | 只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起做用。 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
subscription | 订阅关系 | |
messageListener | 消息监听器 | |
offsetStore | 消费进度存储 | |
consumeThreadMin | 10 | 消费线程池最小线程数 |
consumeThreadMax | 20 | 消费线程池最大线程数 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费容许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
pullInterval | 0 | 拉消息间隔,因为是长轮询,因此为0,可是若是应用为了流控,也能够设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer若是属于一个应用,订阅一样的消息,且消费逻辑一致,则应该将它们归为同一组 |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
messageModel | BROADCASTING | 消息支持两种模式:集群消费和广播消费 |
messageQueueListener | 监听队列变化 | |
offsetStore | 消费进度存储 | |
registerTopics | 注册的topic集合 | |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
字段名 | 默认值 | 说明 |
---|---|---|
Topic | null | 必填,消息所属topic的名称 |
Body | null | 必填,消息体 |
Tags | null | 选填,消息标签,方便服务器过滤使用。目前只支持每一个消息设置一个tag |
Keys | null | 选填,表明这条消息的业务关键词,服务器会根据keys建立哈希索引,设置后,能够在Console系统根据Topic、Keys来查询消息,因为是哈希索引,请尽量保证key惟一,例如订单号,商品Id等。 |
Flag | 0 | 选填,彻底由应用来设置,RocketMQ不作干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 |
本小节主要介绍系统(JVM/OS)相关的配置。
推荐使用最新发布的JDK 1.8版本。经过设置相同的Xms和Xmx值来防止JVM调整堆大小以得到更好的性能。简单的JVM配置以下所示:
-server -Xms8g -Xmx8g -Xmn4g
若是您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是经过“预触摸”Java堆以确保在JVM初始化期间每一个页面都将被分配。那些不关心启动时间的人能够启用它:
-XX:+AlwaysPreTouch
禁用偏置锁定可能会减小JVM暂停,
-XX:-UseBiasedLocking
至于垃圾回收,建议使用带JDK 1.8的G1收集器。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
这些GC选项看起来有点激进,但事实证实它在咱们的生产环境中具备良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置过小,不然JVM将使用一个小的年轻代来实现这个目标,这将致使很是频繁的minor GC,因此建议使用rolling GC日志文件:
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
若是写入GC文件会增长代理的延迟,能够考虑将GC日志文件重定向到内存文件系统:
-Xloggc:/dev/shm/mq_gc_%p.log123
os.sh脚本在bin文件夹中列出了许多内核参数,能够进行微小的更改而后用于生产用途。下面的参数须要注意,更多细节请参考/proc/sys/vm/*的文档