RocketMq架构原理和使用总结

RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。html

RocketMq架构原理和使用总结

主要功能

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力致使消息丢失、系统奔溃等问题)
  • 系统解耦(解决不一样重要程度、不一样能力级别系统之间依赖致使一死全死)
  • 提高性能(当存在一对多调用时,能够发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路很差压测,能够经过堆积必定量消息再放开来压测)

各个模块的做用

  • Namesrv: 存储当前集群全部Brokers信息、Topic跟Broker的对应关系。
  • Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
  • Producer: 消息生产者,每一个生产者都有一个ID(编号),多个生产者实例能够共用同一个ID。同一个ID下全部实例组成一个生产者集群。
  • Consumer: 消息消费者,每一个订阅者也有一个ID(编号),多个消费者实例能够共用同一个ID。同一个ID下全部实例组成一个消费者集群。

各个模块功能关系参考博客:https://www.cnblogs.com/wxd0108/p/6041829.html架构

功能架构部署图:app

RocketMq架构原理和使用总结

MQ集群工做流程

  1. 启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,至关于一个路由控制中心。异步

  2. Broker启动,跟全部的Namesrv保持长链接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储全部topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。分布式

  3. 收发消息前,先建立topic,建立topic时须要指定该topic要存储在哪些Broker上。也能够在发送消息时自动建立Topic。ide

  4. Producer发送消息,启动时先跟Namesrv集群中的其中一台创建长链接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,而后跟对应的Broker创建长链接,直接向Broker发消息。性能

  5. Consumer跟Producer相似。跟其中一台Namesrv创建长链接,获取当前订阅Topic存在哪些Broker上,而后直接跟Broker创建链接通道,开始消费消息。

Producer

示例代码:日志

这里用InitializingBean, DisposableBean来管理mq的生命周期,InitializingBean用来初始化mq配置信息,DisposableBean 在mq执行完成后用来销毁bean。code

@Component
public class CancelDisplayProducer implements InitializingBean, DisposableBean {

    private static final Logger logger= LoggerFactory.getLogger(CancelDisplayProducer.class);

    private DefaultMQProducer defaultMQProducer;
    @Value("${crk.topic}")
    private String topicName;

    @Value("${crk.nameServer}")
    private String nameServer;

    @Value(("${crk.groupName}"))
    private String groupName;

    public SendResult sendCancelDisplayMq(String tag, String msg, Object primaryKey, Object hashVal){
        logger.info("发送取消延时队列消息内容{}",msg);
        Message rocketMsg = null;
        com.alibaba.rocketmq.client.producer.SendResult sendResult = null;
        try {
            rocketMsg =  new Message(topicName, tag, primaryKey + "", msg.getBytes("UTF-8"));
            //设置该消息延迟1s发送
            rocketMsg.setDelayTimeLevel(1);
            sendResult = defaultMQProducer.send(rocketMsg, new MessageQueueSelector() {
            //发送顺序消息
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
                    int hashCode = obj.hashCode();
                    if(hashCode < 0) {
                        hashCode = Math.abs(hashCode);
                    }
                    int index = hashCode % list.size();
                    return list.get(index);
                }
            }, hashVal);
            if(sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送取消延时队列消息成功,发送内容:{},keys:{}", msg, primaryKey);
            }
        } catch (Exception e) {
            logger.error("发送取消延时队列消息异常【{}】", e);
        }
        return sendResult;

    }

    @Override
    public void destroy() throws Exception {
        defaultMQProducer.shutdown();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("groupName=" + groupName);
        logger.info("nameServer=" + nameServer);
        //初始化
        defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr(nameServer);
        defaultMQProducer.setProducerGroup(groupName);
        defaultMQProducer.setRetryTimesWhenSendFailed(5);
        defaultMQProducer.setInstanceName("openCarCancelDisplayInstance");
        //设置超时时间为5s
        defaultMQProducer.setSendMsgTimeout(5000);
        defaultMQProducer.start();
        logger.info("DefaultMQProudcer start success!");
    }
}

//***调用生产者发送消息***
cancelDisplayProducer.sendCancelDisplayMq("cancleDisplay",JSONObject.toJSONString(bodyJson),orderNo,orderNo);

Producer顺序发送

Rocketmq可以保证消息严格顺序,可是Rocketmq须要producer保证顺序消息按顺序发送到同一个queue中,好比购买流程(1)下单(2)支付(3)支付成功,htm

