RocketMQ使用总结 转载

分布式开放消息系统(RocketMQ)的原理与实践

 

分布式消息系统做为实现分布式系统可扩展、可伸缩性的关键组件,须要具备高吞吐量、高可用等特色。而谈到消息系统的设计,就回避不了两个问题:java

  1. 消息的顺序问题
  2. 消息的重复问题

RocketMQ做为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?mysql

关键特性以及其实现原理git

1、顺序消息github

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单建立、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是能够并行消费的。算法

假如生产者产生了2条消息:M一、M2,要保证这两条消息的顺序,应该怎样作?你脑中想到的多是这样:sql

你可能会采用这种方式保证消息顺序数据库


M1发送到S1后,M2发送到S2,若是要保证M1先于M2被消费,那么须要M1到达消费端后,通知S2,而后S2再将M2发送到消费端。缓存

这个模型存在的问题是,若是M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就须要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M一、M2发送到同一个Server上:安全

保证消息顺序,你改进后的方法服务器


这样能够保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

这个模型,理论上能够保证消息的顺序,但在实际运用中你应该会遇到下面的问题:

网络延迟问题

只要将消息从一台服务器发往另外一台服务器,就会存在网络延迟问题。如上图所示,若是发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保证消息的顺序。即便M1和M2同时到达消费端,因为不清楚消费端1和消费端2的负载状况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和M2发往同一个消费者便可,且发送M1后,须要消费端响应成功后才能发送M2。

但又会引入另一个问题,若是发送M1后,消费端1没有响应,那是继续发送M2呢,仍是从新发送M1?通常为了保证消息必定被消费,确定会选择重发M1到另一个消费端2,就以下图所示。

保证消息顺序的正确姿式

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种状况,一种是M1确实没有到达,另一种状况是消费端1已经响应,可是Server端没有收到。若是是第二种状况,重发M1,就会形成M1被重复消费。也就是咱们后面要说的第二个问题,消息重复问题。

回过头来看消息顺序问题,严格的顺序消息很是容易理解,并且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

可是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会致使更多的异常处理,好比:只要消费端出现问题,就会致使整个处理流程阻塞,咱们不得不花费更多的精力来解决阻塞的问题。

但咱们的最终目标是要集群的高容错性和高吞吐量。这彷佛是一对不可调和的矛盾,那么阿里是如何解决的?

世界上解决一个计算机问题最简单的方法:“刚好”不须要解决它!

有些问题,看起来很重要,但实际上咱们能够经过合理的设计或者将问题分解来规避。若是硬要把时间花在解决它们身上,其实是浪费的,效率低下的。从这个角度来看消息的顺序问题,咱们能够得出两个结论:

一、不关注乱序的应用实际大量存在
二、队列无序并不意味着消息无序

最后咱们从源码角度分析RocketMQ怎么实现发送顺序消息。

通常消息是经过轮询全部队列来发送的(负载均衡策略),顺序消息能够根据业务,好比说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

 

// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

 

 

在获取到路由信息之后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

 

