技术由蚂蚁课堂教学 www.itmayiedu.com 高端IT人才培训授予java
能够报名学习蚂蚁课堂高端学习,咱们等着你的到来。web
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。MQ消息队列 是企业级互联网架构核心产品,具有低延迟,高并发,高可用,高可靠,支撑万亿级数据洪峰的分布式消息中间件。spring
RabbitMQ拥有两种模式,点对点和发布订阅,它和ActiveMQ不一样的地方就在于RabbitMQ在消费时能够返回一个消费的标记。我的理解在实际生产中使用RabbitMQ的便捷性略高于ActiveMQ方法,由于RabbitMQ在发送时有发送确认和消费者消费返回。能够很好的知道消费者是否已经正确消费。json
使用RabbitMQ最好在Linux系统上安装RabbitMQ服务,具体安装过程不作详细介绍,百度一大堆。架构
安装完成事后登陆管理页面地址http://192.168.10.10:15672/,使用帐号和密码登陆事后就能够看到管理页面。并发
在界面上能够点击Queue队列按钮,建立咱们两个本身的队列,这里建立hello.queue1和hello.queue2两个队列。app
下面咱们就能够建立SpringBoot工程来使用RabbitMQ消息队列了。dom
下面是个人项目结构图 分布式
首先在父工程项目中的pom文件引入jar包:ide
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.40</version> </dependency> </dependencies>
在父工程下面建立两个Maven Module子工程分别如图命名。
在生产中中建立一个RabbitMQ配置文件RabbitConfig.java
@Configuration public class RabbitConfig { // 声明队列 @Bean public Queue queue1() { return new Queue("hello.queue1", true); // true表示持久化该队列 } @Bean public Queue queue2() { return new Queue("hello.queue2", true); } // 声明交互器 @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } // 绑定 @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1"); } @Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#"); } }
而后在生产者中建立生产者代码:
@Component @EnableScheduling public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息发送成功:" + correlationData); } else { System.out.println("消息发送失败:" + cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); } // 发送消息,不须要实现任何接口,供外部调用。 public void send(String msg) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); System.out.println("开始发送消息 : " + msg.toLowerCase()); String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString(); System.out.println("结束发送消息 : " + msg.toLowerCase()); System.out.println("消费者响应 : " + response + "---消息处理完成"); } // 每隔五秒就发送消息进行测试 @Scheduled(fixedDelay = 5000) public void sendsmg() { JSONObject obj = new JSONObject(); obj.put("time", System.currentTimeMillis()); obj.put("name", "LLL丶禾羊"); obj.put("address", "www.itmayiedu.com"); obj.put("orderId", "20135111111123"); send(obj.toJSONString()); } }
建立配置文件application.propreties
spring.rabbitmq.host=192.168.10.10 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
到这里生产者就能够直接启动了,建立一个App.java进行启动,默认配置的定时器5秒钟触发一次消息 能够看到控制台输出。
-----------------------------------------www.itmayiedu.com----高端IT培训------------------------------------------------------------
下面消费者更加的简单,因为咱们在生产者中绑定了两个队列,咱们在接收者中就须要两个监听器。
咱们在消费者代码中建立消费者代码Receive.java
@Component public class Receiver { @RabbitListener(queues = "hello.queue1") public String processMessage1(String msg) { System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg); return msg.toUpperCase(); } @RabbitListener(queues = "hello.queue2") public void processMessage2(String msg) { System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue2队列的消息:" + msg); } }
配置文件application.propreties
server.port=8081 spring.rabbitmq.host=192.168.10.10 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.listener.concurrency=2 spring.rabbitmq.listener.max-concurrency=2
而后建立启动类直接启动就好了
最后分别启动生产者和消费者,能够直接在控制台看到输出
生产者:
消费者:
在被消费事后咱们能够很清楚的发如今生产者这边会打印出消费者返回的信息,代表了消费成功。
到这里RabbitMQ的示例就完了
总结:
须要知道MQ的模式
点对点 一对一 例子 私聊
发布订阅 一对多 例子 一我的和多我的聊天
步骤
1.须要配置 指定队列queue 是否持久化配置
2.生产者发送消息到队列中
3.消费者去队列中去消费消息,若是消费了会给生产者返回一个成功失败标识,此时队列中不存在消息