RabbitMq是一个消息队列,是以一种队列的结构来存放message,遵循这FIFO的规则。主要能够用来在不一样的进程和线程之间进行通讯。服务器
为何会产生消息队列?markdown
不一样进程(process)之间传递消息时,两个进程之间耦合程度太高,改动一个进程,引起必须修改另外一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),全部两进程之间传递的消息,都必须经过消息队列来传递,单独修改某一个进程,不会影响另外一个;app
不一样进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,而且,某一个进程接受的消息太多,一会儿没法处理完,而且也有前后顺序,必须对收到的消息进行排队,所以诞生了事实上的消息队列;maven
RabbitMq官方网站:https://www.rabbitmq.com/#supportide
RabbitMQ有多种消息队列模式,主要使用五种消息队列模式,结构如图所示:学习
其中有几个概念须要先介绍一下,Mq中至关与一个消息的生产消费过程,因此主要有消息的生产者,消息队列,消息的消费者三种组成。测试
message消息类型能够是markdown等。 网站
在简单的队列里,p表明producer生产者,红色表明消息队列,c为consumer消费者。在最简单的队列模式中,生产者发送消息到消息队列中,消费者只要队列中有消息就取出消费。spa
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
RabbitMq的一些配置链接,封装为一个Util类。.net
1 public class ConnectionUtil { 2 3 public static Connection getConnection() throws Exception { 4 //定义链接工厂 5 ConnectionFactory factory = new ConnectionFactory(); 6 //设置服务地址 7 factory.setHost("localhost"); 8 //端口 9 factory.setPort(5672); 10 //设置帐号信息,用户名、密码、vhost 11 factory.setVirtualHost("testhost"); 12 factory.setUsername("admin"); 13 factory.setPassword("admin"); 14 // 经过工程获取链接 15 Connection connection = factory.newConnection(); 16 return connection; 17 } 18 }
生产者发送消息到指定到队列中,首先获取链接到mq,而后建立通道。生产发送消息发送完以后就须要断开与队列的链接。
其中若是把队列的持久化设置为true,则队列中的消息为持久化消息,当消费者没有消费时会一直堆积在队列中。
1 public class Send { 2 3 //队列名 4 private final static String QUEUE_NAME = "q_test_01"; 5 6 public static void main(String[] argv) throws Exception { 7 // 获取到链接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 // 从链接中建立通道 10 Channel channel = connection.createChannel(); 11 12 // 声明(建立)队列,第二个参数为是否持久化队列 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 15 // 消息内容 16 String message = "Hello World!"; 17 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 18 System.out.println(" [x] Sent '" + message + "'"); 19 //关闭通道和链接 20 channel.close(); 21 connection.close(); 22 } 23 }
将获取消息实现接口,使其在程序运行时被调起并执行。在接收消息的方法中,一样首先和rabbitMq进行创建链接,建立通道而且声明队列。程序在运行到channel.basicConsume时会被阻塞,只有当有消息时,才会执行上一步中的取消息后的相关操做。
1 public class ReceivingFromRabbitmq implements ApplicationListener<ApplicationReadyEvent> { 2 @Override 3 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { 4 try { 5 // 获取到链接以及mq通道 6 Connection connection = connectionUtil.getConnection(); 7 // 从链接中建立通道 8 Channel channel = connection.createChannel(); 9 // 声明队列 10 channel.queueDeclare(queueName, queueDurable, false, false, null); 11 12 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 13 String message = new String(delivery.getBody(), "UTF-8"); 14 System.out.println(" [x] Received '" + message + "'"); 15 16 //执行获取消息后的相关操做 17 }; 18 19 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { 20 }); 21 } catch (Exception e) { 22 e.printStackTrace(); 23 } 24 } 25 }
在work模式中包括一个生产者和两个消费者,然而生产者发送的消息只能被一个消费者所获取消费。
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到链接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者 15 //channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,false表示手动返回完成状态,true表示自动 20 channel.basicConsume(QUEUE_NAME, true, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [y] Received '" + message + "'"); 27 //休眠 28 Thread.sleep(10); 29 // 返回确认状态,注释掉表示使用自动确认模式 30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 }
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到链接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者 15 //channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,false表示手动返回完成状态,true表示自动 20 channel.basicConsume(QUEUE_NAME, true, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 //下面这行注释掉表示使用自动确认模式 30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到链接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明队列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 for (int i = 0; i < 100; i++) { 14 // 消息内容 15 String message = "" + i; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent '" + message + "'"); 18 19 Thread.sleep(i * 10); 20 } 21 22 channel.close(); 23 connection.close(); 24 } 25 }
生产者向队列中发送了一百条消息。
结果:
两个消费者接收不一样的消息,可是接收的数量是相同的。
虽然设置了每一个接收者的休眠时间不一样,可是他们接收的消息的数量倒是相同的,由于RabbitMq使用轮询方式进行信息分发,及默认将消息顺序的发给下一个消费者。
同时在这种模式中,消息分发给消费者后,只有当消费者完成消费并向RabbitMq返回确认消息,Mq才会对消息进行删除。若是在执行中消费者死亡,或没有发挥确认消息,则mq会将该消息即便分发给其余消费者。
当Rabbitmq退出或奔溃时,队列和消息会丢失,因此咱们须要将队列进行持久化声明:
boolean durable = true ; channel.queueDeclare(“hello”,durable,false,false,null);
打开上述代码的注释:
// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); //开启这行 表示使用手动确认模式 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //同时改成手动模式 // 监听队列,false表示手动返回完成状态,true表示自动 channel.basicConsume(QUEUE_NAME, false, consumer);
测试结果,消费者1比消费者2获取的消息更多。
一次向多个消费者发送消息。
一、1个生产者,多个消费者
二、每个消费者都有本身的一个队列
三、生产者没有将消息直接发送到队列,而是发送到了交换机
四、每一个队列都要绑定到交换机
五、生产者发送的消息,通过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列能够有多个消费者实例,只有其中一个消费者实例会消费
向交换机X发送消息。可是若是交换机没绑定队列时,消息就会丢失。由于交换机没有存储消息的能力,消息只能存放在队列中。
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到链接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 12 13 // 消息内容 14 String message = "Hello World!"; 15 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 16 System.out.println(" [x] Sent '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到链接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [Recv] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到链接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [Recv2] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
测试结果:同一个消息被多个消费者获取。一个消费者队列能够有多个消费者实例,只有其中一个消费者实例会消费到消息。
——持续更新——
学习自大佬: