//某个服务的具体状况 ps -ef | grep XXX //杀死进程 kill -9 进程ID,第一个 //查看内存 free或者top //查看磁盘使用状况 df -l //寻找文件 find -name xxx //查看端口使用状况 netstat -an | grep 15672
# Refresh the package index before installing anything.
sudo apt-get update
# RabbitMQ runs on the Erlang VM; install it with the same package manager
# (apt) that the remaining steps use.
sudo apt-get install erlang
# Install the RabbitMQ broker itself.
sudo apt-get install rabbitmq-server
不启用管理插件的话,http://ip:15672 的管理页面是访问不到的
# Enable the management plugin (serves the web UI on port 15672).
# Run with sudo like the other administrative commands in this guide.
sudo rabbitmq-plugins enable rabbitmq_management
启动、停止、重启、查看状态的RabbitMQ命令: 启动:sudo service rabbitmq-server start 关闭:sudo service rabbitmq-server stop 重启:sudo service rabbitmq-server restart 查看状态:sudo rabbitmqctl status
root@yan:/# service rabbitmq-server status ● rabbitmq-server.service - RabbitMQ Messaging Server Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled) Active: active (running) since Thu 2018-11-08 20:15:25 CST; 7min ago Process: 16987 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS) Process: 17074 ExecStartPost=/usr/lib/rabbitmq/bin/rabbitmq-server-wait (code=exited, status=0/SUCCESS) Main PID: 17073 (rabbitmq-server) Tasks: 70 Memory: 39.6M CPU: 1.538s CGroup: /system.slice/rabbitmq-server.service ├─17073 /bin/sh /usr/sbin/rabbitmq-server ├─17084 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server ├─17171 /usr/lib/erlang/erts-7.3/bin/epmd -daemon ├─17211 /usr/lib/erlang/erts-7.3/bin/beam -W w -A 64 -P 1048576 -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/ra ├─17312 inet_gethost 4 └─17313 inet_gethost 4 Nov 08 20:15:23 yan systemd[1]: Stopped RabbitMQ Messaging Server. Nov 08 20:15:23 yan systemd[1]: Starting RabbitMQ Messaging Server... Nov 08 20:15:23 yan rabbitmq[17074]: Waiting for rabbit@yan ... Nov 08 20:15:23 yan rabbitmq[17074]: pid is 17084 ... Nov 08 20:15:25 yan systemd[1]: Started RabbitMQ Messaging Server. lines 1-22/22 (END)
添加admin用户,密码设置为admin。
# Create a broker user: first argument is the user name, second the password.
# NOTE(review): "admin"/"admin" is a demo credential — pick a real password on a public host.
sudo rabbitmqctl add_user admin admin
赋予管理员角色(administrator 标签)
# Tag the admin user as "administrator".
sudo rabbitmqctl set_user_tags admin administrator
赋予virtual host中全部资源的配置、写、读权限以便管理其中的资源
# Grant admin permissions on virtual host "/" (-p /).
# The three quoted regexes are the configure, write and read patterns, in that order;
# '.*' matches every resource. They are single-quoted so the shell does not expand them.
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TGzxSFXV-1581158404903)(http://phi7gkzsm.bkt.clouddn.com/rabbitmq.png)]xcode
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置rabbitmq的安装地址、端口以及帐户信息
一定不要把端口号设置成15672,因为那个端口已经被管理界面占用了,应用连接RabbitMQ要使用5672端口
同时把该端口添加到阿里云服务器的安全组配置中
# Application name.
spring.application.name=Spring-boot-rabbitmq
# Address of the RabbitMQ broker.
spring.rabbitmq.host=101.200.55.12
# Broker port used by the application (not the 15672 management port).
spring.rabbitmq.port=5672
# Broker account created with rabbitmqctl add_user.
spring.rabbitmq.username=admin
# Placeholder — replace with the real password.
spring.rabbitmq.password=你的密码
如图,有一个admin连接上了
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rLYFcBwf-1581158404904)(http://phi7gkzsm.bkt.clouddn.com/rabbitmq%E8%BF%9E%E6%8E%A5.png)]dom
新建以下几个文件
//队列配置 import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
// 发送者 // rabbitTemplate是springboot 提供的默认实现 import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "这是发送的信息 "+"---------------------" + new Date(); System.out.println("============================="); System.out.println("Sender : " + context); System.out.println("============================="); this.rabbitTemplate.convertAndSend("hello", context); } }
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the "hello" queue.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Prints the received text between two marker lines.
     *
     * @param message text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String received = "Receiver : " + message;
        System.out.println("------------------1--------------------");
        System.out.println(received);
        System.out.println("-----------------1---------------------");
    }
}
// 新建一个controller测试 import cn.nxcoder.blog.rabbit.HelloSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class RabbitController { @Autowired private HelloSender helloSender; /* 一个生产者和一个消费者 */ @PostMapping ("/rabbitHello") @ResponseBody public void hello( ) { helloSender.send(); } }
//用postman本地测试 http://localhost:8088/rabbitHello
//测试结果 ============================= Sender : 这是发送的信息 ---------------------Fri Nov 09 15:09:01 CST 2018 ============================= 15:09:01.188 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法====== 15:09:01.188 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - Result:null ------------------1-------------------- Receiver : 这是发送的信息 ---------------------Fri Nov 09 15:09:01 CST 2018 -----------------1---------------------
在我们的生产者里面新加一个sendMsg方法,该方法需要传参
看下面的,hello不能变,或者说是必须和你的消费者引用的名称要同样,这里我就没有改
this.rabbitTemplate.convertAndSend("hello", sendMsg);
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
 * Producer for the "hello" queue with a fixed-text and a caller-supplied variant.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends the fixed text followed by the current date. */
    public void send() {
        publish("Sender : ", "这是发送的信息 "+"---------------------" + new Date());
    }

    /**
     * Sends the given text with the current date appended.
     *
     * @param msg text to send
     */
    public void sendMsg(String msg) {
        publish("Sender2 : ", msg + new Date());
    }

    /** Echoes the payload to stdout behind the given label, then sends it with routing key "hello". */
    private void publish(String label, String payload) {
        System.out.println(label + payload);
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
新加一个消费者,可是引用的仍是hello
/**
 * Second consumer listening on the "hello" queue.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiverTwo {

    /**
     * Prints the received text with the "Receiver2" prefix.
     *
     * @param message text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver2 : " + message;
        System.out.println(line);
    }
}
再看原来的消费者1,他们的@RabbitListener(queues = “hello”)
/**
 * First consumer listening on the "hello" queue.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Prints the received text with the "Receiver" prefix.
     *
     * @param message text taken from the queue
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Receiver : " + message;
        System.out.println(line);
    }
}
在controller里面新加一个一对多测试
/**
 * HTTP entry points used to trigger the RabbitMQ examples manually.
 */
@Controller
public class RabbitController {
    @Autowired
    private HelloSender helloSender;

    /* One producer and one consumer */
    // POST /rabbitHello delegates to HelloSender.send().
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello( ) {
        helloSender.send();
    }

    /**
     * Single producer, multiple consumers: sends ten numbered messages
     * (i = 0..9) through the same sender.
     */
    @PostMapping("/oneToMany")
    @ResponseBody
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("这是第二个生产者发送的消息:==="+i+"====个");
        }
    }
}
postman测试
http://localhost:8088/oneToMany
测试结果:
Sender2 : 这是第二个生产者发送的消息:===0====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===1====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===2====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===3====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===4====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===5====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===6====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===7====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===8====个Fri Nov 09 15:33:33 CST 2018 Sender2 : 这是第二个生产者发送的消息:===9====个Fri Nov 09 15:33:33 CST 2018 15:33:33.224 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法====== 15:33:33.224 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - Result:null Receiver2 : 这是第二个生产者发送的消息:===1====个Fri Nov 09 15:33:33 CST 2018 Receiver : 这是第二个生产者发送的消息:===0====个Fri Nov 09 15:33:33 CST 2018 Receiver2 : 这是第二个生产者发送的消息:===3====个Fri Nov 09 15:33:33 CST 2018 Receiver : 这是第二个生产者发送的消息:===2====个Fri Nov 09 15:33:33 CST 2018 Receiver2 : 这是第二个生产者发送的消息:===5====个Fri Nov 09 15:33:33 CST 2018 Receiver : 这是第二个生产者发送的消息:===4====个Fri Nov 09 15:33:33 CST 2018 Receiver2 : 这是第二个生产者发送的消息:===7====个Fri Nov 09 15:33:33 CST 2018 Receiver : 这是第二个生产者发送的消息:===6====个Fri Nov 09 15:33:33 CST 2018 Receiver2 : 这是第二个生产者发送的消息:===9====个Fri Nov 09 15:33:33 CST 2018 Receiver : 这是第二个生产者发送的消息:===8====个Fri Nov 09 15:33:33 CST 2018
以上一对多的就完成了
在刚才咱们实现了一个生产者和2个消费者
其实多对多更简单,只须要基于一对多把生产者copy一份就能够了,里面的东西不要变,换个名字
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
 * Second producer for the "hello" queue; same operations as HelloSender
 * but with its own log prefixes.
 */
