随着对消息队列的应用日益推广,在分布式系统中的使用能够极大的下降对各个组件间的耦合度,从而提升组件的处理效率。由于消息队列的存在,可使咱们对任务进行异步处理,这样能够减小请求响应时间和解耦。同时因为使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不须要彼此联系,也不须要受对方的影响,即解耦和。
所谓解耦,就是说A 系统产生一条数据,发送到 MQ 里面去,哪一个系统须要数据本身去 MQ 里面消费。若是新系统须要数据,直接从 MQ 里消费便可;若是某个系统不须要这条数据了,就取消对 MQ 消息的消费便可。这样下来,A 系统压根儿不须要去考虑要给谁发送数据,不须要维护这个代码,也不须要考虑人家是否调用成功、失败超时等状况。
所谓异步,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感受上就是点个按钮,8ms 之后就直接返回了,爽!网站作得真好,真快!
此外使用消息队列还有削峰的优点。所谓削峰,即在某些时刻,用户会大量的对咱们的服务发起请求,咱们的数据库有时候须要对这些请求进行写入,可是呢,mysql的吞吐量顶破天就5000,剩下的就要慢慢等了,并且当并发量太高的时候,数据库的各类异常也会让人抓狂,可是呢,咱们使用消息队列就不同了,用户的各类请求统统塞入消息队列里面,以后由消息队列返回处理结果,而请求存储在队列里面,一个个按顺序消费,使请求写入不出现高峰低谷
基于这些有点,咱们开发团队最近在spring boot的开发过程当中,因为项目的须要咱们进行消息队列的接入改造。
在改造过程当中遇到了这样的问题,起初我将注解写在了class上,在运行的过程当中会出现异常,如下是异常的详细内容:html
2019-05-31 17:42:36.798 WARN 30544 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.java
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
... 8 common frames omittedmysql
至于说开发的源码,我是这么写的,我在class这里进行注解,这个时候我猜想,应该是注解的位置不对spring
@Component @RabbitListener(queues = "xx.yy.zz") public class Receiver { @RabbitHandler public void process(MSGSTO message) { System.out.println("消费消息"); System.out.println(message.toString()); } }
事实上,确实是位置不对,但更加专业的解答方式是,这个listener注解是方法级别上的,而不能用在class上面,咱们不妨来看下RabbitListener的源码,从根本上理解这个方法的使用。sql
package org.springframework.amqp.rabbit.annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Repeatable; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.messaging.handler.annotation.MessageMapping; @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(RabbitListeners.class) public @interface RabbitListener { String id() default ""; String containerFactory() default ""; String[] queues() default {}; Queue[] queuesToDeclare() default {}; boolean exclusive() default false; String priority() default ""; String admin() default ""; QueueBinding[] bindings() default {}; String group() default ""; String returnExceptions() default ""; String errorHandler() default ""; String concurrency() default ""; String autoStartup() default ""; }
因为业务须要,咱们确实是须要对消息进行异步处理,而异步接收消息的最简单的方法是使用带注解的监听端点基础结构。简而言之,它容许将托管bean的方法公开为Rabbit listener的端点。br/>在这里,使用queues属性时,能够指定关联的容器能够监听多个队列。可使用@Header注释来建立POJO方法可接收消息的队列名称。
这里我经过queues来指定监听的队列数据库
@Component public class Receiver { @RabbitListener(queues = "xx.yy.zz") @RabbitHandler public void process(MSGSTO message) { System.out.println("消费消息"); System.out.println(message.toString());
至于说配置方式,我是经过application.yml的形式进行接入配置的,例如并发
rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest publisher-confirms: true publisher-returns: true virtual-host: dev listener: simple: concurrency: 10 max-concurrency: 20
这些属性会被注入到RabbitProperties属性中,如app
@ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { … }
挺有趣的对吧:)异步
参考资料:分布式