实现内容
RabbitMQ + springboot 实现消息的发送和监听 springboot版本2.1.8spring
直接上代码
配置类springboot
@Configuration public class RabbitConfig { // mq地址 @Bean(value = "connectionFactory") @Primary public ConnectionFactory connectionFactory( @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.virtual-host}") String virtualHost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); return retryTemplate; } // mq发送 @Bean public AmqpTemplate myMQTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setRetryTemplate(retryTemplate()); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } }
MQ生产者服务器
@Component public class MQSender { @Autowired private AmqpTemplate myMQTemplate; public void send(String exchangeName, String routingKey, Object object) { myMQTemplate.convertAndSend(exchangeName, routingKey, object); } }
须要注意, 这里没有生成exchange, 须要手动建立, 若是须要程序自动建立, 则须要将exchange声明为bean便可. 执行测试代码测试
@Autowired private MQSender mqSender; @Test public void send() { mqSender.send("e.send", "r.send", "send a message"); }
若是你观察的及时, 估计还能看见exchange收到消息的曲线波动~ 由于没有消费者, 因此exchange在接收到信息后直接将消息丢弃了, 如今咱们建立对应的队列, 并绑定. 再次执行就能够看到队列中有消息了. code
下面是消费者 首先咱们要在配置类中增长监听配置, 一个自动ack, 一个手动ackblog
// 自动ack @Bean(value = "listenerFactoryWithAutoAck") public SimpleRabbitListenerContainerFactory listenerFactoryWithAutoAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(newRentConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } // 手动ack @Bean(value = "listenerFactoryWithManualAck") public SimpleRabbitListenerContainerFactory listenerFactoryWithManualAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) { SimpleRabbitListenerContainerFactory factory = listenerFactory(newRentConnectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; }
若是你使用的springboot版本是2.2.*的话, 能够更方便的在监听的注解上设置ackMode, 而且会覆盖在配置监听工厂的配置方式, 这里使用的版本是2.1.8因此只能在配置工厂时设置.rabbitmq
@Component public class MQListener { @RabbitListener(containerFactory = "listenerFactoryWithManualAck", queues = "q.send") public void consume(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println("message: " + message.toString()); String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("message body: " + msg); } }
执行后看到消息并无被消费, 这是由于咱们使用手动应答监听, 可是没有发送应答, 服务器将消息从新入队列. 在监听中加入代码, false表示只是响应这条信息, true表示全部信息.
channel.basicAck(tag, false);
队列
启动程序能够看到信息被消费了. 在程序抛异常时, 可能须要手动处理异常, 拒绝消息. true表示消息从新入队列, 还能够被消费; false表示直接丢弃消息 channel.basicReject(tag, true);
ip
以上就是RabbitMQ在springboot中的简单实用get