@Component
public class HelloSenderTwo {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends the fixed text followed by the current date. */
    public void send() {
        publish("生产者2_Sender : ", "这是发送的信息 "+" ---------------------" + new Date());
    }

    /**
     * Sends the given text with the current date appended.
     *
     * @param msg text to send
     */
    public void sendMsg(String msg) {
        publish("生产者2_Sender2 : ", msg + new Date());
    }

    /** Echoes the payload to stdout behind the given label, then sends it with routing key "hello". */
    private void publish(String label, String payload) {
        System.out.println(label + payload);
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
controller新加一个方法
import cn.nxcoder.blog.rabbit.HelloSender;
import cn.nxcoder.blog.rabbit.HelloSenderTwo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * HTTP entry points used to trigger the RabbitMQ examples manually.
 */
@Controller
public class RabbitController {
    @Autowired
    private HelloSender helloSender;
    @Autowired
    private HelloSenderTwo helloSenderTwo;

    /* One producer and one consumer */
    // POST /rabbitHello delegates to HelloSender.send().
    @PostMapping ("/rabbitHello")
    @ResponseBody
    public void hello( ) {
        helloSender.send();
    }

    /**
     * Single producer, multiple consumers: sends ten numbered messages
     * (i = 0..9) through the same sender.
     */
    @PostMapping("/oneToMany")
    @ResponseBody
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("这是第二个生产者发送的消息:==="+i+"====个");
        }
    }