private SendResult send()  {
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根据咱们的算法,选择一个发送队列
        // 这里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

 

2、消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?仍是“刚好”不解决。

形成消息的重复的根本缘由是:网络不可达。只要经过网络交换数据,就没法避免这个问题。因此解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:若是消费端收到两条同样的消息,应该怎样处理?

一、消费端处理消息的业务逻辑保持幂等性
二、保证每条消息都有惟一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,无论来多少条重复消息,最后处理的结果都同样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,若是新到的消息ID已经在日志表中,那么就再也不处理这条消息。

咱们能够看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条能够消息系统实现,也能够业务端实现。正常状况下出现重复消息的几率不必定大,且由消息系统实现的话,确定会对消息系统的吞吐量和高可用有影响,因此最好仍是由业务端本身处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的缘由。

RocketMQ不保证消息不重复,若是你的业务须要保证严格的不重复消息,须要你本身在业务端去重。

3、事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。咱们以一个转账的场景为例来讲明这个问题:Bob向Smith转帐100块。

在单机环境下,执行事务的状况,大概是下面这个样子:

单机环境下转帐事务示意图

当用户增加到必定程度,Bob和Smith的帐户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:

集群环境下转帐事务示意图

这时候你会发现,一样是一个转帐的业务,在集群环境下,耗时竟然成倍的增加,这显然是不可以接受的。那咱们如何来规避这个问题?

大事务 = 小事务 + 异步

将大事务拆分红多个小事务异步执行。这样基本上可以将跨机事务的执行效率优化到与单机一致。转帐的事务就能够分解成以下两个小事务:

小事务+异步消息


图中执行本地事务(Bob帐户扣款)和发送异步消息应该保持同时成功或者失败中,也就是扣款成功了,发送消息必定要成功,若是扣款失败了,就不能再发送消息。那问题是:咱们是先扣款仍是先发送消息呢?

首先咱们看下,先发送消息,大体的示意图以下:

事务消息:先发送消息

存在的问题是:若是消息发送成功,可是扣款失败,消费端就会消费此消息,进而向Smith帐户加钱。

先发消息不行,那咱们就先扣款呗,大体的示意图以下:

事务消息-先扣款

存在的问题跟上面相似:若是扣款成功,发送消息失败,就会出现Bob扣钱了,可是Smith帐户未加钱。

可能你们会有不少的方法来解决这个问题,好比:直接将发消息放到Bob扣款的事务中去,若是发送失败,抛出异常,事务回滚。这样的处理方式也符合“刚好”不须要解决的原则。RocketMQ支持事务消息,下面咱们来看看RocketMQ是怎样来实现的。

RocketMQ实现发送事务消息

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段经过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,若是确认消息发送失败了怎么办?RocketMQ会按期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱究竟是减了仍是没减呢?若是减了是回滚仍是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚仍是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那咱们来看下RocketMQ源码,是否是这样来处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

复制代码

复制代码

// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,至关于示例中检查Bob帐户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

 

 

接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

 

 

public TransactionSendResult sendMessageInTransaction(.....)  {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.若是消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}

 

endTransaction方法会将请求发往broker(mq server)去更新事物消息的最终状态:

  1. 根据sendResult找到Prepared消息
  2. 根据localTransaction更新消息的最终状态

若是endTransaction方法执行失败,致使数据没有发送到brokerbroker会有回查线程定时(默认1分钟)扫描每一个存储事务状态的表格文件,若是是已经提交或者回滚的消息直接跳过,若是是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用咱们的事务设置的决断方法,最后调用endTransactionOnewaybroker来更新消息的最终状态。

再回到转帐的例子,若是Bob的帐户的余额已经减小,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题?解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程当中有可能会出现消息重复的问题,按照前面的思路解决便可。

消费事务消息

这样基本上能够解决超时问题,可是若是消费失败怎么办?阿里提供给咱们的解决方法是:人工解决。你们能够考虑一下,按照事务的流程,由于某种缘由Smith加款失败,须要回滚整个流程。若是消息系统要实现这个回滚流程的话,系统复杂度将大大提高,且很容易出现Bug,估计出现Bug的几率会比消费失败的几率大不少。咱们须要衡量是否值得花这么大的代价来解决这样一个出现几率很是小的问题,这也是你们在解决疑难问题时须要多多思考的地方。

20160321补充:在3.2.6版本中移除了事务消息的实现,因此此版本不支持事务消息,具体状况请参考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156

4、Producer如何发送消息

Producer轮询某topic下的全部队列的方式来实现发送方的负载均衡,以下图所示:

producer发送消息负载均衡


首先分析一下RocketMQ的客户端发送消息的源码:

 

// 构造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整个应用生命周期内,只须要初始化1次
producer.start();
// 构造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:给消息打标签,用于区分一类消息,可为null
                        "OrderID188",// key:自定义Key,能够用于去重,可为null
                        ("Hello MetaQ").getBytes());// body:消息内容
// 发送消息并返回结果
SendResult sendResult = producer.send(msg);
// 清理资源,关闭网络链接,注销本身
producer.shutdown();

复制代码

复制代码

在整个应用生命周期内,生产者须要调用一次start方法来初始化,初始化主要完成的任务有:

  1. 若是没有指定namesrv地址,将会自动寻址
  2. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向全部broker发送心跳...
  3. 启动负载均衡的服务

初始化完成后,开始发送消息,发送消息的主要代码以下:

 

