在上一篇文章中主要简单的介绍了一下rabbitmq 的基本概念,包括exchange的主要类型以及每种类型分别表示什么含义。本篇文章主要结合本身的理解,解读一下rabbitmq 是如何保证消息不丢失的?html
如图所示: producer 发送消息到rabbitmq broker,而后有2个消费者consumer1,consumer2进行信息消费,针对这个简单的场景,咱们内心不免会有一个疑问:做为producer,我怎么知道个人消息已经成功的发送到了broker 呢? 再一个,我怎么知道我发送的消息已经成功的被consumer消费了呢?还有,若是消息发送到broker后,broker机器挂了怎么办,消息会丢失吗?下面就这些疑问,结合本身的理解一一进行解答.java
生产者消息确认机制主要就是解决消息成功发送到rabbitmq broker 的问题,rabbitmq 提供了2种手段用来解决这个问题:mysql
rabbitmq 客户端channel API针对事务机制这块提供了3个方法:channel.txSelect,channel.txCommit,channel.txRollback .sql
/**
* Enables TX mode on this channel.
* @see com.rabbitmq.client.AMQP.Tx.Select
* @see com.rabbitmq.client.AMQP.Tx.SelectOk
* @return a transaction-selection method to indicate the transaction was successfully initiated
* @throws java.io.IOException if an error is encountered
*/
Tx.SelectOk txSelect() throws IOException;
/**
* Commits a TX transaction on this channel.
* @see com.rabbitmq.client.AMQP.Tx.Commit
* @see com.rabbitmq.client.AMQP.Tx.CommitOk
* @return a transaction-commit method to indicate the transaction was successfully committed
* @throws java.io.IOException if an error is encountered
*/
Tx.CommitOk txCommit() throws IOException;
/**
* Rolls back a TX transaction on this channel.
* @see com.rabbitmq.client.AMQP.Tx.Rollback
* @see com.rabbitmq.client.AMQP.Tx.RollbackOk
* @return a transaction-rollback method to indicate the transaction was successfully rolled back
* @throws java.io.IOException if an error is encountered
*/
Tx.RollbackOk txRollback() throws IOException;
复制代码
txSelect方法主要是用于将信道(channel)设置成事务模式,txCommit 主要用于提交事务,txRollback 主要用于将事务进行回滚。在开启事务以后,咱们即可以将消息发送给rabbitmq了,若是在执行tx.commit执行成功时,表示消息已经成功的发送到rabbitmq服务器了,反之则会抛异常。数据库
须要说明的是,这里的消息已经成功的发送到rabbitmq服务器,指的是消息已经成功发送到rabbitmq 服务器的exchange 了,若是exchange 没有匹配消息绑定的队列,消息仍是会丢失。
说明:rabbitmq 的事务与关系数据库如mysql的事务机制是不同的,关系数据库事务关注的是ACID,rabbitmq关心的是消息是否成功发送。
复制代码
生产者将信道设置成confirm(确认)模式,一旦设置成confirm 模式,当消息投递到broker以后,rabbitmq 的broker 会给消息发送端发一条BASIC.ACK 的确认消息,发送端经过监听这个确认消息,能够知道信息是否已经成功的发送出去. rabbitmq 客户端Channel API 里也提供了相应的API channel.confirmSelect 用来开启客户端确认模式:服务器
/**
* Enables publisher acknowledgements on this channel.
* @see com.rabbitmq.client.AMQP.Confirm.Select
* @throws java.io.IOException if an error is encountered
*/
Confirm.SelectOk confirmSelect() throws IOException;
复制代码
2种模式的比较:事务机制发送消息的过程是同步的,发送消息以后在rabbitmq 回应以前会阻塞,直到收到回应以后才能发送下一条消息,这样会下降系统的吞吐量。发送者确认机制是异步的,生产者在发送消息等待信道返回确认消息的时候继续发送下一条信息。因此相比而言,使用消息确认机制发送消息吞吐量会更高一些。网络
消费端确认机制主要是为了确保消息投递到消费者以后可以被成功的消费。 在rabbitmq 的Channel API 中也提供了相应的参数给业务侧进行控制,以下:异步
/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback an interface to the consumer object
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
复制代码
在baiscConusme里有一个参数:autoAck ,该参数为false 的时候表示消费端须要进行手动确认(好比调用channel.basicAck进行主动确认),若是消费者在消费完一条消息以后向broker 发送确认消息,而后因为网络缘由或者其余缘由致使broker 没有确认这条消息时,broker 不会删除这条消息,当链接从新创建以后,消费者仍是会收到这条消息。ide
rabbitmq 的持久化机制主要是确保生产者发送的消息能成功的落盘,确保broker重启以后未被消费的信息不会被丢失。学习
rabbitmq 的持久化机制,主要从如下几个方面来保障:
须要说明的是,消息是存储在queue里的,因此只有在queue设置为持久化的时候,message的持久化才有意义,不然若是queue是非持久化的,即使message是持久的,在broker重启以后信息仍是会丢失。
rabbitmq 的Channel API 也提供了相应的参数来设置:
/**
* Actively declare a non-autodelete exchange with no extra arguments
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @throws java.io.IOException if an error is encountered
* @return a declaration-confirm method to indicate the exchange was successfully declared
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
复制代码
exchangeDeclare 接口中的durable 参数用来设置exchange是否持久化,为true表示是持久化的,反之为false
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
复制代码
queueDeclare 接口的durable 参数通exchange相似
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
复制代码
消息的持久化经过消息的属性BasicProperties中的deliveryMode参数来设置,deliveryMode为2表示是持久化信息。
经过消息发送端确认机制,消费端确认机制以及持久化,rabbitmq 保证了消息的可靠性。可是又有一个疑问出现了,若是仅仅部署一台broker, 即使是消息持久化了,若是broker 出故障了,无法恢复了,那消息不仍是会丢失吗? 为了不单点故障,提高rabbitmq的可用性,rabbitmq 支持集群部署,以及提供了镜像队列等机制来确保信息的可靠性的。 关于rabbitmq的集群,以及镜像队列等相关方面的知识,在下期的学习以后再进行分享。