RocketMQ 经过本身的方式解决了消息顺序性的问题:
RocketMQ经过轮询全部队列的方式来肯定消息被发送到哪个队列(负载均衡策略)。根据不一样业务,能够将业务ID做为计算队列,让业务ID相同的消息前后发送到同一个队列中,在获取到路由信息之后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的确定是同一个队列。java
在网络不稳定的状况下,RocketMQ会出现消息重复的问题:redis
发送时消息重复算法
- 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,致使服务端对客户端应答失败。 若是此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同而且 Message ID 也相同的消息。
投递时消息重复缓存
- 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递以前已被处理过的消息,消费者后续会收到两条内容相同而且 Message ID 也相同的消息。
负载均衡时消息重复服务器
- 当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
形成消息重复的根本缘由是:网络不可达。只要经过网络交换数据,就没法避免这个问题。 因此解决这个问题的办法就是绕过这个问题。那么问题就变成了:若是消费端收到两条同样的消息,应该怎样处理?网络
第1条很好理解,只要保持幂等性,无论来多少条重复消息,最后处理的结果都同样。
第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,若是新到的消息ID已经在日志表中,那么就再也不处理这条消息。
第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。
第2条能够消息系统实现,也能够业务端实现。正常状况下出现重复消息的几率其实很小,若是由消息系统来实现的话,确定会对消息系统的吞吐量和高可用有影响,因此最好仍是由业务端本身处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的缘由。
负载均衡
RocketMQ不保证消息不重复,若是你的业务须要保证严格的不重复消息,须要你本身在业务端去重。异步
那业务端如何去重呢?原理很简单,步骤以下:分布式
事务消息:消息队列 RocketMQ 提供相似 X/Open XA 的分布事务功能,经过消息队列 RocketMQ 事务消息能达到分布式事务的最终一致。 半消息:暂不能投递的消息,发送方已经将消息成功发送到了消息队列 RocketMQ 服务端,可是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。 消息回查:因为网络闪断、生产者应用重启等缘由,致使某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端经过扫描发现某条消息长期处于“半消息”时,须要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。函数
事务消息的适用场景示例: 经过购物车进行下单的流程中,用户入口在购物车系统,交易下单入口在交易系统,两个系统之间的数据须要保持最终一致,这时能够经过事务消息进行处理。交易系统下单以后,发送一条交易下单的消息到消息队列 RocketMQ,购物车系统订阅消息队列 RocketMQ 的交易下单消息,作相应的业务处理,更新购物车数据。
交互流程
消息队列 RocketMQ 事务消息交互流程以下所示: 事务消息
说明:事务消息发送对应步骤 一、二、三、4,事务消息回查对应步骤 五、六、7。
Message message = new Message();
// 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,如下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只肯定事务消息的第一次回查的最快时间,实际回查时间向后浮动0~5秒;如第一次回查后事务仍未提交,后续每隔5秒回查一次。
复制代码
同步发送是指消息发送方发出数据后,会在收到接收方发回响应以后才发下一个数据包的通信方式。
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通信方式。 MQ 的异步发送,须要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不须要等待服务器响应便可返回,进行第二条消息发送。发送方经过回调接口接收服务器响应,并对响应结果进行处理。
单向(Oneway)发送特色为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时很是短,通常在微秒级别。
RocketMQ采起长轮询+PULL模式保证消息的持久性
消息队列 RocketMQ 支持如下两种订阅方式:
集群订阅:同一个 Group ID 所标识的全部 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每一个实例平均分摊,只消费其中的 3 条消息。
// 集群订阅方式设置(不设置的状况下,默认为集群订阅方式)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
复制代码
广播订阅:同一个 Group ID 所标识的全部 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每一个实例都会各自消费 9 条消息。
// 广播订阅方式设置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
复制代码
- 定时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不指望这条消息立马投递,而是推迟到在当前时间点以后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。
- 延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不指望这条消息立马投递,而是延迟必定时间后才投递到 Consumer 进行消费,该消息即延时消息。
定时消息与延时消息在代码配置上存在一些差别,可是最终达到的效果相同:消息在发送到消息队列 RocketMQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
定时消息和延时消息适用于如下一些场景: