注意:SimpleMessageListenerContainer 可以进行动态设置,比如在运行中可以动态修改其消费者数量的大小、接收消息的模式等。很多基于 RabbitMQ 的定制化后端管理控制台在进行动态设置的时候,也是根据这一特性去实现的。
package com.wyg.rabbitmq.springamqp;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.rabbitmq.client.Channel;
/**
 * Spring configuration that declares the RabbitMQ {@link ConnectionFactory} and a
 * {@link SimpleMessageListenerContainer} consuming from the queues
 * {@code test01}, {@code test02} and {@code test03}.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the connection factory pointing at the local broker with the
     * default guest account and the root virtual host.
     *
     * @return a caching connection factory for {@code localhost:5672}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("localhost:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    /**
     * Registers the {@link SimpleMessageListenerContainer} bean.
     *
     * <p>The container runs in {@link AcknowledgeMode#AUTO}: it acknowledges each
     * delivery itself once the listener returns normally. The listener therefore
     * must NOT call {@code channel.basicAck} on its own; doing so acknowledges the
     * same delivery tag twice, which makes the broker close the channel.
     *
     * @param connectionFactory the connection factory used to open consumer channels
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Listen on several queues with the same container.
        container.addQueueNames("test01", "test02", "test03");
        // Initial number of concurrent consumers.
        container.setConcurrentConsumers(1);
        // Upper bound the container may scale the consumers to.
        container.setMaxConcurrentConsumers(5);
        // Do not requeue messages that are rejected.
        container.setDefaultRequeueRejected(false);
        // Let the container acknowledge deliveries automatically.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag strategy: queue name plus the current timestamp.
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + System.currentTimeMillis();
            }
        });
        // Message listener.
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // Decode the payload and print it together with its origin queue and delivery tag.
                String msg = new String(message.getBody(), "UTF-8");
                System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg
                    + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag());
                // No manual basicAck here: in AUTO mode the container acks after this
                // method returns, so an explicit ack would be a double ack.
            }
        });
        return container;
    }
}
复制代码
package com.wyg.rabbitmq.springamqp;
import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;
/**
 * Integration test that publishes messages for the listener container
 * declared in {@code RabbitConfig} to consume.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {

    @Autowired
    RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

    /**
     * Publishes three messages to the {@code springdemo.direct} exchange with
     * routing key {@code orderRoutingKey}.
     *
     * <p>Expects the queues {@code test01}, {@code test02} and {@code test03} to be
     * bound to that exchange with that routing key already; the bindings are not
     * declared here.
     */
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        for (int i = 0; i < 3; i++) {
            // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8, so
            // relying on the platform default charset could garble the text.
            byte[] body = ("第" + i + "条消息").getBytes(StandardCharsets.UTF_8);
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", body);
        }
    }
}
复制代码