RocketMQ实战(二)

在上一篇《RocketMQ实战(一)》中已经为你们初步介绍了下RocketMQ以及搭建了双Master环境,接下来继续为你们介绍!网络

Quick Start

写一个简单的生产者、消费者,带你们快速体验RocketMQ~app


Maven配置:负载均衡

wKioL1j0xAiTk3gjAAAS3SZK_Ow748.png


生产者:ide

wKiom1j0xJnDiVJfAAB1w6cONE4056.png

消费者:
ui


wKioL1j0xLWSH6jSAACcSdQYm7Y283.png

不管生产者、消费者都必须给出GroupName,并且具备惟一性!spa

生产到哪一个Topic的哪一个Tag下,消费者也是从Topic的哪一个Tag进行消费,可见这个Tag有点相似于JMS Selector机制,即实现消息的过滤。线程

生产者、消费者须要设置NameServer地址。3d

这里,采用的是Consumer Push的方式,即设置Listener机制回调,至关于开启了一个线程。之后为你们介绍Consumer Pull的方式。日志


咱们看一下运行结果:orm

wKioL1j0xPaCxhzcAABchYPgkjQ427.png

仔细看看生产者结果输出,你会发现,有的消息发往broker-a,有的在broker-b上,自动实现了消息的负载均衡!

wKiom1j0xRiSK1UjAAAomhW9rLk688.png


这里消费消息是没有什么顺序的,之后咱们在来谈消息的顺序性。

咱们再来看一看管控台:

wKioL1j0xUqyT_TYAABlcVWCvNc900.png

wKiom1j0xWawZ7e-AAB4z7SttCE144.png

在多Master模式中,若是某个Master进程挂了,显然这台broker将不可用,上面的消息也将没法消费,要知道开源版本的RocketMQ是没有提供切换程序,来自动恢复故障的,所以在实际开发中,咱们通常提供一个监听程序,用于监控Master的状态。

在ActiveMQ中,生产消息的时候会提供是否持久化的选择,可是对于RocketMQ而言,消息是必定会被持久化的!

上面的消费者采用的是Push Consumer的方式,那么监听的Listener中的消息List究竟是多少条呢?虽然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),实际上即便设置了批量的条数,可是注意了,是最大是10,并不意味着每次batch的都是10,只有在消息有挤压的状况下才有可能。并且Push Consumer的最佳实践方式就是一条条的消费,若是须要batch,可使用Pull Consumer。

务必保证先启动消费者进行Topic订阅,而后在启动生产者进行生产(不然极有可能致使消息的重复消费,重复消费,重复消费!重要的事情说三遍!关于消息的重复问题后续给你们介绍~)。并且在实际开发中,有时候不会批量的处理消息,而是原子性的,单线程的去一条一条的处理消息,这样就是实时的在处理消息。(批量的处理海量的消息,能够考虑Kafka)


初步了解消息失败重试机制

消息失败,无非涉及到2端:从生产者端发往MQ的失败;消费者端从MQ消费消息的失败;

生产者端的失败重试

wKiom1j0xbmwIoXyAABfKnUa1Es539.png


生产者端的消息失败:好比网络抖动致使生产者发送消息到MQ失败。

上图代码示例的处理手段是:若是该条消息在1S内没有发送成功,那么重试3次。


消费者端的失败重试

消费者端的失败,分为2种状况,一个是timeout,一个是exception

timeout,好比因为网络缘由致使消息压根就没有从MQ到消费者上,在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(好比集群中一个broker失败,就尝试另外一个broker)

exception,消息正常的到了消费者,结果消费者发生异常,处理失败了。这里涉及到一些问题,须要咱们思考下,好比,消费者消费消息的状态有哪些定义?若是失败,MQ将采起什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,仍是1条呢?并且在重试的过程当中,须要保证不重复消费吗?

wKiom1j0xhizGpa1AAAh4Zdnhz8872.png


消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)


wKioL1j0xlKgxHKiAAAsLq17Zts611.png


在启动broker的过程当中,能够观察下日志,你会发现RECONSUME_LATER的策略。

若是消费失败,那么1S后再次消费,若是失败,那么5S后,再次消费,......直至2H后若是消费还失败,那么该条消息就会终止发送给消费者了!

RocketMQ为咱们提供了这么屡次数的失败重试,可是在实际中也许咱们并不须要这么多重试,好比重试3次,尚未成功,咱们但愿把这条消息存储起来并采用另外一种方式处理,并且但愿RocketMQ不要在重试呢,由于重试解决不了问题了!这该如何作呢?


咱们先来看一下一条消息MessageExt对象的输出:

MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]

注意到reconsumeTimes属性,这个属性就表明消息重试的次数!来看一段代码:

wKioL1j0xozDthM0AACJ7SsgW48295.png


注意了,对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),若是一条消息在消费端处理没有返回这2个状态,那么至关于这条消息没有达到消费者,势必会再次发送给消费者!也便是消息的处理必须有返回值,不然就进行重发。


自然的消息负载均衡及高效的水平扩展机制

wKiom1j0xrbRP7mCAABGU6_EqJ0970.png


对于RocketMQ而言,经过ConsumeGroup的机制,实现了自然的消息负载均衡!通俗点来讲,RocketMQ中的消息经过ConsumeGroup实现了将消息分发到C1/C2/C3/......的机制,这意味着咱们将很是方便的经过加机器来实现水平扩展!

咱们考虑一下这种状况:好比C2发生了重启,一条消息发往C3进行消费,可是这条消息的处理须要0.1S,而此时C2恰好完成重启,那么C2是否可能会收到这条消息呢?答案是确定的,也就是consume broker的重启,或者水平扩容,或者不遵照先订阅后生产消息,均可能致使消息的重复消费!关于去重的话题会在后续中予以介绍!

至于消息分发到C1/C2/C3,其实也是能够设置策略的。


wKiom1j0xtvQMhDcAAA1kCxs4nM128.png


集群消费 AND 广播消费

RocketMQ的消费方式有2种,在默认状况下,就是集群消费,也就是上面说起的消息的负载均衡消费。另外一种消费模式,是广播消费。广播消费,相似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每个消费者进行消费。

wKioL1j0xyey-URcAAARlvOeTPQ481.png


wKiom1j0x0LjPJi7AAAOo-VBsQ8858.png


OK,到这里,本期的RocketMQ就结束了,我们下期见~

相关文章
相关标签/搜索