RabbitMQ的消息可靠性能够分为三部分,分别是消息的接收、消息的保存(消息持久化,前面文章已经讲了)、消息的发送。java
RabbitMQ消息的接收如同在以前讲消息队列时说的有三种状况,分别是同步可靠发送、异步可靠发送、异步发送。RabbitMQ提供一下两种方式实现消息的发送的可靠:spring
事务机制服务器
RabbitMQ提供事务机制(同步可靠)保证消息的可靠发送。事务主要有txSelect()、txCommit()和txRollback()三个方法。txSelect()用于开启事务,txCommit()用于提交事务,若是在txCommit()以前Broker出现错误抛出异常则须要txRollback()回滚事务。示例代码以下:异步
Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); try { channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello".getBytes(StandardCharsets.UTF_8)); channel.txCommit(); } catch (Exception e) { channel.txRollback(); }
RabbitMQ事务的AMQP消息以下:ide
Confirm模式函数
Confirm模式主要解决消息可靠而且比事务模式更高效。生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理。spring-boot
Confirm模式主要有三种模式:ui
普通模式的Confirm是同步可靠模式(也能够超时等待),有点类型事务模式,代码以下:spa
channel.confirmSelect(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello".getBytes(StandardCharsets.UTF_8)); if (channel.waitForConfirms()) { System.out.println("message send Success"); }
异步模式的Confirm是异步可靠模式,该模式就是前面文章将的注册回调函数,服务器成功接收后将会回调该函数。RabbitMQ的异步Confirm基本也是这种,只能有少量不一样。RabbitMQ在客户端须要维护没有成功发送消息Id集,一些参数的意义以下:.net
代码示例以下:
SortedSet<Long> unconfirmSet = Collections.synchronizedSortedSet(new TreeSet<>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(deliveryTag + " :Ack"); if (multiple) { unconfirmSet.headSet(deliveryTag + 1).clear(); } else { unconfirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println(deliveryTag + " :Nack"); if (multiple) { unconfirmSet.headSet(deliveryTag + 1).clear(); } else { unconfirmSet.remove(deliveryTag); } } }); long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello".getBytes(StandardCharsets.UTF_8)); unconfirmSet.add(nextPublishSeqNo);
RabbitMQ提供消息确认机制(Message Ack),RabbitMQ只有在Consumer发送Ack后才会删除消息。
Consumer发送Ack有下面两种方式:
自动发送Ack,当Consumer接收到消息后自动发送Ack给RabbitMQ
channel.basicConsume(queueName, true, new DefaultConsumer(consumer) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { } });
手动发送Ack,当Consumer处理完这条消息后手动发送Ack给RabbitMQ
channel.basicConsume(queueName, true, new DefaultConsumer(consumer) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { channel.basicAck(envelope.getDeliveryTag(), false); } });
Spring Framework有一套使用RabbitMQ的API,因此咱们仅仅使用Spring API便可操做RabbitMQ。
Spring Boot使用须要导入一下Jar包:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
而后在yml或者properties文件中配置以下属性:
spring: rabbitmq: host: 47.100.20.186 username: guest password: guest
Spring Boot会根据属性配置经过RabbitAutoConfiguration注入相关对象进行使用。
RabbitMQ配置类,主要配置RabbitMQ的Queue、Exchange等
@Configuration public class RabbitConfig { public static final String QUEUE = "spring_queue_hello"; public static final String ROUTING_KEY = "spring_key_hello"; @Bean public Exchange exchange() { return new DirectExchange(EXCHANGE_NAME, true, false, null); } @Bean public Queue queue() { return new Queue(QUEUE, true, false, false, null); } @Bean public Binding binding(Exchange exchange, @Qualifier("queue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); } }
生成者,Spring经过AmqpTemplate发送消息
@Component public class Producer { @Autowired private AmqpTemplate amqpTemplate; // 这里使用Spring Scheduled每3s发送消息 @Scheduled(fixedDelay = 3000L) public void send() { String message = "hello"; amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message); } }
生成者,经过注解消费
@Component @RabbitListener(queues = RabbitConfig.QUEUE) public class Consumer { @RabbitHandler public void process(String message) { System.out.println(message); } }
由上面能够,Spring帮助咱们编解码了,Spring经过MessageConverter进行消息的编解码,Spring默认注入SimpleMessageConverter消息解析器进行消息的编解码,其支持Java序列化编解码,若是咱们想使用Json,能够注入JacksonMessageConverter进行编解码,代码以下:
@Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
http://blog.didispace.com/spring-boot-rabbitmq/ http://blog.csdn.net/u013256816/article/details/55515234