目录java
RabbitMQ 是一个开源的消息代理和队列服务器,用来经过普通协议在彻底不一样的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,而且RabbitMQ是基于AMQP协议的web
RabbitMQ的优势:spring
RabbitMQ官网springboot
RabbitMQ的总体架构:服务器
RabbitMQ的消息流转:网络
AMQP全称: Advanced Message Queuing Protocol架构
AMQP翻译: 高级消息队列协议spring-boot
AMQP定义: 是具备现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计性能
AMQP核心概念:ui
后台启动: ./rabbitmq start &
关闭: ./rabbitmqctl stop
节点状态: ./rabbitmqctl status
管控台: http://ip:15672
RabbitMQ生产消费快速入门:
环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
public class Procuder { public static void main(String[] args) throws Exception { //1.建立一个ConnectionFactory 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.经过链接工厂建立链接 Connection connection = connectionFactory.newConnection(); //3.经过Connection 建立一个 Channel Channel channel = connection.createChannel(); /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * exchange:指定交换机 不指定 则默认 (AMQP default交换机) 经过routingkey进行匹配 * props 消息属性 * body 消息体 */ //4.经过Channel发送数据 for(int i = 0; i < 5; i++){ System.out.println("生产消息:" + i); String msg = "Hello RabbitMQ" + i; channel.basicPublish("", "test", null, msg.getBytes()); } //5.记得关闭相关的链接 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args) throws Exception{ //1.建立一个ConnectionFactory 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.经过链接工厂建立链接 Connection connection = connectionFactory.newConnection(); //3.经过Connection 建立一个 Channel Channel channel = connection.createChannel(); //4. 声明建立一个队列 String queueName = "test"; /** * durable 是否持久化 * exclusive 独占的 至关于加了一把锁 */ channel.queueDeclare(queueName,true,false,false,null); //5.建立消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6.设置channel /** * ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会立刻回送一个ACK信息给broker,告诉它这条消息收到了 * autoack: * true 自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会当即从队列中删除。 * false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ能够从队列中删除该消息了 * */ channel.basicConsume(queueName, true, queueingConsumer); //7.获取消息 while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端:" + msg); //Envelope envelope = delivery.getEnvelope(); } } }
Exchange: 接收消息,并根据路由键转发消息所绑定的队列
交换机属性:
全部发送到Direct Exchange的消息被转发到RouteKey指定的Queue
注意:Direct模式可使用RabbitMQ自带的Exchange: default Exchange,因此不须要将Exchange进行任何绑定(binding)操做,消息传递时,RoutingKey必须彻底匹配才会被队列接收,不然该消息会被抛弃
public class ProducerDirectExchange { public static void main(String[] args) throws Exception { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2.建立Connection Connection connection = connectionFactory.newConnection(); //3.建立Channel Channel channel = connection.createChannel(); //4.声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //5.发送 String msg = "Hello World RabbitMQ4 Direct Exchange Message"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); } }
public class ConsumerDirectExchange { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null); //表示声明了一个队列 channel.queueDeclare(queueName,false,false,false,null); //创建一个绑定关系 channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称,是否自动ACK,Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,若是没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
全部发送到Topic Exchange的消息被转发到全部关心RouteKey中指定Topic的Queue上
Exchange将RouteKey和某Topic进行模糊匹配,此时队列须要绑定一个Topic
注意:可使用通配符进行匹配
符号 # 匹配一个或多个词
符号 * 匹配很少很多一个词
例如: "log.#" 可以匹配到 “log.info.oa”
"log.*" 只会匹配到 "log.err"
public class ProducerTopicExchange { public static void main(String[] args) throws Exception { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.建立Connection Connection connection = connectionFactory.newConnection(); //3.建立Channel Channel channel = connection.createChannel(); //4.声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5.发送 String msg = "Hello World RabbitMQ4 Direct Exchange Message"; channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes()); channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes()); } }
public class ConsumerTopicExchange { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; String routingKey = "user.#"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null); //表示声明了一个队列 channel.queueDeclare(queueName,false,false,false,null); //创建一个绑定关系 channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称,是否自动ACK,Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,若是没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
不处理路由键,只须要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的全部队列上
因此Fanout交换机转发消息是最快的
public class ProducerFanoutExchange { public static void main(String[] args) throws Exception { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.建立Connection Connection connection = connectionFactory.newConnection(); //3.建立Channel Channel channel = connection.createChannel(); //4.声明 String exchangeName = "test_fanout_exchange"; //5.发送 for(int i = 0; i < 10 ; i++){ String msg = "Hello World RabbitMQ4 Direct Exchange Message"; channel.basicPublish(exchangeName, "", null, msg.getBytes()); } channel.close(); connection.close(); } }
public class ConsumerFanoutExchange { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_topic_queue"; //无需指定路由key String routingKey = ""; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null); //表示声明了一个队列 channel.queueDeclare(queueName,false,false,false,null); //创建一个绑定关系 channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称,是否自动ACK,Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,若是没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成
经常使用属性:delivery mode、headers (自定义属性)
其余属性:content_type、content_encoding、priority、expiration
消息的properties属性用法示例:
public class Procuder { public static void main(String[] args) throws Exception { //1.建立一个ConnectionFactory 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.经过链接工厂建立链接 Connection connection = connectionFactory.newConnection(); //3.经过Connection 建立一个 Channel Channel channel = connection.createChannel(); Map<String,Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); //10秒不消费 消息过时移除消息队列 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("utf-8") .expiration("10000") .headers(headers) .build(); //4.经过Channel发送数据 for(int i = 0; i < 5; i++){ System.out.println("生产消息:" + i); String msg = "Hello RabbitMQ" + i; channel.basicPublish("", "test", properties, msg.getBytes()); } //5.记得关闭相关的链接 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args) throws Exception{ //1.建立一个ConnectionFactory 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.经过链接工厂建立链接 Connection connection = connectionFactory.newConnection(); //3.经过Connection 建立一个 Channel Channel channel = connection.createChannel(); //4. 声明建立一个队列 String queueName = "test"; channel.queueDeclare(queueName,true,false,false,null); //5.建立消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6.设置channel channel.basicConsume(queueName, true, queueingConsumer); //7.获取消息 while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端:" + msg); Map<String, Object> headers = delivery.getProperties().getHeaders(); System.err.println("headers value:" + headers.get("my1")); } } }