RabbitMQ-从基础到实战(2)— 防止消息丢失
html
RabbitMQ-从基础到实战(3)— 消息的交换(上)java
RabbitMQ-从基础到实战(4)— 消息的交换(中)windows
RabbitMQ-从基础到实战(5)— 消息的交换(下)api
RabbitMQ-从基础到实战(6)— 与Spring集成数组
本篇博文介绍了在windows平台下安装RabbitMQ Server端,并用JAVA代码实现收发消息maven
Windows平台安装完成后如图ide
RabbitMQ提供一个控制台,用于管理和监控RabbitMQ,默认是不启动的,须要运行如下命令进行启动post
rabbitmq-plugins enable rabbitmq_managementthis
目前能够先不用理会此界面,后面使用到时会详细介绍,也能够到这里查看官方文档。spa
Spring对RabbitMQ已经进行了封装,正常使用中,会使用Spring集成,第一个项目中,咱们先不考虑那么多
在IDE中新建一个Maven项目,并在pom.xml中贴入以下依赖,RabbitMQ的最新版本依赖能够在这里找到
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
等待Maven下载完成后,就能够在Maven Dependencies中看到RabbitMQ的JAR
在这里,咱们发现,RabbitMQ的日志依赖了slf4j-api这个包,slf4j-api并非一个日志实现,这样子是打不出日志的,因此,咱们给pom加上一个日志实现,这里用了logback
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency>
以后maven依赖以下,能够放心写代码了
新建一个MessageSender类,代码以下
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 public class MessageSender { 12 13 private Logger logger = LoggerFactory.getLogger(MessageSender.class); 14 15 //声明一个队列名字 16 private final static String QUEUE_NAME = "hello"; 17 18 public boolean sendMessage(String message){ 19 //new一个RabbitMQ的链接工厂 20 ConnectionFactory factory = new ConnectionFactory(); 21 //设置须要链接的RabbitMQ地址,这里指向本机 22 factory.setHost("127.0.0.1"); 23 Connection connection = null; 24 Channel channel = null; 25 try { 26 //尝试获取一个链接 27 connection = factory.newConnection(); 28 //尝试建立一个channel 29 channel = connection.createChannel(); 30 //这里的参数在后面详解 31 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 32 //注意这里调用了getBytes(),发送的实际上是byte数组,接收方收到消息后,须要从新组装成String 33 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 34 logger.info("Sent '" + message + "'"); 35 //关闭channel和链接 36 channel.close(); 37 connection.close(); 38 } catch (IOException | TimeoutException e) { 39 //失败后记录日志,返回false,表明发送失败 40 logger.error("send message faild!",e); 41 return false; 42 } 43 return true; 44 } 45 }
而后在App类的main方法中调用sendMessage
1 public class App { 2 public static void main( String[] args ){ 3 MessageSender sender = new MessageSender(); 4 sender.sendMessage("hello RabbitMQ!"); 5 } 6 }
打印日志以下
打开RabbitMQ的控制台,能够看到消息已经进到了RabbitMQ中
点进去,用控制台自带的getMessage功能,能够看到消息已经成功由RabbitMQ管理了
至此,MessageSender已经写好了,在该类的31和33行,咱们分别调用了队列声明和消息发送
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
queueDeclare,有不少参数,咱们能够看一下他的源码,注释上有详细的解释,我简单翻译了一下
1 /** 2 * Declare a queue 声明一个队列 3 * @see com.rabbitmq.client.AMQP.Queue.Declare 4 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk 5 * @param queue the name of the queue队列的名字 6 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,为true则在rabbitMQ重启后生存 7 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是不是排他性队列(别人看不到),只对当前链接有效,当前链接断开后,队列删除(设置了持久化也删除) 8 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自动删除,在最后一个链接断开后删除队列 9 * @param arguments other properties (construction arguments) for the queue 其余参数 10 * @return a declaration-confirm method to indicate the queue was successfully declared 11 * @throws java.io.IOException if an error is encountered 12 */ 13 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, 14 Map<String, Object> arguments) throws IOException;
前面4个都很是好理解,最后一个“其余参数”,究竟是什么其余参数,这个东西真的很难找,用到再解释吧,官方文档以下
basicPublish的翻译以下
1 /** 2 * Publish a message.发送一条消息 3 * 4 * Publishing to a non-existent exchange will result in a channel-level 5 * protocol exception, which closes the channel. 6 * 7 * Invocations of <code>Channel#basicPublish</code> will eventually block if a 8 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect. 9 * 10 * @see com.rabbitmq.client.AMQP.Basic.Publish 11 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a> 12 * @param exchange the exchange to publish the message to 交换模式,会在后面讲,官方文档在这里 13 * @param routingKey the routing key 控制消息发送到哪一个队列 14 * @param props other properties for the message - routing headers etc 其余参数 15 * @param body the message body 消息,byte数组 16 * @throws java.io.IOException if an error is encountered 17 */ 18 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
这里又有个其余参数,它的类型是这样的,设置消息的一些详细属性
为了和Sender区分开,新建一个Maven项目MessageConsumer
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class MessageConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); 20 21 public boolean consume(String queueName){ 22 //链接RabbitMQ 23 ConnectionFactory factory = new ConnectionFactory(); 24 factory.setHost("127.0.0.1"); 25 Connection connection = null; 26 Channel channel = null; 27 try { 28 connection = factory.newConnection(); 29 channel = connection.createChannel(); 30 //这里声明queue是为了取消息的时候,queue确定会存在 31 //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue 32 channel.queueDeclare(queueName, false, false, false, null); 33 34 //这里重写了DefaultConsumer的handleDelivery方法,由于发送的时候对消息进行了getByte(),在这里要从新组装成String 35 Consumer consumer = new DefaultConsumer(channel){ 36 @Override 37 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 38 throws IOException { 39 String message = new String(body, "UTF-8"); 40 logger.info("Received '" + message + "'"); 41 } 42 }; 43 //上面是声明消费者,这里用声明的消费者消费掉队列中的消息 44 channel.basicConsume(queueName, true, consumer); 45 46 //这里不能关闭链接,调用了消费方法后,消费者会一直链接着rabbitMQ等待消费 47 48 } catch (IOException | TimeoutException e) { 49 //失败后记录日志,返回false,表明消费失败 50 logger.error("send message faild!",e); 51 return false; 52 } 53 54 55 return true; 56 } 57 }
而后在App的main方法中调用Cunsumer进行消费
1 public class App 2 { 3 //这个队列名字要和生产者中的名字同样,不然找不到队列 4 private final static String QUEUE_NAME = "hello"; 5 6 public static void main( String[] args ) 7 { 8 MessageConsumer consumer = new MessageConsumer(); 9 consumer.consume(QUEUE_NAME); 10 } 11 }
结果以下,消费者一直在等待消息,每次有消息进来,就会马上消费掉
改造一下Consumer
在App中new多个消费者
改造Sender,使它不停的往RabbitMQ中发送消息
启动Sender
启动Consumer,发现消息很平均的发给四个客户端,一人一个,谁也不插队
若是咱们把速度加快呢?把Sender的休息时间去掉,发现消费开始变得没有规律了,其实呢,它仍是有规律的,这个是RabbitMQ的特性,称做“Round-robin dispatching”,消息会平均的发送给每个消费者,能够看第一第二行,消息分别是56981和56985,相应的8二、8二、84都被分给了其余线程,只是在当前线程的时间片内,能够处理这么多任务,因此就一次打印出来了
这一章介绍了从安装到用JAVA语言编写生产者与消费者,在这里只是简单的消费消息并打印日志,若是一个消息须要处理的时间很长,而处理的过程当中,这个消费者挂掉了,那消息会不会丢失呢?答案是确定的,并且已经分配给这个消费者,但还没来得及处理的消息也会一并丢失掉,这个问题,RabbitMQ早就考虑到了,而且提供了解决方案,下一篇博文将进行详细介绍