我的在学习rabbitmq时发现网上不多有系统性介绍springboot和rabbitmq如何集成的,其余人总结的都片断化,因此结合我的调研过程,整理此篇文章。java
本文章共分为如下部分:redis
目前流程的消息队列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的应用场景,关于各个框架的介绍,你们可自行百度,网上不少文章介绍~其中rabbit由于其ack特性以及还算不错的性能被大多数公司采用。spring
单机版安装很简单,大概步骤以下:segmentfault
# 安装erlang包 yum install erlang # 安装socat yum install socat # 安装rabbit rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm # 启动服务 rabbitmq-server start # 增长管理控制功能 rabbitmq-plugins enable rabbitmq_management # 增长用户: sudo rabbitmqctl add_user root password rabbitmqctl set_user_tags root administrator rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
集群安装,可参考如下博客:
rabbitmq集群安装springboot
废话少说直接上代码:
配置参数
application.yml:app
spring: rabbitmq: addresses: 192.168.1.1:5672 username: username password: password publisher-confirms: true virtual-host: /
java config读取参数框架
/** * RabbitMq配置文件读取类 * * @author chenhf * @create 2017-10-23 上午9:31 **/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMqConfig { @Value("${spring.rabbitmq.addresses}") private String addresses; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; // 构建mq实例工厂 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest一、queueTopicTest2),此两个队列在和交换机绑定时分别设置不一样的routingkey(.TEST.以及lazy.#)来验证匹配模式。dom
/** * 用于配置交换机和队列对应关系 * 新增消息队列应该按照以下步骤 * 一、增长queue bean,参见queueXXXX方法 * 二、增长queue和exchange的binding * @author chenhf * @create 2017-10-23 上午10:33 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class RabbitMqExchangeConfig { /** logger */ private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class); /** * @Author:chenhf * @Description: 主题型交换机 * @Date:下午5:49 2017/10/23 * @param * @return */ @Bean TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){ TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode()); rabbitAdmin.declareExchange(contractTopicExchange); logger.debug("完成主题型交换机bean实例化"); return contractTopicExchange; } /** * 直连型交换机 */ @Bean DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) { DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode()); rabbitAdmin.declareExchange(contractDirectExchange); logger.debug("完成直连型交换机bean实例化"); return contractDirectExchange; } //在此能够定义队列 @Bean Queue queueTest(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("测试队列实例化完成"); return queue; } //topic 1 @Bean Queue queueTopicTest1(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("话题测试队列1实例化完成"); return queue; } //topic 2 @Bean Queue queueTopicTest2(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("话题测试队列2实例化完成"); return queue; } //在此处完成队列和交换机绑定 @Bean Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("测试队列与直连型交换机绑定完成"); return binding; } //topic binding1 @Bean Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("测试队列与话题交换机1绑定完成"); return binding; } //topic binding2 @Bean Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("测试队列与话题交换机2绑定完成"); return binding; } }
在这里用到枚举类:RabbitMqEnum ide
/** * 定义rabbitMq须要的常量 * * @author chenhf * @create 2017-10-23 下午4:07 **/ public class RabbitMqEnum { /** * @param * @Author:chenhf * @Description:定义数据交换方式 * @Date:下午4:08 2017/10/23 * @return */ public enum Exchange { CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"), CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"), CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点"); private String code; private String name; Exchange(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定义队列名称 * creat_user: chenhf * creat_date: 2017/10/31 **/ public enum QueueName { TESTQUEUE("TESTQUEUE", "测试队列"), TOPICTEST1("TOPICTEST1", "topic测试队列"), TOPICTEST2("TOPICTEST2", "topic测试队列"); private String code; private String name; QueueName(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定义routing_key * creat_user: chenhf * creat_date: 2017/10/31 **/ public enum QueueEnum { TESTQUEUE("TESTQUEUE1", "测试队列key"), TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"), TESTTOPICQUEUE2("lazy.#", "topic测试队列key"); private String code; private String name; QueueEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } }
以上完成消息生产者的定义,下面封装调用接口
测试时直接调用此工具类,testUser类需本身实现工具
rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/** * rabbitmq发送消息工具类 * * @author chenhf * @create 2017-10-26 上午11:10 **/ @Component public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{ /** logger */ private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class); private RabbitTemplate rabbitTemplate; @Autowired public RabbitMqSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.info("confirm: " + correlationData.getId()); } /** * 发送到 指定routekey的指定queue * @param routeKey * @param obj */ public void sendRabbitmqDirect(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData); } /** * 全部发送到Topic Exchange的消息被转发到全部关心RouteKey中指定Topic的Queue上 * @param routeKey * @param obj */ public void sendRabbitmqTopic(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData); } }
springboot注解方式监听队列,没法手动指定回调,因此采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见如下代码,详细介绍能够在spring的官网上找amqp相关章节阅读
直连消费者
经过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费
/** * 消费者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class ExampleAmqpConfiguration { @Bean("testQueueContainer") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TESTQUEUE"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("testQueueListener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); //经过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费 if ("2".equals(testUser.getUserName())){ System.out.println(testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } if ("1".equals(testUser.getUserName())){ System.out.println(testUser.toString()); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } } }; } }
topic消费者1
/** * 消费者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration { @Bean("topicTest1Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST1"); container.setMessageListener(exampleListener1()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest1Listener") public ChannelAwareMessageListener exampleListener1(){ return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); System.out.println("TOPICTEST1:"+testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }; } }
topic消费者2
/** * 消费者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration2 { @Bean("topicTest2Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST2"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest2Listener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); System.out.println("TOPICTEST2:"+testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }; } }
使用过程当中可能出现的坑参考此篇文章
https://segmentfault.com/a/11...