private SendResult sendDefaultImpl(Message msg,......) {
    // 检查Producer的状态是不是RUNNING
    this.makeSureStateOK();
    // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 从路由信息中选择一个消息队列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 将消息发送到该队列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

 

 

代码中须要关注的两个方法tryToFindTopicPublishInfoselectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,因此tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,若是没有获取到,则会本身去namesrv获取路由信息。selectOneMessageQueue方法经过轮询的方式,返回一个队列,以达到负载均衡的目的。

若是Producer发送消息失败,会自动重试,重试的策略:

  1. 重试次数 < retryTimesWhenSendFailed(可配置)
  2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  3. 同时知足上面两个条件后,Producer会选择另一个队列发送消息

5、消息存储

RocketMQ的消息存储是由consume queuecommit log配合完成的。

一、Consume Queue

consume queue是消息的逻辑队列,至关于字典的目录,用来指定消息在物理文件commit log上的位置。

咱们能够在配置中指定consume queue与commitlog存储的目录
每一个topic下的每一个queue都有一个对应的consumequeue文件,好比:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件组织,如图所示:

Consume Queue文件组织示意图

  1. 根据topicqueueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另外一个ConsumeQueue。
  2. 按照消费端的GroupName来分组重试队列,若是消费端消费失败,消息将被发往重试队列中,好比图中的%RETRY%ConsumerGroupA
  3. 按照消费端的GroupName来分组死信队列,若是消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,好比图中的%DLQ%ConsumerGroupA

死信队列(Dead Letter Queue)通常用于存放因为某种缘由没法传递的消息,好比处理失败或者已通过期的消息。

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,以下图所示:

consumequeue文件存储单元格式

  1. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
  2. Size存储中消息的大小
  3. Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时若是指定了Tag,会根据HashCode来快速查找到订阅的消息)

二、Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机全部的queue共享,不作任何区分。
文件的默认位置以下,仍然可经过配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构以下表所示,按照编号顺序以及编号对应的内容依次存储。

Commit Log存储单元结构图

三、消息存储实现

消息存储实现,比较复杂,也值得你们深刻了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。

 

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操做物理文件在内存中的映射以及将内存数据持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 将Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分发消息位置到ConsumeQueue
    // 2.分发到IndexService创建索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

 

 

四、消息的索引文件

若是一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:

消息索引


索引文件主要用于根据key来查询消息的,流程主要是:

  1. 根据查询的 key 的 hashcode%slotNum 获得具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
  2. 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 老是指向最新的一个索引项)
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)

6、消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另一种是Pull模式,即消费端在须要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

首先看下消费端的负载均衡:

消费端负载均衡


消费端会经过RebalanceService线程,10秒钟作一次基于topic下的全部队列负载:

  1. 遍历Consumer下的全部topic,而后根据topic订阅全部的消息
  2. 获取同一topic和Consumer Group下的全部Consumer
  3. 而后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

如同上图所示:若是有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它相似于咱们的分页,TOPIC下面的全部queue就是记录,Consumer的个数就至关于总的页数,那么每页有多少条记录,就相似于某个Consumer会消费哪些队列。

经过这样的策略来达到大致上的平均消费,这样的设计也能够很方面的水平扩展Consumer来提升消费能力。

消费端的Push模式是经过长轮询的模式来实现的,就如同下图:

Push模式示意图


Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,若是有消息就当即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。若是broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

固然,Consumer端是经过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,若是发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

7、RocketMQ的其余特性

前面的6个特性都是基本上都是点到为止,想要深刻了解,还须要你们多多查看源码,多多在实际中运用。固然除了已经提到的特性外,RocketMQ还支持:

  1. 定时消息
  2. 消息的刷盘策略
  3. 主动同步策略:同步双写、异步复制
  4. 海量消息堆积能力
  5. 高效通讯
  6. .......

其中涉及到的不少设计思路和解决方法都值得咱们深刻研究:

  1. 消息的存储设计:既要知足海量消息的堆积能力,又要知足极快的查询效率,还要保证写入的效率。
  2. 高效的通讯组件设计:高吞吐量,毫秒级的消息投递能力都离不开高效的通讯。
  3. .......

RocketMQ最佳实践

1、Producer最佳实践

一、一个应用尽量用一个 Topic,消息子类型用 tags 来标识,tags 能够由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才能够利用 tags 在 broker 作消息过滤。
二、每一个消息在业务层面的惟一标识码,要设置到 keys 字段,方便未来定位消息丢失问题。因为是哈希索引,请务必保证 key 尽量惟一,这样能够避免潜在的哈希冲突。
三、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
四、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
五、某些应用若是不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

