RabbitMQ-从基础到实战(1)— Hello RabbitMQhtml
RabbitMQ-从基础到实战(3)— 消息的交换(上)java
RabbitMQ-从基础到实战(4)— 消息的交换(中)apache
RabbitMQ-从基础到实战(5)— 消息的交换(下)app
RabbitMQ-从基础到实战(6)— 与Spring集成maven
RabbitMQ中,消息丢失能够简单的分为两种:客户端丢失和服务端丢失。针对这两种消息丢失,RabbitMQ都给出了相应的解决方案。post
如图,生产者P向队列中生产消息,C1和C2消费队列中的消息,默认状况下,RabbitMQ会平均的分发消费给C1C2(Round-robin dispatching),假设一个任务的执行时间很是长,在执行过程当中,客户端挂了(链接断开),那么,该客户端正在处理且未完成的消息,以及分配给它还没来得及执行的消息,都将丢失。由于默认状况下,RabbitMQ分发完消息后,就会从内存中把消息删除掉。ui
为了解决上述问题,RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,来告诉服务端能够删除该消息了,若是链接断开的时候,Server端没有收到消费者发出的确认信息,则会把消息转发给其余保持在线的消费者。spa
首先,咱们验证上述问题(客户端丢失消息)是否真的存在,对Consumer进行以下改造。3d
先生产两条消息code
启动消费者,在消费者接收到消息,还没处理完成的时候,强制关掉
这时,观察控制台,发现两条消息都没有了,1条是在执行中丢失的,还有1条,已经分配给这个Consumer,还没来得及处理,也丢失了
这证实了上述问题是真的存在的,若是发生在生产环境,将产生难以预料的后果
为了方便观察,咱们用CMD来运行Consumer,要经过maven打成可执行的JAR包,须要在pom.xml中增长以下配置
<build> <finalName>Consumer</finalName> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.liyang.ticktock.rabbitmq.App</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
上述配置描述了最终打包名字、入口类路径、带上依赖包、使用1.8版本的JDK进行打包,配置完后,就能够经过maven的install方法,在target目录生成可执行的jar包,若是包大小很小,应检查配置,是否是没有带上依赖包
再次改造Consummer类
install成可执行jar包,经过cmd开启两个consumer
经过Sender发送一条消息,而后用Ctrl+C结束先收到消息的Consumer,发现另一个Consumer接收到了未处理完的消息
问题获得了解决,如今消费者在执行过程当中死掉也不会丢失消息了
看一下发送确认的方法
1 /** 2 * Acknowledge one or several received 3 * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} 4 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method 5 * containing the received message being acknowledged. 6 * @see com.rabbitmq.client.AMQP.Basic.Ack 7 * @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这 8 * @param multiple true to acknowledge all messages up to and 为true的话,确认全部消息,为false只确认当前消息 9 * including the supplied delivery tag; false to acknowledge just 10 * the supplied delivery tag. 11 * @throws java.io.IOException if an error is encountered 12 */ 13 void basicAck(long deliveryTag, boolean multiple) throws IOException;
在官方文档中,这样描述deliveryTag
简单来讲,就是RabbitMQ内部用来区分消息的一个标签,从envelope中获取就好了
RabbitMQ只有在收到消费者确认后,才会从内存中删除消息,若是消费者忘了确认(更多状况是由于代码问题没有执行到确认的代码),将会致使内存泄漏
验证一下
注释掉Consumer中的确认代码
运行Sender和Consumer,不停的生产消费消息,发现消费者在正常的消费消息
查看控制台,发现已经被吃掉了43KB的内存,因此,在试用过程当中,必定要保证消息确认在任何状况下均可以发出,不然即便消费者处理完成,RabbitMQ也不会把消息在内存中清除,在该消费者断开链接以后,还会把消息转发给其余消费者从新处理,将引起难以预计的问题
如今,消费者宕机已经没法影响到咱们的消息了,但若是RabbitMQ重启了,消息依然会丢失。所幸的是,RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即便重启RabbitMQ,消息也不会丢失。可是,仍然有一个很是短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上)会致使消息丢失,若是须要严格的控制,能够参考官方文档
要使用RabbitMQ的消息持久化,在声明队列时设置一个参数便可
注意,RabbitMQ不容许对一个已经存在的队列用不一样的参数从新声明,对于试图这么作的程序,会报错,因此,改动以前代码以前,要在控制台中把原来的队列删除
从新声明队列后,发现Durable为true
重启RabbitMQ
队列的消息没有丢失
这一章介绍了RabbitMQ消息的确认和持久化,后面将会继续深刻介绍RabbitMQ的其余特性