这三个消息须要根据特定规则将这个三个消息按顺序发送到一个queue
Producer端确保消息顺序惟一要作的事情就是将消息路由到特定的分区(这里的分区能够理解为不一样的队列),在RocketMQ中,经过MessageQueueSelector来实现分区的选择。

如何实现把顺序消息发送到同一个queue:

RocketMq架构原理和使用总结

通常消息是经过轮询全部队列发送的,顺序消息能够根据业务好比说订单号orderId相同的消息发送到同一个队列, 或者同一用户userId发送到同一队列等等

messageQueueList [orderId%messageQueueList.size()]

messageQueueList [userId%messageQueueList.size()]

Consumer

示例代码:

@Component
public class CancelDisplayConsumer implements InitializingBean, DisposableBean {

    private static  final String CANCEL_DISPLAY_GROUP_NAME="cancle_display_consumer_group";

    private static  final  String CANCLE_DISPLAY_INSTANCE_NAME="cancle_display_consumer_instance";

    private static  final Logger logger= LoggerFactory.getLogger(CancelDisplayConsumer.class);

    private DefaultMQPushConsumer consumer;

    @Autowired
    private CancelDisplayProducer cancelDisplayProducer;
    @Autowired
    private IComTransChannelConfigService comTransChannelConfigService;
    @Value("${crk.nameServer}")
    private String nameServer;
    @Value("${crk.topic}")
    private String topicName;
    @Autowired
    private IHongqiOrderMappingService hongqiOrderMappingService;
    @Override
    public void destroy() throws Exception {
        consumer.shutdown();
        logger.info("订单取消延时队列消费消息关闭");
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            consumer = new DefaultMQPushConsumer(CANCEL_DISPLAY_GROUP_NAME);
            consumer.setNamesrvAddr(nameServer);
            consumer.setInstanceName(CANCLE_DISPLAY_INSTANCE_NAME);
            consumer.subscribe(topicName, "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {

                        for(MessageExt messageExt : list) {
                            logger.info("消费取消延迟消息start:{}", list);
                            String body = new String(messageExt.getBody());
                            JSONObject bodyJson = JSONObject.parseObject(body);
                            String orderNo = bodyJson.getString("orderNo");
                            String channel=bodyJson.getString("channel");
                            MDC.put("traceId", messageExt.getMsgId());
                            //逻辑代码忽略.........
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            logger.error("消费延迟取消消息consume启动异常:{}",e);
        }
    }
}

如何保证消息不丢失

分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失

1、producer重试发送消息

  1. 默认状况下,能够经过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息必定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功

  2. 采起事务消息的投递方式,并不能保证消息100%投递成功到了Broker,可是若是消息发送Ack失败的话,此消息会存储在CommitLog当中,可是对ConsumerQueue是不可见的。能够在日志中查看到这条异常的消息,严格意义上来说,也并无彻底丢失

  3. RocketMQ支持 日志的索引,若是一条消息发送以后超时,也能够经过查询日志的API,来check是否在Broker存储成功

2、broker的持久化机制

  1. 消息支持持久化到Commitlog里面,即便宕机后重启,未消费的消息也是能够加载出来的

2.Broker自身支持同步刷盘、异步刷盘的策略,能够保证接收到的消息必定存储在本地的内存中

  1. Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制能够保证即便Master 磁盘崩溃,消息仍然不会丢失

3、消费端的重试机制

消费者能够根据自身的策略批量Pull消息

  1. Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标

  2. 若是Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新本身的offset

  3. 若是Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操做

若是Consumer和broker一块儿挂了,消息也不会丢失,由于consumer 里面的offset是定时持久化的,重启以后,继续拉取offset以前的消息到本地

关于offset:

RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了可以并行, 通常一个 Topic 会有多个 Message Queue (也能够 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,经过 Offset的值能够定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理。

Offset主要分为本地文件类型和 Broker代存的类型两种。

RocketMq架构原理和使用总结

Rocketmq集群有两种消费模式

默认是 CLUSTERING 模式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到的消息内容不同。 这种状况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。

BROADCASTING模式下,每一个 Consumer 都收到这个 Topic 的所有消息,各个 Consumer 间相互没有干扰, RocketMQ 使用 LocalfileOffsetStore,把 Offset存到本地。

原文来自:http://suo.im/5wYoLH

相关文章
相关标签/搜索