摘要: 介绍confirm的工做机制。使用spring-amqp介绍事务以及发布确认的使用方式。由于事务以及发布确认是针对channel来说,因此在一个链接中两个channel,一个channel可使用事务,另外一个channel可使用发布确认,并介绍了何时该使用事务,何时该使用发布确认java
Confirms是增长的一个确认机制的类,继承自标准的AMQP。这个类只包含了两个方法:confirm.select和confirm.select-ok。另外,basic.ack方法被发送到客户端。spring
confirm.select是在一个channel中启动发布确认。注意:一个具备事务的channel不能放入到确认模式,一样确认模式下的channel不能用事务。typescript
当confirm.select被发送/接收。发布者/broker开始计数(首先是发布而后confirm.select被记为1)。一旦channel为确认模式,发布者应该指望接收到basic.ack方法,delivery-tag属性显示确认消息的数量。数据库
当broker确认了一个消息,会通知发布者消息被成功处理;express
basic的规则是这样的:服务器
一个未被路由的具备manadatory或者immediate的消息被正确确认后触发basic.return;网络
另外,一个瞬时态的消息被确认目前已经入队;less
持久化的消息在持久化到磁盘或者每一个队列的消息被消费以后被确认。异步
关于confirm会有一些问题:ide
首先,broker不能保证消息会被confirm,只知道将会进行confirm。
第二,当未被确认的消息堆积时消息处理缓慢,对于确认模式下的发布,broker会作几个操做,日志记录未被确认的消息
第三,若是发布者与broker之间的链接删除了未能获得确认,它不必定知道消息丢失,因此可能会发布重复的消息。
最后,若是在broker中发生坏事会致使消息丢失,将会basic.nack那些消息
总之,Confirms给客户端一种轻量级的方式,可以跟踪哪些消息被broker处理,哪些可能由于broker宕掉或者网络失败的状况而从新发布。
确认而且保证消息被送达,提供了两种方式:发布确认和事务。(二者不可同时使用)在channel为事务时,不可引入确认模式;一样channel为确认模式下,不可以使用事务。
Spring AMQP作的不只仅是回滚事务,并且能够手动拒绝消息,如当监听容器发生异常时是否从新入队。
持久化的消息是应该在broker重启前都有效。若是在消息有机会写入到磁盘以前broker宕掉,消息仍然会丢失。在某些状况下,这是不够的,发布者须要知道消息是否处理正确。简单的解决方案是使用事务,即提交每条消息。
案例:
RabbitTemplate的使用案例(同步),由调用者提供外部事务,在模板中配置了channe-transacted=true。一般是首选,由于它是非侵入性的(低耦合)
<rabbit:template id="rabbitTemplate" connection-factory="cachingConnectionFactory" exchange="sslexchange" channel-transacted="true"/>
@Transactional
public void doSomething() { ApplicationContext context = new GenericXmlApplicationContext("spring-amqp-test.xml"); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); String incoming = (String) rabbitTemplate.receiveAndConvert(); // do some more database processing... String outgoing = processInDatabaseAndExtractReply(incoming); //数据库操做中若是失败了,outgoing这条消息不会被发送,incoming消息也会返回到broker服务器中,由于这是一条事务链。 //可作XA事务,在消息传送与数据库访问中共享事务。 rabbitTemplate.convertAndSend(outgoing); } private String processInDatabaseAndExtractReply(String incoming){ return incoming; }
异步使用案例(外部事务)
<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> </bean> <rabbit:listener-container connection-factory="cachingConnectionFactory" transaction-manager="rabbitTxManage" channel-transacted="true"> <rabbit:listener ref="foo" method="onMessage" queue-names="rabbit-ssl-test"/> </rabbit:listener-container>
在容器中配置事务时,若是提供了transactionManager,channelTransaction必须为true;若是为false,外部的事务仍然能够提供给监听容器,形成的影响是在回滚的业务操做中也会提交消息传输的操做。
使用事务有两个问题:
Ø 一是会阻塞,发布者必须等待broker处理每一个消息。若是发布者知道在broker死掉以前哪些消息没有被处理就足够了。
Ø 第二个问题是事务是重量级的,每次提交都须要fsync(),须要耗费大量的时间。
confirm模式下,broker将会确认消息并处理。这种模式下是异步的,生产者能够流水式的发布而不用等待broker,broker能够批量的往磁盘写入。
发布确认必须配置在CachingConnectionFactory上
<bean id="cachingConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="host" value="192.168.111.128"></property> <property name="port" value="5672"></property> <property name="username" value="admin"/> <property name="password" value="admin"/> <property name="publisherConfirms" value="true"/> <property name="publisherReturns" value="true"/> </bean>
若使用confirm-callback或return-callback,必需要配置publisherConfirms或publisherReturns为true
每一个rabbitTemplate只能有一个confirm-callback和return-callback
//确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中便可,只要正确的到达exchange中,broker便可确认该消息返回给客户端ack。 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息确认成功"); } else { //处理丢失的消息(nack) System.out.println("消息确认失败"); } } });
使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,可针对每次请求的消息去肯定’mandatory’的boolean值,只能在提供’return -callback’时使用,与mandatory互斥。
rabbitTemplate.setMandatory(true); //确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中便可,只要正确的到达exchange中,broker便可确认该消息返回给客户端ack。 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //从新发布 RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey"); Throwable cause = new Exception(new Exception("route_fail_and_republish")); recoverer.recover(message,cause); System.out.println("Returned Message:"+replyText); } }); errorTemplate配置: <rabbit:queue id="errorQueue" name="errorQueue" auto-delete="false" durable="true"> <rabbit:queue-arguments> <entry key="x-ha-policy" value="all"/> <entry key="ha-params" value="1"/> <entry key="ha-sync-mode" value="automatic"/> </rabbit:queue-arguments> </rabbit:queue> <rabbit:direct-exchange id="errorExchange" name="errorExchange" auto-delete="false" durable="true"> <rabbit:bindings> <rabbit:binding queue="errorQueue" key="errorRoutingKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="200" /> <property name="maxInterval" value="30000" /> </bean> </property> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="5"/> </bean> </property> </bean> <rabbit:template id="errorTemplate" connection-factory="cachingConnectionFactory" exchange="errorExchange" queue="errorQueue" routing-key="errorRoutingKey" retry-template="retryTemplate" />
private RabbitTemplate rabbitTemplate; private TransactionTemplate transactionTemplate; @Before public void init() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost("192.168.111.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); template = new RabbitTemplate(connectionFactory); template.setChannelTransacted(true); RabbitTransactionManager transactionManager = new RabbitTransactionManager(connectionFactory); transactionTemplate = new TransactionTemplate(transactionManager); connectionFactory.setPublisherConfirms(true); rabbitTemplate = new RabbitTemplate(connectionFactory); }
//发布确认测试 @Test public void testPublishConfirm(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息确认成功"); }else{ System.out.println("消息确认失败"); } } }); //发送到一个不存在的exchange,则会触发发布确认 rabbitTemplate.convertAndSend("asd","aaa","message"); String message = (String) rabbitTemplate.receiveAndConvert(ROUTE); assertEquals("message",message); }
//事务测试 @Test public void testSendAndReceiveInTransaction() throws Exception { //因为有spring的事务参与,而发送操做在提交事务时,是不容许除template的事务有其余事务的参与,因此这里不会提交 //队列中就没有消息,因此在channel.basicGet时命令返回的是basic.get-empty(队列中没有消息时),而有消息时,返回basic.get-ok String result = transactionTemplate.execute(new TransactionCallback<String>() { @Override public String doInTransaction(TransactionStatus status) { template.convertAndSend(ROUTE, "message"); return (String) template.receiveAndConvert(ROUTE); } }); //spring事务完成,对其中的操做须要提交,发送与接收操做被认为是一个事务链而提交 assertEquals(null, result); //这里的执行不受spring事务的影响 result = (String) template.receiveAndConvert(ROUTE); assertEquals("message", result); }
转载:https://my.oschina.net/lzhaoqiang/blog/670749
学习:http://www.kancloud.cn/longxuan/rabbitmq-arron/117518