2、Consumer最佳实践

一、消费过程要作到幂等(即消费端去重)
二、尽可能使用批量方式消费方式,能够很大程度上提升消费吞吐量。
三、优化每条消息消费过程

3、其余配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false

RocketMQ在发送消息时,会首先获取路由信息。若是是新的消息,因为MQServer上面尚未建立对应的Topic,这个时候,若是上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面建立名为TBW102的TOPIC)路由信息,而后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic尚未建立,就会自动建立topic。后果就是:之后全部该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

因此基于目前RocketMQ的设计,建议关闭自动建立TOPIC的功能,而后根据消息量的大小,手动建立TOPIC。

RocketMQ设计相关

RocketMQ的设计假定:

每台PC机器均可能宕机不可服务
任意集群都有可能处理能力不足
最坏的状况必定会发生
内网环境须要低延迟来提供最佳用户体验

RocketMQ的关键设计:

分布式集群化
强数据安全
海量数据堆积
毫秒级投递延迟(推拉模式)

这是RocketMQ在设计时的假定前提以及须要到达的效果。我想这些假定适用于全部的系统设计。随着咱们系统的服务的增多,每位开发者都要注意本身的程序是否存在单点故障,若是挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、本身管理的数据是否足够安全...... 多多规范本身的设计,才能开发出高效健壮的程序。

附录:RocketMQ涉及到的几个专业术语和总体架构介绍

1、RocketMQ中的专业术语

Topic
topic表示消息的第一级类型,好比一个电商系统的消息能够分为:交易消息、物流消息...... 一条消息必须有一个Topic

Tag
Tag表示消息的第二级类型,好比交易消息又能够分为:交易建立消息,交易完成消息..... 一条消息能够没有Tag。RocketMQ提供2级消息分类,方便你们灵活控制。

Queue
一个topic下,咱们能够设置多个queue(消息队列)。当咱们发送消息时,须要要指定该消息的topic。RocketMQ会轮询该topic下的全部队列,将消息发送出去。

Producer 与 Producer Group
Producer表示消息队列的生产者。消息队列的本质就是实现了publish-subscribe模式,生产者生产消息,消费者消费消息。因此这里的Producer就是用来生产和发送消息的,通常指业务系统。

Producer Group是一类Producer的集合名称,这类Producer一般发送一类消息,且发送逻辑一致。

Consumer 与 Consumer Group
消息消费者,通常由后台系统异步消费消息。

Push Consumer
Consumer 的一种,应用一般向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象马上回调 Listener 接口方法。
Pull Consumer
Consumer 的一种,应用一般主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

Consumer Group是一类Consumer的集合名称,这类Consumer一般消费一类消息,且消费逻辑一致。

Broker
消息的中转者,负责存储和转发消息。能够理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,因此须要保证broker的高可用。

广播消费
一条消息被多个Consumer消费,即便这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次。在广播消费中的Consumer Group概念能够认为在消息划分方面无心义。

集群消费
一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(多是 3 个进程,或者 3 台机器),那么每一个实例只消费其中的 3 条消息。

NameServer
NameServer即名称服务,两个功能:

  1. 接收broker的请求,注册broker的路由信息
  2. 接口client的请求,根据某个topic获取其到broker的路由信息
    NameServer没有状态,能够横向扩展。每一个broker在启动的时候会到NameServer注册;Producer在发送消息前会根据topicNameServer获取路由(到broker)信息;Consumer也会定时获取topic路由信息。

2、RocketMQ Overview

rocketmq overview


Producer向一些队列轮流发送消息,队列集合称为TopicConsumer若是作广播消费,则一个consumer实例消费这个Topic对应的全部队列;若是作集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

再看下RocketMQ物理部署结构图:

RocketMQ网络部署图


