RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。html
各个模块功能关系参考博客:https://www.cnblogs.com/wxd0108/p/6041829.html架构
功能架构部署图:app
启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,至关于一个路由控制中心。异步
Broker启动,跟全部的Namesrv保持长链接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储全部topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。分布式
收发消息前,先建立topic,建立topic时须要指定该topic要存储在哪些Broker上。也能够在发送消息时自动建立Topic。ide
Producer发送消息,启动时先跟Namesrv集群中的其中一台创建长链接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,而后跟对应的Broker创建长链接,直接向Broker发消息。性能
示例代码:日志
这里用InitializingBean
, DisposableBean
来管理mq的生命周期,InitializingBean
用来初始化mq配置信息,DisposableBean
在mq执行完成后用来销毁bean。code
@Component public class CancelDisplayProducer implements InitializingBean, DisposableBean { private static final Logger logger= LoggerFactory.getLogger(CancelDisplayProducer.class); private DefaultMQProducer defaultMQProducer; @Value("${crk.topic}") private String topicName; @Value("${crk.nameServer}") private String nameServer; @Value(("${crk.groupName}")) private String groupName; public SendResult sendCancelDisplayMq(String tag, String msg, Object primaryKey, Object hashVal){ logger.info("发送取消延时队列消息内容{}",msg); Message rocketMsg = null; com.alibaba.rocketmq.client.producer.SendResult sendResult = null; try { rocketMsg = new Message(topicName, tag, primaryKey + "", msg.getBytes("UTF-8")); //设置该消息延迟1s发送 rocketMsg.setDelayTimeLevel(1); sendResult = defaultMQProducer.send(rocketMsg, new MessageQueueSelector() { //发送顺序消息 @Override public MessageQueue select(List<MessageQueue> list, Message message, Object obj) { int hashCode = obj.hashCode(); if(hashCode < 0) { hashCode = Math.abs(hashCode); } int index = hashCode % list.size(); return list.get(index); } }, hashVal); if(sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) { logger.info("发送取消延时队列消息成功,发送内容:{},keys:{}", msg, primaryKey); } } catch (Exception e) { logger.error("发送取消延时队列消息异常【{}】", e); } return sendResult; } @Override public void destroy() throws Exception { defaultMQProducer.shutdown(); } @Override public void afterPropertiesSet() throws Exception { logger.info("groupName=" + groupName); logger.info("nameServer=" + nameServer); //初始化 defaultMQProducer = new DefaultMQProducer(); defaultMQProducer.setNamesrvAddr(nameServer); defaultMQProducer.setProducerGroup(groupName); defaultMQProducer.setRetryTimesWhenSendFailed(5); defaultMQProducer.setInstanceName("openCarCancelDisplayInstance"); //设置超时时间为5s defaultMQProducer.setSendMsgTimeout(5000); defaultMQProducer.start(); logger.info("DefaultMQProudcer start success!"); } } //***调用生产者发送消息*** cancelDisplayProducer.sendCancelDisplayMq("cancleDisplay",JSONObject.toJSONString(bodyJson),orderNo,orderNo);
Rocketmq可以保证消息严格顺序,可是Rocketmq须要producer保证顺序消息按顺序发送到同一个queue中,好比购买流程(1)下单(2)支付(3)支付成功,htm
这三个消息须要根据特定规则将这个三个消息按顺序发送到一个queue
Producer端确保消息顺序惟一要作的事情就是将消息路由到特定的分区(这里的分区能够理解为不一样的队列),在RocketMQ中,经过MessageQueueSelector来实现分区的选择。
如何实现把顺序消息发送到同一个queue:
通常消息是经过轮询全部队列发送的,顺序消息能够根据业务好比说订单号orderId相同的消息发送到同一个队列, 或者同一用户userId发送到同一队列等等
messageQueueList [orderId%messageQueueList.size()] messageQueueList [userId%messageQueueList.size()]
示例代码:
@Component public class CancelDisplayConsumer implements InitializingBean, DisposableBean { private static final String CANCEL_DISPLAY_GROUP_NAME="cancle_display_consumer_group"; private static final String CANCLE_DISPLAY_INSTANCE_NAME="cancle_display_consumer_instance"; private static final Logger logger= LoggerFactory.getLogger(CancelDisplayConsumer.class); private DefaultMQPushConsumer consumer; @Autowired private CancelDisplayProducer cancelDisplayProducer; @Autowired private IComTransChannelConfigService comTransChannelConfigService; @Value("${crk.nameServer}") private String nameServer; @Value("${crk.topic}") private String topicName; @Autowired private IHongqiOrderMappingService hongqiOrderMappingService; @Override public void destroy() throws Exception { consumer.shutdown(); logger.info("订单取消延时队列消费消息关闭"); } @Override public void afterPropertiesSet() throws Exception { try { consumer = new DefaultMQPushConsumer(CANCEL_DISPLAY_GROUP_NAME); consumer.setNamesrvAddr(nameServer); consumer.setInstanceName(CANCLE_DISPLAY_INSTANCE_NAME); consumer.subscribe(topicName, "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for(MessageExt messageExt : list) { logger.info("消费取消延迟消息start:{}", list); String body = new String(messageExt.getBody()); JSONObject bodyJson = JSONObject.parseObject(body); String orderNo = bodyJson.getString("orderNo"); String channel=bodyJson.getString("channel"); MDC.put("traceId", messageExt.getMsgId()); //逻辑代码忽略......... return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } catch (MQClientException e) { logger.error("消费延迟取消消息consume启动异常:{}",e); } } }
分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失
默认状况下,能够经过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息必定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
采起事务消息的投递方式,并不能保证消息100%投递成功到了Broker,可是若是消息发送Ack失败的话,此消息会存储在CommitLog当中,可是对ConsumerQueue是不可见的。能够在日志中查看到这条异常的消息,严格意义上来说,也并无彻底丢失
2.Broker自身支持同步刷盘、异步刷盘的策略,能够保证接收到的消息必定存储在本地的内存中
消费者能够根据自身的策略批量Pull消息
Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
若是Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新本身的offset
若是Consumer和broker一块儿挂了,消息也不会丢失,由于consumer 里面的offset是定时持久化的,重启以后,继续拉取offset以前的消息到本地
RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了可以并行, 通常一个 Topic 会有多个 Message Queue (也能够 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,经过 Offset的值能够定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理。
Offset主要分为本地文件类型和 Broker代存的类型两种。
Rocketmq集群有两种消费模式
默认是 CLUSTERING 模式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到的消息内容不同。 这种状况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。
BROADCASTING模式下,每一个 Consumer 都收到这个 Topic 的所有消息,各个 Consumer 间相互没有干扰, RocketMQ 使用 LocalfileOffsetStore,把 Offset存到本地。
原文来自:http://suo.im/5wYoLH