①,拉取 RabbitMQ 镜像(推荐使用镜像加速:http://www.docker-cn.com/registry-mirror)
# Pull the image. The ":3-management" tag is required here: it bundles the
# management plugin that serves the web UI on port 15672, and it is the tag
# shown in the "docker images" listing below.
docker pull registry.docker-cn.com/library/rabbitmq:3-management

# List the local Docker images
[root@localhost ~]# docker images
REPOSITORY                                TAG            IMAGE ID       CREATED      SIZE
registry.docker-cn.com/library/rabbitmq   3-management   c51d1c73d028   9 days ago   149 MB

# Run the image (-d: run in the background; -p: publish a port;
# 5672 is the port applications connect to, 15672 is the management port;
# --name sets the container name; c51d1c73d028 is the image id)
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028

# List the containers running in Docker
[root@localhost ~]# docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED      STATUS         PORTS                                                                                        NAMES
02731f5a5334   c51d1c73d028   "docker-entrypoint..."   3 days ago   Up 4 seconds   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   myrabbitmq
②,在浏览器上访问 RabbitMQ 的 Web 管理后台(http://192.168.43.28:15672,使用 guest/guest 登录)
③,创建一个 direct 类型的交换机
④,创建队列
⑤,将交换机与队列绑定
⑥,绑定成功后如下
⑦,pom 依赖
<!-- Inherit from the Spring Boot parent POM; the starters below declare no version of their own. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
</parent>

<dependencies>
    <!-- AMQP starter: source of the RabbitTemplate / AmqpAdmin / @RabbitListener types used by the Java code. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Web starter: source of @RestController / @RequestMapping used by the demo controller. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
⑧,application.properties 的配置
# HTTP port of this demo application
server.port=8081

# Address of the RabbitMQ broker (presumably the Docker host from step 1 - adjust to your environment)
spring.rabbitmq.host=192.168.43.28
# Application (AMQP) port; matches the 5672 port published by "docker run"
spring.rabbitmq.port=5672
# Credentials used to connect to the broker
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
①,Spring Boot 为我们注入了两个重要的 bean
// RabbitTemplate 用于发送和接受消息 @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); //从ioc容器中获取消息转换器 MessageConverter messageConverter = this.messageConverter.getIfUnique(); if (messageConverter != null) { //获取到了就配置为默认的消息转换器 template.setMessageConverter(messageConverter); } template.setMandatory(determineMandatoryFlag()); RabbitProperties.Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate(createRetryTemplate(properties.getRetry())); } map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReceiveTimeout); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReplyTimeout); map.from(properties::getExchange).to(template::setExchange); map.from(properties::getRoutingKey).to(template::setRoutingKey); return template; } //AmqpAdmin 是用于建立交换器和队列以及绑定规则的工具 @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); }
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //启用 RabbitMQ @EnableRabbit @Configuration public class RabbitConfig { //注入json类型的消息转换器,这样方便咱们观察消息内容 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; //将该监听器注入ioc容器 @Component public class MyRabbitListener { //监听的队列名为queue.news @RabbitListener(queues = "queue.news") public void listener(Object object){ System.out.println("收到消息:"+object.toString()); } }
import com.mq.rabbitmq.bean.Person; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private AmqpAdmin amqpAdmin; @RequestMapping("person") public String person(){ //给交换器的名为exchange.direct 发送路由键为news 的消息 rabbitTemplate.convertAndSend("exchange.direct","news",new Person(1,"翛苏")); System.out.println("消息发送成功"); return "消息发送成功"; } /*利用amqpAdmin 建立交换机*/ @RequestMapping("createDirect") public String createDirect(){ //建立一个direct类型的交换机,名字为amqb.exange.su amqpAdmin.declareExchange(new DirectExchange("amqb.exange.su")); return "建立交换器成功"; } /*利用amqpAdmin 建立队列*/ @RequestMapping("createQueue") public String createQueue(){ //建立一个队列名为:amqb.queue.chun 的队列 amqpAdmin.declareQueue(new Queue("amqb.queue.chun")); return "建立队列成功"; } /*利用amqpAdmin 建立交换机与交换机的绑定规则*/ @RequestMapping("createBinding") public String createBinding(){ amqpAdmin.declareBinding(new Binding("amqb.queue.chun", Binding.DestinationType.QUEUE, "amqb.exange.su", "route.su",null)); return "建立绑定成功"; } //接收队列名为queue.news 的消息 @RequestMapping("receive") public String receive(){ Object o = rabbitTemplate.receiveAndConvert("queue.news"); System.out.println(o.toString()); return o.toString(); } }
①,启动应用,调用 person 方法发送消息
②,控制台输出如下
消息发送成功 收到消息:(Body:'{"id":1,"name":"翛苏"}' MessageProperties [headers={__TypeId__=com.mq.rabbitmq.bean.Person}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=news, deliveryTag=3, consumerTag=amq.ctag-3zRVuPvjjOAn-ulvdLna-Q, consumerQueue=queue.news])
这是消息监听器的输出