原文:https://blog.csdn.net/linpeng_1/article/details/80505828html
Spring AMQP提供了一个发送和接收消息的操做模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一个实现。java
RabbitTemplate支持消息的确认与返回,为了返回消息,RabbitTemplate 须要设置mandatory 属性为true,而且CachingConnectionFactory 的publisherReturns属性也须要设置为true。返回的消息会根据它注册的RabbitTemplate.ReturnCallback setReturnCallback 回调发送到给客户端,spring
一个RabbitTemplate仅能支持一个ReturnCallback 。数据库
为了确认Confirms消息, CachingConnectionFactory 的publisherConfirms 属性也须要设置为true,确认的消息会根据它注册的RabbitTemplate.ConfirmCallback setConfirmCallback回调发送到给客户端。一个RabbitTemplate也仅能支持一个ConfirmCallback.json
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
server.port=8083 #服务器配置 spring.application.name=rabbitmq-hello-sending #rabbitmq链接参数 spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3 #集群或单机 均可配置 spring.rabbitmq.username=linpeng spring.rabbitmq.password=123456 # rabbitmq服务器的虚拟主机名,能够在后台管理系统上查看和新建 spring.rabbitmq.virtual-host=/test # 链接超时 spring.rabbitmq.connection-timeout=5s # 发送方 # 开启发送确认(未到达MQ服务器) spring.rabbitmq.publisher-confirms=true # 开启发送失败退回(未找到对应queue) spring.rabbitmq.publisher-returns=true # 消费方 开启手动ACK(坑:当序列化为JSON时,此配置会失效,见下文) spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual # 消费方 spring.rabbitmq.listener.concurrency=2 //最小消息监听线程数 spring.rabbitmq.listener.max-concurrency=2 //最大消息监听线程数 #消费者每次从队列获取的消息数量 (默认一次250个) #经过查看后台管理器中queue的unacked数量 spring.rabbitmq.listener.simple.prefetch= 5 #消费者自动启动 spring.rabbitmq.listener.simple.auto-startup=true #消费失败,自动从新入队 spring.rabbitmq.listener.simple.default-requeue-rejected= true #启用发送重试 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
默认一个RabbitTemplate在RabbitMQ中至关于一个connection,每发送一次消息至关于channel,MQ接收消息后释放channel。每一个connection最多支持2048个channel,加入从一个connection同时超过2048个线程并发发送,channel超过2048,会报错org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。数组
测试启动publisher-confirms后,400个线程经过一个RabbitTemplate并发发送10000消息,同时就可能产生1000左右的channel。由于channel等在confirm。10000消息所有发送在几秒内完成,10000消息所有confirm回调完成用时22秒。缓存
后台管理页面查看connection+channelspringboot
此connection中有10个线程并发发送消息,监控到10个channel生成,MQ完成接收后释放channel。若是是publisher-confirms模式,channel会保持到confirm回调完成再释放,影响并发性能。每一个connection最多支持2048个channel。bash
测试启动publisher-confirms后,500个线程并发发送,部分消息报AmqpResourceNotAvailableException。400个线程经过一个RabbitTemplate并发发送10000消息,最高同时就可能产生1000多的channel。由于channel在等待执行confirm回调。10000消息所有发送在几秒内完成,10000消息所有confirm回调完成用时22秒,此时全部channel所有释放。服务器
若在rabbitmq的管理页面手动建立队列和交换机,则能够再也不代码中声明
package com.example.demo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue QueueA() { return new Queue("hello"); } @Bean public Queue QueueB() { return new Queue("helloObj"); } /** * Fanout 就是咱们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的全部队列都收到这个消息。 * @return */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("ABExchange"); } @Bean DirectExchange Exchange() { return new DirectExchange("DExchange"); } @Bean Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(QueueA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(QueueB).to(fanoutExchange); } @Bean Binding bindingExchange() { return BindingBuilder.bind(QueueA()).to(Exchange()).with("TEST");//routingKey } }
ConfirmCallback :ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中, ACK=false标示消息因为Broker处理错误,消息并未处理成功。如未找到对应交换机返回ACK=false。
ReturnCallback:当消息发送出去找不到对应路由队列时,将会把消息退回 。若是有任何一个路由队列接收投递消息成功,则不会退回消息。MQ成功接收,可是未找到对应队列触发
经过以上异步确认机制,增长降级、补偿处理。好比发送时保存信息和消息ID,ConfirmCallback 经过ID找到对应信息重发,注意要保证幂等性。
package com.example.demo; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import java.util.Date; //RabbitTemplate.ConfirmCallback @Service public class HelloSender implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "你好如今是 " + new Date() +""; System.out.println("HelloSender发送内容 : " + context); //消息序列化设置 //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //自身实现ReturnCallback接口 设置异步回调对象为this this.rabbitTemplate.setReturnCallback(this); //如果当前类实现RabbitTemplate.ConfirmCallback接口,则能够设置为this //发送前给RabbitTemplate设置一个异步回调对象 RabbitTemplate.ConfirmCallback接口的匿名类 this.rabbitTemplate.setConfirmCallback((correlationData, confirm, cause) -> { //若发送时没有CorrelationData,则这里correlationData==null if (!confirm) { System.out.println("HelloSender消息发送失败" + cause + correlationData.getId() ); //correlationData.getReturnedMessage(); Message //correlationData.toString(); } else { System.out.println("HelloSender 消息发送成功 "); } }); //this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate.convertAndSend("hello", context); //这里指定路由键,注意不是队列名 //发送时 能够指定消息ID,方便在ConfirmCallback时候二次处理消息 rabbitTemplate.convertAndSend("DExchange","QueueRoutingKey", context, new CorrelationData("自定义消息ID")); } public void sendObj() { MessageObj obj = new MessageObj(); obj.setACK(false); obj.setId(123); obj.setName("zhangsan"); obj.setValue("data"); System.out.println("发送 : " + obj); this.rabbitTemplate.convertAndSend("helloObj", obj); } @Override public void returnedMessage(Message message, int i, String cause, String exchange, String queue) { //没有找到queue //Message中的成员,Body为消息内容 //(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) } // @Override // public void confirm(CorrelationData correlationData, boolean confirm, String cause) { // System.out.println("sender success"); // } }
测试发送:
使用Spring默认的rabbitTemplate发送消息,CorrelationData能够重复。
交换机+路由键+消息Object+CorrelationData
rabbitTemplate.convertAndSend("TEST.EX","TEST","String:message",new CorrelationData("111"));
在rabbitmq控制台上getmessage查看 ,rabbitTemplate默认发送deliverymode=2消息,已经设置了消息持久化。
测试速度:
测试100个线程同时并发向同一队列发送简单消息(15左右长度的字符串)。从发送到100个消息所有完成ConfirmCallback,用时为600ms左右。此过程不计入消费速度。
400个线程经过一个RabbitTemplate并发发送10000消息,同时就可能产生1000左右的channel。由于channel等在confirm。10000消息所有发送在几秒内完成,10000消息所有confirm回调完成用时22秒。
测试ConfirmCallback回调:
public void confirm(CorrelationData correlationData, boolean confirm, String cause) ;
confirm==true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中, confirm==false标示消息因为Broker处理错误,消息并未处理成功。如未找到对应交换机返回confirm==false。
在此方法中针对confirm==false的消息实现降级/补偿处理:重发、本地缓存、计入数据库/Redis等、更新状态.....
测试环境:实例化一个ConfirmCallback接口对象,做为rabbitTemplate共用回调处理对象。
回调测试结果:
1 先发送到MQ的消息,先完成confirm回调。
2 ConfirmCallback默认是由同一个线程执行回调,打印线程名能够看到线程名为【AMQP Connection rabbitmqIp:port】
3 若发送时没有携带CorrelationData,回调时这里correlationData==null
4.设置消息确认会影响并发性能,每一个线程发送生成一个channel,channel会保持到confirm回调完成再释放。由于每一个connection最多支持2048个channel,当channel达到2048时,会报错org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
测试ReturnCallback 回调:
public void returnedMessage(Message message, int i, String cause, String exchange, String queue) ;
MQ成功接收消息,可是未找到对应路由键的队列后回调。实现降级/补偿处理。
测试环境:实例化一个ReturnCallback接口对象,做为rabbitTemplate共用回调处理对象。
回调测试结果:
默认是由同一个线程执行回调,打印线程名能够看到线程名为【AMQP Connection rabbitmqIp:port】
message=返回的Message对象中的成员,Body为发送时的消息内容 ,receivedDeliveryMode=PERSISTENT=2 为持久化消息。spring_returned_message_correlation=发送时的CorrelationData
(Body:'String:message' MessageProperties [headers={spring_returned_message_correlation=111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
cause=NO_ROUTE
exchange、queue 为发送时的配置
设置QOS,避免触发流控机制
#消费者每次从队列获取的消息数量 (默认一次250个)
spring.rabbitmq.listener.simple.prefetch= 5
当QUEUE达到5条Unacked消息时,不会再推送消息给Consumer。查看后台管理器中queue的unacked数量
package com.example.demo; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.context.annotation.Bean; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Date; import java.util.Map; @Component public class HelloReceiver { @RabbitListener(queues = "hello") //这里是队列名,不是路由键 public void process(String msg,Channel channel, Message message) throws IOException { System.out.println("HelloReceiver收到 : " + msg +"收到时间"+new Date()); try { //告诉MQ服务器收到这条消息 已经被我消费了 能够在队列删掉 这样之后就不会再发了 不然消息服务器觉得这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("receiver success"); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); System.out.println("receiver fail"); } } }
msg是消息内容,至关于Message对象中的body。
Message对象的成员:
能够看到有消息信息BODY,发送方生成的消息CorrelationData,还有执行的Method对象(@RabbitListener标注的方法),目标BEAN
备注:咱们用注解的方式来接受消息 就不要用 本身建立对象实现ChannelAwareMessageListener的方式来接受消息 这种方式还要去全局里面配置麻烦,直接用@RabbitListener(queues = "hello")最简单
消息确认 由于我在属性配置文件里面开启了ACK确认 因此若是代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到
方法参数详解:http://www.javashuo.com/article/p-dwmtzvog-ey.html
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); deliveryTag:该消息的index,由发送方生成 multiple:是否批量.true:将一次性ack全部小于deliveryTag的消息。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); deliveryTag:该消息的index multiple:是否批量.true:将一次性拒绝全部小于deliveryTag的消息。 requeue:被拒绝的是否从新入队列,true 放在队首,false 消息进入绑定的DLX。必定注意:若此消息一直Nack重入队会致使的死循环
channel.basicNack 与 channel.basicReject 的区别在于basicNack能够拒绝多条消息,而basicReject一次只能拒绝一条消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); deliveryTag:该消息的index requeue:被拒绝的是否从新入队列。false 消息进入绑定的DLX
ShutdownSignalException
1 队列名找不到
2 代码中有ack,可是没有配置手动ACK
消费超时,queue中unacked的消息会退回到queue中,且消费者ACK时会失败。
@Component public class MessageHandler { //获取消息的头属性和body属性 @RabbitListener(queues = "zhihao.miao.order") public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){ System.out.println("====消费消息===handleMessage"); System.out.println(headers); System.out.println(body); } }
@Component @RabbitListener(queues = "consumer_queue") public class Receiver { @RabbitHandler public void processMessage1(String message) { System.out.println(message); } @RabbitHandler public void processMessage2(byte[] message) { System.out.println(new String(message)); } }
当中默认的序列化类为SimpleMessageConverter。
仅仅有调用了convertAndSend方法才会使用对应的MessageConvert进行消息的序列化与反序列化。
SimpleMessageConverter对于要发送的消息体body为字节数组时。不进行处理。
对于假设是String。则将String转成字节数组。
对于假设是Java对象,则使用jdk序列化Serializable将消息转成字节数组。转出来的结果较大,含class类名。类对应方法等信息。所以性能较差。
当使用RabbitMq做为中间件时,数据量比較大,此时就要考虑使用相似Jackson2JsonMessageConverter。hessian等序列化形式。以此提升性能。
https://www.jianshu.com/p/911d987b5f11
发送
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; }
User user = new User("linyuan"); rabbitTemplate.convertAndSend("topic.exchange","queue1",user);
接收
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//json序列化时,若想手动ACK,则必须配置 return factory; }
@Component @RabbitListener(queues = "queue1") public class Receiver { @RabbitHandler public void processMessage1(@Payload User user) { System.out.println(user.getName()); } }
解决方案: https://blog.csdn.net/m912595719/article/details/83787486
这是springboot集成RabbitMQ的一个大坑。当消费者配置JSON反序列化时,配置文件中的手动ACK会失效,消费者会变成自动ACK模式。spring.rabbitmq.listener.direct.acknowledge-mode=manual,spring.rabbitmq.listener.simple.acknowledge-mode=manual 配置失效。
解决方法是消费者配置RabbitListenerContainerFactory这个Bean时(见上),设置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)。把消费者强制转换为手动ACK。
若是配置失效切换为自动ACK,可是代码中又使用channel.basicAck手动ACK。这样会形成双ACK的ERROR,接着信道会重启重连。以下:
o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
unknown delivery tag 1表示当前Channel中找不到delivery-tag=1的消息,实际上是这个消息已经自动ACK了,basicAck时就会出错。测试显示,消息并不会丢失而是在出现ERROR异常后走向Nack后从新入队,再屡次重复消费后最终ACK成功,严重下降消费者的执行效率。
Delivery Tags投递的标识
当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中惟一表明了一次投递。delivery tag的惟一标识范围限于channel. delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag做为一个参数。
@Autowired private HelloSender helloSender; /** * 单生产者-单个消费者 */ @RequestMapping("/test") public void hello() throws Exception { helloSender.send(); }
发送消息
ACK场景测试
咱们把HelloReceiver的ACK确认代码注释掉 ,那消息就算程序收到了, 可是未确认ACK致使消息服务器觉得他是未成功消费的,若此时消费者断开则消息返回队列,后续还会再发。