RocketMQ网络部署特色:

    1. Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
    2. Broker部署相对复杂,Broker分为MasterSlave,一个Master能够对应多个Slave,可是一个Slave只能对应一个MasterMasterSlave的对应关系经过指定相同的BrokerName,不一样的BrokerId来定义,BrokerId=0表示Master,非0表示SlaveMaster也能够部署多个。每一个BrokerName Server集群中的全部节点创建长链接,定时注册Topic信息到全部Name Server
    3. ProducerName Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic路由信息,并向提供Topic 服务的Master创建长链接,且定时向Master发送心跳。Producer 彻底无状态,可集群部署。
    4. ConsumerName Server集群中的其中一个节点(随机选择)创建长链接,按期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave创建长链接,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅消息,也能够从Slave订阅消息,订阅规则由Broker配置决定。

 

 

 

 

RocketMQ事务消费和顺序消费详解

1、RocketMq有3中消息类型

1.普通消费

2. 顺序消费

3.事务消费

  • 顺序消费场景

在网购的时候,咱们须要下单,那么下单须要假若有三个顺序,第1、建立订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ能够保证顺序消费。

  • rocketMq实现顺序消费的原理

 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就能够保证消费端只有一个线程去消费消息

注意:是把把消息发到同一个队列(queue),不是同一个topic,默认状况下一个topic包括4个queue

单个节点(Producer端1个、Consumer端1个)

一、Producer.java 

复制代码

复制代码

package order;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.exception.MQBrokerException;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  
/** 
 * Producer,发送顺序消息 
 */  
public class Producer {  
    public static void main(String[] args) {  
        try {  
            DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
            producer.start();  
  
            // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
            // "TagE" };  
  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 0);  
  
                System.out.println(sendResult);  
            }  
  
            producer.shutdown();  
        } catch (MQClientException e) {  
            e.printStackTrace();  
        } catch (RemotingException e) {  
            e.printStackTrace();  
        } catch (MQBrokerException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

复制代码

复制代码

二、Consumer.java

复制代码

复制代码

package order;   
import java.util.List;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicLong;   
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * 顺序消息消费,带事务方式(应用可控制Offset何时提交) 
 */  
public class Consumer1 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费<br> 
         * 若是非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 设置自动提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer1 Started.");  
    }  
  
}

复制代码

复制代码

结果以下图所示:

这个五条数据被顺序消费了

  • 多个节点(Producer端1个、Consumer端2个)

Producer.java

复制代码

复制代码

package order;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.exception.MQBrokerException;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  
/** 
 * Producer,发送顺序消息 
 */  
public class Producer {  
    public static void main(String[] args) {  
        try {  
            DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
            producer.start();  
  
            // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
            // "TagE" };  
  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 0);  
  
                System.out.println(sendResult);  
            }  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 1);  
  
                System.out.println(sendResult);  
            }  
            for (int i = 1; i <= 5; i++) {  
  
                Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());  
  
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 2);  
  
                System.out.println(sendResult);  
            }  
  
            producer.shutdown();  
        } catch (MQClientException e) {  
            e.printStackTrace();  
        } catch (RemotingException e) {  
            e.printStackTrace();  
        } catch (MQBrokerException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

复制代码

复制代码

Consumer1.java

 

复制代码

复制代码

/** 
 * 顺序消息消费,带事务方式(应用可控制Offset何时提交) 
 */  
public class Consumer1 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费<br> 
         * 若是非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
          
        /** 
         * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到  
         *,第二个线程没法访问这个队列 
         */  
        consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 设置自动提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer1 Started.");  
    }  
  
}

复制代码

复制代码

Consumer2.java

复制代码

复制代码

/** 
 * 顺序消息消费,带事务方式(应用可控制Offset何时提交) 
 */  
public class Consumer2 {  
  
    public static void main(String[] args) throws MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费<br> 
         * 若是非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicOrderTest", "*");  
          
