RocketMQ实战(四)

前言

这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表以下:并发

《RocketMQ实战(一)》app

《RocketMQ实战(二)》
分布式

《RocketMQ实战(三):分布式事务》ide

RocketMQ 3.2.6的事务机制

在上一篇博客中,已经知道RocketMQ 3.0.8是支持事务回查机制,可是在RocketMQ 3.2.6中取消了这个功能,下面咱们继续以转帐功能分析咱们本身如何解决这个问题。性能

wKiom1kJ1cPRn6RcAABWBYYjcKA754.jpg


在正常状况下,固然没有问题,若是第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会致使这条转帐消息,在A银行完成了操做,可是迟迟对B银行系统不可见!ui

wKiom1kJ1gSAXzelAAB6Q70jL3M558.png

用户U1从A银行系统转帐给B银行系统的用户U2的处理过程以下:spa

第一步:A银行系统生成一条转帐消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息线程

第二步:写入MQ成功后,回调A银行系统,对T1,T2表进行操做(很显然须要是一个事务)orm

咱们重点关注下T2表,这个表是用来干吗的呢?每条转帐消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)blog

第三步:完成第二步,接下来发送确认消息给MQ,若是这个确认消息发送成功,那么这条转帐消息,将对B银行系统可见。而后B银行系统,会在一个事务中完成对t3,t5的操做。

若是发送确认消息给MQ失败的处理思路:

首先,B银行系统,有一个定时任务(好比说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转帐数据。为何要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转帐数据。那么怎么发给A银行系统呢,这个方式比较多,能够考虑在来一个Topic,也能够考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。

这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的做用,在t4中记录一个time字段,每次定时任务启动,先更新time(好比设定为当前系统时间,设置前的的时间为old),而后扫描出t5中大于这个old时间的转帐数据,如此循环往复。

其次,A银行系统,也有一个定时任务(能够根据业务消费能力定,能够大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转帐消息找出来,更新updatetime并发送给MQ。

这样,咱们并无改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!


其实到这里,你能够发现RocketMQ的一个特色,就是将生产者和MQ绑定,而不须要特别处理消费者,这是为何呢?由于消息只要发往RocketMQ成功,那么就意味着成功,为何这么说?

前面,咱们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ必定会让消息获得消费,若是消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!



Pull Or Push

在前面的博客已经提到,在RocketMQ中Consumer分为2类:Push Consumer、Pull Consumer。之前的例子都是Push Consumer,接下来,为你们介绍下Pull Consumer。

wKioL1kJ1l-Q8Be4AAAhA9Onb10883.png


wKiom1kJ1nuz0T0dAADIcid4cW4589.png


从表面意思上来看,好像Push是MQ推送给消费者,而Pull是消费者从MQ中拉取;其实本质上都是拉取模式PULL,即消费者从MQ中轮询取得消息。

在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费,因此给咱们形成了感受上好像是“推消息”。

在Pull模式下,须要特别注意的是,本质上是从一个Topic下的全部Queue进行拉取,并且每一个Queue都必须记录拉取位置,不然会致使重复消费。还有拉取的时间间隔,拉取的大小等等。不过全部的这一切,MQPullConsumerScheduleService都替咱们考虑清楚了,提供updateConsumeOffset去更新消费的队列的位置(默认5S同步一次),提供setPullNextDelayTimeMillis设置下次拉取的时间间隔(应该设置的大一些,至少大于5S)。

仔细回想下,对于Push方式的回调   和  Pull方式的回调,还有什么关键区别么?

对于Push而言,不管是基于MessageListenerConcurrently的,仍是基于MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值倒是void?

这意味,咱们须要在pull方式中,注意本身处理每条消息消费的异常状况!


wKioL1kJ1qjCh3XgAABfya_2KQ0041.png


经过运行结果,能够印证上面的观点:为何每次消费都是4条开始,4条结束呢?由于一个Topic下有4个Queue,并且上面的代码实际上会针对每一个Queue开启一个线程去消费!


RocketMQ Filter组件介绍

对于ActiveMQ而言,咱们能够经过JMS Selectors机制(就是相似于SQL的语法)来实现过滤,很easy。那么和RocketMQ Filter组件有什么区别呢?

虽然,2者都能实现过滤,可是RocketMQ Filter的性能要更高效些,由于RocketMQ是在broker上将过滤后的数据发往filter,而后消费者直接从filter上取得数据;而ActiveMQ是消费者直接在broker上进行过滤消费!(固然,对于RocketMQ而言,Tag机制已经足够应付平常绝大数的过滤功能,除非你的业务对性能有特别高的要求)


wKioL1kJ1uaBn0l2AABatuiY9e8836.png


具体怎么作呢?这里我就不演示了,网上有不少例子,这里只说下大体的过程:

第一:broker-xxx.properties中指定filter个数 

第二:上传一段JAVA代码,其实就是一个类


到这里,整个RocketMQ实战系列就结束呢,你学到了么,体会到RocketMQ的强大了么?

See u next blog!

相关文章
相关标签/搜索