    /**
     * Multiple producers, multiple consumers: for each i in 0..9 the same
     * text is sent once through each of the two senders.
     */
    @PostMapping("/manyToMany")
    @ResponseBody
    public void manyToMany() {
        for(int i=0;i<10;i++){
            helloSender.sendMsg("hellomsg:"+i+" ");
            helloSenderTwo.sendMsg("hellomsg:"+i+" ");
        }
    }
}
// postman测试 http://localhost:8088/manyToMany
//测试结果 Sender2 : hellomsg:0 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:0 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:1 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:1 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:2 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:2 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:3 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:3 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:4 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:4 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:5 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:5 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:6 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:6 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:7 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:7 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:8 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:8 Fri Nov 09 15:49:42 CST 2018 Sender2 : hellomsg:9 Fri Nov 09 15:49:42 CST 2018 生产者2_Sender2 : hellomsg:9 Fri Nov 09 15:49:42 CST 2018 15:49:42.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法====== 15:49:42.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - Result:null Receiver : hellomsg:0 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:0 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:1 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:1 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:2 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:2 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:3 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:3 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:4 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:4 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:5 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:5 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:6 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:6 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:7 Fri Nov 
09 15:49:42 CST 2018 Receiver : hellomsg:7 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:8 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:8 Fri Nov 09 15:49:42 CST 2018 Receiver : hellomsg:9 Fri Nov 09 15:49:42 CST 2018 Receiver2 : hellomsg:9 Fri Nov 09 15:49:42 CST 2018
大部分的状况是数据是用对象封装的,因此咱们来测试一下实体类
springboot完美地支持对象的发送和接收,不需要额外的配置。
实体类(必须实现序列化接口):
import java.io.Serializable;

/**
 * Simple payload object sent through RabbitMQ in the entity example.
 * It implements Serializable because it is transferred as a serialized object.
 */
public class RabbitTest implements Serializable {

    /**
     * Explicit serialization version so that sender and receiver stay
     * compatible when the class is recompiled.
     */
    private static final long serialVersionUID = 1L;

    private String name;
    private String pass;

    /** @return the name, or null if it was never set */
    public String getName() {
        return name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the pass value, or null if it was never set */
    public String getPass() {
        return pass;
    }

    /** @param pass the pass value to store */
    public void setPass(String pass) {
        this.pass = pass;
    }
}
须要更改一下他的名字
this.rabbitTemplate.convertAndSend("entityQueue", rabbitTest);
原来的生产者变成这样;
import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;

/**
 * Producer with text variants for the "hello" queue and an object variant
 * for the "entityQueue" queue.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends the fixed text followed by the current date. */
    public void send() {
        publish("Sender : ", "这是发送的信息 "+"---------------------" + new Date());
    }

    /**
     * Sends the given text with the current date appended.
     *
     * @param msg text to send
     */
    public void sendMsg(String msg) {
        publish("Sender2 : ", msg + new Date());
    }