        /** 
         * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到  
         *,第二个线程没法访问这个队列 
         */  
        consumer.registerMessageListener(new MessageListenerOrderly() {  
            AtomicLong consumeTimes = new AtomicLong(0);  
  
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                // 设置自动提交  
                context.setAutoCommit(true);  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                }  
  
                try {  
                    TimeUnit.SECONDS.sleep(5L);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
                ;  
  
                return ConsumeOrderlyStatus.SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer2 Started.");  
    }  
  
}

复制代码

复制代码

先启动Consumer1和Consumer2,而后启动Producer,Producer会发送15条消息
Consumer1消费状况如图,都按照顺序执行了




Consumer2消费状况如图,都按照顺序执行了

2、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不一样的节点上。

事物消费须要先说说什么是事务。好比说:咱们跨行转帐,从工商银行转到建设银行,也就是我从工商银行扣除1000元以后,个人建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元以后,建设银行的服务器忽然宕机,那么我扣除了1000,可是并无在建设银行给我加1000,就出现了数据的不一致。所以加1000和减1000才行,减1000和减1000必须一块儿成功,一块儿失败。

再好比,咱们进行网购的时候,咱们下单以后,订单提交成功,仓库商品的数量必须减一。可是订单多是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功以后,仓库数量服务器忽然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

如今咱们去外面饭店吃饭,不少时候都不会直接给了钱以后直接在付款的窗口递饭菜,而是付款以后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和咱们的系统相似,提升了吞吐量。即便你到第二个窗口,师傅告诉你已经没饭了,你能够拿着这个凭证去退款,即便中途因为出了意外你没法到达窗口进行取饭,可是只要凭证还在,能够将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

一、在工商银行扣款的时候,余额表扣除1000,同时记录日志,并且这2个表是在同一个数据库实例中,可使用本地事物解决。而后咱们通知建设银行须要加1000给该用户,建设银行收到以后给我返回已经加了1000给用户的确认信息以后,我再标记日志表里面的日志为已经完成。

二、经过消息中间件

原文地址:http://www.jianshu.com/p/453c6e7ff81c

 

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段经过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,若是确认消息发送失败了怎么办?RocketMQ会按期扫描消息集群中的事物消息,若是发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱究竟是减了仍是没减呢?若是减了是回滚仍是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚仍是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

例子:

Consumer.java

 

 

复制代码

复制代码

package transaction;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * Consumer,订阅消息 
 */  
public class Consumer {  
  
    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费<br> 
         * 若是非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicTransactionTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  
                try {  
  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
  
                } catch (Exception e) {  
                    e.printStackTrace();  
  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
  
                }  
  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("transaction_Consumer Started.");  
    }  
}

复制代码

复制代码

Producer.java

 

  

复制代码

复制代码

package transaction;  
  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;  
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;  
import com.alibaba.rocketmq.common.message.Message;  
  
/** 
 * 发送事务消息例子 
 *  
 */  
public class Producer {  
    public static void main(String[] args) throws MQClientException, InterruptedException {  
  
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
        TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");  
        producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
        // 事务回查最小并发数  
        producer.setCheckThreadPoolMinSize(2);  
        // 事务回查最大并发数  
        producer.setCheckThreadPoolMaxSize(2);  
        // 队列数  
        producer.setCheckRequestHoldMax(2000);  
        producer.setTransactionCheckListener(transactionCheckListener);  
        producer.start();  
  
        // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"  
        // };  
        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();  
        for (int i = 1; i <= 2; i++) {  
            try {  
                Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,  
                        ("Hello RocketMQ " + i).getBytes());  
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);  
                System.out.println(sendResult);  
  
                Thread.sleep(10);  
            } catch (MQClientException e) {  
                e.printStackTrace();  
            }  
        }  
  
        for (int i = 0; i < 100000; i++) {  
            Thread.sleep(1000);  
        }  
  
        producer.shutdown();  
  
    }  
}

TransactionExecuterImpl .java --执行本地事务

 

package transaction;  
  
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;  
import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
import com.alibaba.rocketmq.common.message.Message;  
  
/** 
 * 执行本地事务 
 */  
public class TransactionExecuterImpl implements LocalTransactionExecuter {  
    // private AtomicInteger transactionIndex = new AtomicInteger(1);  
  
    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {  
  
        System.out.println("执行本地事务msg = " + new String(msg.getBody()));  
  
        System.out.println("执行本地事务arg = " + arg);  
  
        String tags = msg.getTags();  
  
        if (tags.equals("transaction2")) {  
            System.out.println("======个人操做============,失败了  -进行ROLLBACK");  
            return LocalTransactionState.ROLLBACK_MESSAGE;  
        }  
        return LocalTransactionState.COMMIT_MESSAGE;  
        // return LocalTransactionState.UNKNOW;  
    }  
}

TransactionCheckListenerImpl--未决事务,服务器回查客户端(目前已经被阉割啦)

 

 

package transaction;  
  
import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * 未决事务,服务器回查客户端 
 */  
public class TransactionCheckListenerImpl implements TransactionCheckListener {  
    // private AtomicInteger transactionIndex = new AtomicInteger(0);  
  
