这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表以下:并发
在上一篇博客中,已经知道RocketMQ 3.0.8是支持事务回查机制,可是在RocketMQ 3.2.6中取消了这个功能,下面咱们继续以转帐功能分析咱们本身如何解决这个问题。性能
在正常状况下,固然没有问题,若是第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会致使这条转帐消息,在A银行完成了操做,可是迟迟对B银行系统不可见!ui
用户U1从A银行系统转帐给B银行系统的用户U2的处理过程以下:spa
第一步:A银行系统生成一条转帐消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息线程
第二步:写入MQ成功后,回调A银行系统,对T1,T2表进行操做(很显然须要是一个事务)orm
咱们重点关注下T2表,这个表是用来干吗的呢?每条转帐消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)blog
第三步:完成第二步,接下来发送确认消息给MQ,若是这个确认消息发送成功,那么这条转帐消息,将对B银行系统可见。而后B银行系统,会在一个事务中完成对t3,t5的操做。
若是发送确认消息给MQ失败的处理思路:
首先,B银行系统,有一个定时任务(好比说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转帐数据。为何要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转帐数据。那么怎么发给A银行系统呢,这个方式比较多,能够考虑在来一个Topic,也能够考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。
这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的做用,在t4中记录一个time字段,每次定时任务启动,先更新time(好比设定为当前系统时间,设置前的的时间为old),而后扫描出t5中大于这个old时间的转帐数据,如此循环往复。
其次,A银行系统,也有一个定时任务(能够根据业务消费能力定,能够大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转帐消息找出来,更新updatetime并发送给MQ。
这样,咱们并无改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!
其实到这里,你能够发现RocketMQ的一个特色,就是将生产者和MQ绑定,而不须要特别处理消费者,这是为何呢?由于消息只要发往RocketMQ成功,那么就意味着成功,为何这么说?
前面,咱们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ必定会让消息获得消费,若是消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!
在前面的博客已经提到,在RocketMQ中Consumer分为2类:Push Consumer、Pull Consumer。之前的例子都是Push Consumer,接下来,为你们介绍下Pull Consumer。
从表面意思上来看,好像Push是MQ推送给消费者,而Pull是消费者从MQ中拉取;其实本质上都是拉取模式PULL,即消费者从MQ中轮询取得消息。
在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费,因此给咱们形成了感受上好像是“推消息”。
在Pull模式下,须要特别注意的是,本质上是从一个Topic下的全部Queue进行拉取,并且每一个Queue都必须记录拉取位置,不然会致使重复消费。还有拉取的时间间隔,拉取的大小等等。不过全部的这一切,MQPullConsumerScheduleService都替咱们考虑清楚了,提供updateConsumeOffset去更新消费的队列的位置(默认5S同步一次),提供setPullNextDelayTimeMillis设置下次拉取的时间间隔(应该设置的大一些,至少大于5S)。
仔细回想下,对于Push方式的回调 和 Pull方式的回调,还有什么关键区别么?
对于Push而言,不管是基于MessageListenerConcurrently的,仍是基于MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值倒是void?
这意味,咱们须要在pull方式中,注意本身处理每条消息消费的异常状况!
经过运行结果,能够印证上面的观点:为何每次消费都是4条开始,4条结束呢?由于一个Topic下有4个Queue,并且上面的代码实际上会针对每一个Queue开启一个线程去消费!
对于ActiveMQ而言,咱们能够经过JMS Selectors机制(就是相似于SQL的语法)来实现过滤,很easy。那么和RocketMQ Filter组件有什么区别呢?
虽然,2者都能实现过滤,可是RocketMQ Filter的性能要更高效些,由于RocketMQ是在broker上将过滤后的数据发往filter,而后消费者直接从filter上取得数据;而ActiveMQ是消费者直接在broker上进行过滤消费!(固然,对于RocketMQ而言,Tag机制已经足够应付平常绝大数的过滤功能,除非你的业务对性能有特别高的要求)
具体怎么作呢?这里我就不演示了,网上有不少例子,这里只说下大体的过程:
第一:broker-xxx.properties中指定filter个数
第二:上传一段JAVA代码,其实就是一个类