    /** Populates a RabbitTest with demo values and sends it with routing key "entityQueue". */
    public void sendEntity() {
        final RabbitTest entity = new RabbitTest();
        entity.setName("琬琬");
        entity.setPass("123456987");
        rabbitTemplate.convertAndSend("entityQueue", entity);
    }

    /** Echoes the payload to stdout behind the given label, then sends it with routing key "hello". */
    private void publish(String label, String payload) {
        System.out.println(label + payload);
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
必须新建一个消费者,因为一个消费者只能有一个名字,刚才我们新加的消费者名字都是hello
如今咱们给他定义一个新的名字entityQueue
import cn.nxcoder.blog.entity.RabbitTest;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the "entityQueue" queue for RabbitTest payloads.
 */
@Component
@RabbitListener(queues = "entityQueue")
public class EntityReceiver {

    /**
     * Prints the name and pass of the received object, separated by "/".
     *
     * @param entity object taken from the queue
     */
    @RabbitHandler
    public void process(RabbitTest entity) {
        final String summary = entity.getName() + "/" + entity.getPass();
        System.out.println("rabbitTest receive : " + summary);
    }
}
由于你一旦新定义一个名字,就必须往config文件中添加这个名字
如今咱们的配置类多了一个entityQueue
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queues used by the text and entity examples.
 */
@Configuration
public class RabbitConfig {

    /** Queue used by the text sender/receivers. */
    private static final String HELLO_QUEUE = "hello";

    /** Queue used by the entity sender/receiver. */
    private static final String ENTITY_QUEUE = "entityQueue";

    /**
     * @return the "hello" queue definition
     */
    @Bean
    public Queue Queue() {
        final Queue helloQueue = new Queue(HELLO_QUEUE);
        return helloQueue;
    }

    /**
     * @return the "entityQueue" queue definition
     */
    @Bean
    public Queue entityQueue() {
        final Queue queue = new Queue(ENTITY_QUEUE);
        return queue;
    }
}
/**
 * Entity transfer test: POST /entityTest delegates to HelloSender.sendEntity().
 */
@PostMapping("/entityTest")
@ResponseBody
public void userTest() {
    helloSender.sendEntity();
}
//测试结果 rabbitTest receive : 琬琬/123456987
topic 是RabbitMQ中最灵活的一种方式,能够根据binding_key自由的绑定不一样的队列
首先对topic规则配置,这里使用两个队列来测试
(也就是在Application类中建立和绑定的topic.message1和topic.message2两个队列)
其中topic.message1的binding_key为“topic.message1”,topic.message2的binding_key为“topic.#”;
//===============如下是验证topic Exchange的队列========== @Bean public Queue queueMessage() { return new Queue("topic.message1"); } @Bean public Queue queueMessages() { return new Queue("topic.message2"); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } /** * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配 * @param queueMessages * @param exchange * @return */ @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }
/**
 * Consumer listening on the "topic.message1" queue.
 */
@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {

    /**
     * Prints the received text with the receiver-1 prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "topic.messageReceiver1 : " + payload;
        System.out.println(line);
    }
}
/**
 * Consumer listening on the "topic.message2" queue.
 */
@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {

    /**
     * Prints the received text with the receiver-2 prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "topic.messageReceiver2 : " + payload;
        System.out.println(line);
    }
}
/**
 * Topic exchange test: POST /topicTest delegates to HelloSender.sendTopic().
 */
@PostMapping("/topicTest")
@ResponseBody
public void topicTest() {
    helloSender.sendTopic();
}
测试: http://localhost:8088/topicTest //结果 sender1 : I am topic.mesaage msg====== sender2 : I am topic.mesaages msg######## 16:50:10.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法====== 16:50:10.455 [http-nio-8088-exec-2] INFO c.n.blog.handler.LogAspectHandler - Result:null topic.messageReceiver2 : I am topic.mesaage msg====== topic.messageReceiver2 : I am topic.mesaages msg########
Fanout 就是咱们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的全部队列都收到这个消息。
这里使用三个队列来测试(也就是在config类中建立和绑定的fanout.A、fanout.B、fanout.C)这三个队列都和config中建立的fanoutExchange转发器绑定。
//===============如下是验证Fanout Exchange的队列========== @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } //===============以上是验证Fanout Exchange的队列==========
/**
 * Prints a fixed greeting and publishes it to the "fanoutExchange"
 * exchange with routing key "abcd.ee".
 */
public void sendFanout() {
    final String payload = "fanoutSender :hello i am hzb";
    System.out.println(payload);
    rabbitTemplate.convertAndSend("fanoutExchange", "abcd.ee", payload);
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the "fanout.C" queue.
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    /**
     * Prints the received text with the receiver-C prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "FanoutReceiverC : " + payload;
        System.out.println(line);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the "fanout.B" queue.
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    /**
     * Prints the received text with the receiver-B prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "FanoutReceiverB : " + payload;
        System.out.println(line);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the "fanout.A" queue.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Prints the received text with the receiver-A prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "FanoutReceiverA : " + payload;
        System.out.println(line);
    }
}
/**
 * Fanout exchange test: POST /fanoutTest delegates to HelloSender.sendFanout().
 */
@PostMapping("/fanoutTest")
@ResponseBody
public void fanoutTest() {
    helloSender.sendFanout();
}
http://localhost:8088/fanoutTest
//结果 fanoutSender :hello i am hzb 17:35:14.911 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - ======执行方法后,执行该方法====== 17:35:14.911 [http-nio-8088-exec-1] INFO c.n.blog.handler.LogAspectHandler - Result:null FanoutReceiverB : fanoutSender :hello i am hzb FanoutReceiverA : fanoutSender :hello i am hzb FanoutReceiverC : fanoutSender :hello i am hzb
由以上结果可知:就算fanoutSender发送消息的时候,指定了routing_key为"abcd.ee",可是全部接收者都接受到了消息
增加回调处理,这里不再使用application.properties默认配置的方式,而是在程序中显式地使用文件中的配置信息。该示例中没有新建队列和exchange,用的是第5节中的topic.messages队列和exchange转发器。消费者也是第5节中的topicMessagesReceiver
添加; spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
# Configuration after adding publisher confirms and the virtual host:
spring.application.name=Spring-boot-rabbitmq
# Address of the RabbitMQ broker.
spring.rabbitmq.host=101.200.55.12
# Broker port used by the application (not the 15672 management port).
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
# Placeholder — replace with the real password.
spring.rabbitmq.password=你的密码
# Read by RabbitConfig2 and passed to setPublisherConfirms; must be true for confirm callbacks.
spring.rabbitmq.publisher-confirms=true
# Virtual host the connection uses; "/" is the one permissions were granted on.
spring.rabbitmq.virtual-host=/
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; public class RabbitConfig2 { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses+":"+port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); /** 若是要进行消息回调,则这里必需要设置为true */ connectionFactory.setPublisherConfirms(publisherConfirms); return connectionFactory; } @Bean /** 由于要设置回调类,因此应是prototype类型,若是是singleton类型,则回调类为最后一次设置 */ @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplatenew() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
import java.util.Date; import java.util.UUID; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class CallBackSender implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplatenew; public void send() { rabbitTemplatenew.setConfirmCallback(this); String msg="callbackSender : i am callback sender"; System.out.println(msg ); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); System.out.println("callbackSender UUID: " + correlationData.getId()); this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { // TODO Auto-generated method stub System.out.println("callbakck confirm: " + correlationData.getId()); } }
/**
 * Consumer listening on the "topic.message1" queue.
 */
@Component
@RabbitListener(queues = "topic.message1")
public class topicMessageReceiver {

    /**
     * Prints the received text with the receiver-1 prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "topic.messageReceiver1 : " + payload;
        System.out.println(line);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the "topic.message2" queue.
 */
@Component
@RabbitListener(queues = "topic.message2")
public class topicMessageReceiverTwo {

    /**
     * Prints the received text with the receiver-2 prefix.
     *
     * @param payload text taken from the queue
     */
    @RabbitHandler
    public void process(String payload) {
        final String line = "topic.messageReceiver2 : " + payload;
        System.out.println(line);
    }
}
/**
 * Publisher-confirm test: POST /callback delegates to callBackSender.send().
 */
@PostMapping("/callback")
@ResponseBody
public void callbak() {
    callBackSender.send();
}
http://localhost:8088/callback
//测试结果 callbackSender : i am callback sender callbackSender UUID: 48be7d7e-69f8-4d9c-b264-191402dec3de callbakck confirm: 48be7d7e-69f8-4d9c-b264-191402dec3de topic.messageReceiver2 : callbackSender : i am callback sender
从上面能够看出callbackSender发出的UUID,收到了回应,又传回来了。
到此,rabbitMQ先分析到这里,接下来咱们会用它作些高级的功能
有问题的能够联系 neuq_hcg@163.com