    //在这里,咱们能够根据由MQ回传的key去数据库查询,这条数据究竟是成功了仍是失败了。  
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {  
        System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));  
        // return LocalTransactionState.ROLLBACK_MESSAGE;  
  
        return LocalTransactionState.COMMIT_MESSAGE;  
  
        // return LocalTransactionState.UNKNOW;  
    }  
}

 

producer端:发送数据到MQ,而且处理本地事物。这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据。第二个数据失败了,不会被消费。


 

Consumer只会接收到一个,第二个数据不会被接收到

 

 

 

 

分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”

在说到消息中间件的时候,咱们一般都会谈到一个特性:消息的顺序消费问题。这个问题看起来很简单:Producer发送消息1, 2, 3。。。 Consumer按1, 2, 3。。。顺序消费。

但实际状况倒是:不管RocketMQ,仍是Kafka,缺省都不保证消息的严格有序消费!

这个特性看起来很简单,但为何缺省他们都不保证呢?

 

“严格的顺序消费”有多么困难

下面就从3个方面来分析一下,对于一个消息中间件来讲,”严格的顺序消费”有多么困难,或者说不可能。

发送端

发送端不能异步发送,异步发送在发送失败的状况下,就没办法保证消息顺序。

好比你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再从新发送1遍,这个时候顺序就乱掉了。

存储端

对于存储端,要保证消息顺序,会有如下几个问题: 
(1)消息不能分区。也就是1个topic,只能有1个队列。在Kafka中,它叫作partition;在RocketMQ中,它叫作queue。 若是你有多个队列,那同1个topic的消息,会分散到多个分区里面,天然不能保证顺序。

(2)即便只有1个队列的状况下,会有第2个问题。该机器挂了以后,可否切换到其余机器?也就是高可用问题。

好比你当前的机器挂了,上面还有消息没有消费完。此时切换到其余机器,可用性保证了。但消息顺序就乱掉了。

要想保证,一方面要同步复制,不能异步复制;另1方面得保证,切机器以前,挂掉的机器上面,全部消息必须消费完了,不能有残留。很明显,这个很难!!!

接收端

对于接收端,不能并行消费,也即不能开多线程或者多个客户端消费同1个队列。

总结

从上面的分析能够看出,要保证消息的严格有序,有多么困难!

发送端和接收端的问题,还好解决一点,限制异步发送,限制并行消费。但对于存储端,机器挂了以后,切换的问题,就很难解决了。

你切换了,可能消息就会乱;你不切换,那就暂时不可用。这2者之间,就须要权衡了。

业务须要全局有序吗?

经过上面分析能够看出,要保证一个topic内部,消息严格的有序,是很困难的,或者说条件是很苛刻的。

那怎么办呢?咱们必定要使出全部力气、用尽全部办法,来保证消息的严格有序吗?

这里就须要从另一个角度去考虑这个问题:业务角度。正如在下面这篇博客中所说的: 
http://www.jianshu.com/p/453c6e7ff81c

实际状况中: 
(1)不关注顺序的业务大量存在; 
(2) 队列无序不表明消息无序。

第(2)条的意思是说:咱们不保证队列的全局有序,但能够保证消息的局部有序。

举个例子:保证来自同1个order id的消息,是有序的!

下面就看一下在Kafka和RocketMQ中,分别是如何对待这个问题的:

Kafka中:发送1条消息的时候,能够指定(topic, partition, key) 3个参数。partiton和key是可选的。

若是你指定了partition,那就是全部消息发往同1个partition,就是有序的。而且在消费端,Kafka保证,1个partition只能被1个consumer消费。

或者你指定key(好比order id),具备同1个key的全部消息,会发往同1个partition。也是有序的。

RocketMQ: RocketMQ在Kafka的基础上,把这个限制更放宽了一步。只指定(topic, key),不指定具体发往哪一个队列。也就是说,它更加不但愿业务方,非要去要一个全局的严格有序。

 

 

消费模式

1、集群消费

以前的博客中,启动的都是单个Consumer,若是启动多个呢?

 

RocketMQ-集群消费 
RocketMQ-集群消费

 

其实,对于RocketMQ而言,经过ConsumeGroup的机制,实现了自然的消息负载均衡!通俗点来讲,RocketMQ中的消息经过ConsumeGroup实现了将消息分发到C1/C2/C3/……的机制,这意味着咱们将很是方便的经过加机器来实现水平扩展!

咱们考虑一下这种状况:好比C2发生了重启,一条消息发往C3进行消费,可是这条消息的处理须要0.1S,而此时C2恰好完成重启,那么C2是否可能会收到这条消息呢?答案是确定的,也就是consume broker的重启,或者水平扩容,或者不遵照先订阅后生产消息,均可能致使消息的重复消费!关于去重的话题会在后续中予以介绍!

至于消息分发到C1/C2/C3,其实也是能够设置策略的: 

RocketMQ-消息负载策略 
RocketMQ-消息负载策略

 

使用哪一种策略,只须要实例化对应的对象便可,如:

AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle();
consumer.setAllocateMessageQueueStrategy(aqs);

上面内容,实际上是一种消费模式——集群消费。 
RocketMQ的消费模式有2种,查看一下源码:

public enum MessageModel {
    /**
     * broadcast
     */
    BROADCASTING,
    /**
     * clustering
     */
    CLUSTERING;
}

 

在默认状况下,就是集群消费(CLUSTERING),也就是上面说起的消息的负载均衡消费。另外一种消费模式,是广播消费(BROADCASTING)。


2、广播消费

广播消费,相似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每个消费者进行消费。 

RocketMQ-广播消费模式设置 
RocketMQ-广播消费模式设置

 

/**
 * Consumer,订阅消息
 */
public class Consumer2 {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
        consumer.setConsumeMessageBatchMaxSize(10);

        // 设置为广播消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;   // 重试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;       // 成功
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 


内容补充

《RocketMQ(三)——HelloWorld》那篇博客的最后提到了单批次消息消费数量 ,本文既然提到了集群消费,那就针对这两个内容再进行一次补充吧。 
若是咱们有2台节点,Producerw往MQ上写入20条数据 其中Consumer1中拉取了12条 。Consumer2中拉取了8 条,这种状况下,假如Consumer1宕机,那么咱们消费数据的时候,只能消费到Consumer2中的8条,Consumer1中的12条已经持久化了。须要Consumer1恢复以后这12条数据才能继续被消费。其实这种先启动producer往MQ上写数据,而后再启动Consumer的状况原本就是违规操做,正确的状况应该是先启动Consumer后再启动producer。

 

 

集群消费和广播消费

基本概念

MQ 是基于发布订阅模型的消息系统。在 MQ 消息系统中消息的订阅方订阅关注的 Topic,以获取并消费消息。因为订阅方应用通常是分布式系统,以集群方式部署有多台机器。所以 MQ 约定如下概念。

集群:MQ 约定使用相同 Consumer ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须彻底一致(包括 Tag 的使用),这些订阅者在逻辑上能够认为是一个消费节点。

集群消费:当使用集群消费模式时,MQ 认为任意一条消息只须要被集群内的任意一个消费者处理便可。

广播消费:当使用广播消费模式时,MQ 会将每条消息推送给集群内全部注册过的客户端,保证消息至少被每台机器消费一次。

场景对比

集群消费模式:

cl

适用场景&注意事项

  • 消费端集群化部署,每条消息只须要被处理一次。
  • 因为消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理,若是须要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证消息的每一次失败重投等逻辑都能路由到同一台机器上,所以处理消息时不该该作任何肯定性假设。

广播消费模式:

bd

适用场景&注意事项

  • 每条消息都须要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的几率稍大于集群模式。
  • 广播模式下,MQ 保证每条消息至少被每台客户端消费一次,可是并不会对消费失败的消息进行失败重投,所以业务方须要关注消费失败的状况。
  • 广播模式下,第一次启动时默认从最新消息消费,客户端的消费进度是被持久化在客户端本地的隐藏文件中,所以不建议删除该隐藏文件,不然会丢失部分消息。
  • 广播模式下,每条消息都会被大量的客户端重复处理,所以推荐尽量使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,因此服务端不提供堆积查询和报警功能。

使用集群模式模拟广播:

cd-bd

适用场景&注意事项

  • 每条消息都须要被多台机器处理,每台机器的逻辑能够相同也能够不同。
  • 消费进度在服务端维护,可靠性高于广播模式。
相关文章
相关标签/搜索