四种途径提高RabbitMQ传输数据的可靠性

前言

RabbitMQ虽然有对队列及消息等的一些持久化设置,但其实光光只是这一个是不可以保障数据的可靠性的,下面咱们提出这样的质疑:html

(1)RabbitMQ生产者是不知道本身发布的消息是否已经正确达到服务器呢,若是中间发生网络异常等状况呢?消息必然会丢失!服务器

(2)RabbitMQ若是没有设置队列持久化,RabbitMQ服务器重后队列的元数据会丢失,消息天然也会丢失!网络

(3)RabbitMQ若是消费者设置自动确认,即autoAck为true,那么无论消费者发生什么状况,该消息会自动从队列中移除,实际上消费者有可能挂掉,消息必然会丢失!post

(4)RabbitMQ中的消息若是没有匹配到队列时,那么消息也会丢失!ui

本文其实也就是结合以上四个方面进行讲解的,主要参考《RabbitMQ实战指南》(有须要PDF电子书的能够评论或者私信我),本文截图也来自其中,另外能够对一些RabbitMQ的概念的认识能够参考个人上两篇博文认识RabbitMQ交换机模型RabbitMQ是如何运转的?spa

 


 

 

1、设置mandotory参数、AE备份交换器

针对前言中的第(4)个问题,咱们能够经过设置mandotory参数与AE备份交换器来解决code

一、mandotory参数

  1)当为true时,交换器没法根据自身的类型和路由键找到一个符合条件的队列,此时RabbitMQ会调用Basic.Return命令将消息返回给生产者,消息将不会丢失orm

  2)当为false时,消息将会被直接丢弃。htm

  3)RabbitMQ经过addReturnListener添加ReturnLisener监听器监听获取没有被正确路由到合适队列的消息blog

channel.basicPublish(EXCHANGE NAME, "", true, 
MessageProperties.PERSISTENT_TEXT_PLAIN, 
"mandatory test".getBytes()); 
channel.addReturnListener(new ReturnListener(){ 
  public void handleReturn(int replyCode, String replyText, 
                String exchange, String routingKey, 
                AMQP.BasicProperties basicProperties, 
                byte[] body) throws IOException { 
    String message = new String(body); 
    System.out.println("Basic.Return 返回的结果是: " + message);
  }
}); .

 

二、AE备份交换器

  Alternate Exchange,简称AE,不设置mandatory参数,那么消息将会被丢失,设置mandatory参数的话,须要添加ReturnListner监听器,增长复杂代码,若是既不想增长代码又不想消息丢失,则使用AE,将没有被路由的消息存储于RabbitMQ中。当mandatory参数用AE一块儿使用时,mandatory将失效。在介绍AE以前,也认识RabbitMQ对于消息的过时时间TTL设置以及队列的过时时间TTL设置

2.1 TTL过时时间设置

  能够对队列设置TTL与消息设置TTL,其中消息设置TTL常常用于死信队列、延迟队列等高级应用中。

  1)设置消息TTL

  设置TTL过时时间通常有两种当时:一是经过队列属性,对队列中全部消息设置相同的TTL。二就是对消息自己单独设置,每条消息TTL不一样。若是一块儿使用时候,TTL小的为准,当一旦超过设置的TTL时间时,就会变成“死信”。

  方式一:针对每条消息设置TTL是经过增长expiration的属性参数实现的,不可能像方式二同样扫描整个队列再判断是否过时,只有当该消息即将被消费时再断定是否过时便可删除,也就是消息即便已通过期,但不必定立马被删除!

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
// 持久化消息
builder deliveryMode(2);
// 设置 TTL=60000ms
builder expiration( 60000 ); 
AMQP.BasicProperties properties = builder. build(); 
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

  方式二:经过队列属性设置消息TTL是增长x-message-ttl参数实现的,只须要扫描整个队列头部便可当即删除,也就是消息一旦过时就会被删除!

Map<String, Object> argss = new HashMap<String , Object>(); 
argss.put("x-message-ttl", 6000); 
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;

 

  2)设置队列TTL

  经过在队列中添加参数x-message-ttl参数实现设置队列被自动删除前处于未被使用状态的时间,注意是队列的使用状态,并非消息是否被消费的状态

  设置ttl=30min的队列,时间一到RabbitMQ会保证队列被删除,可是不会保证删除的速度有多快。

Map<String, Object> args = new HashMap<String, Object>{); 
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

 

 

2.2 AE备份交换器的使用

  声明交换器的时候,添加alternate-exchange参数实现,或经过策略实现。前者优先级高。从代码角度须要如下三个步骤,具体代码以下:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("a1ternate-exchange", "myAe"); 
channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args); 
channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ; 
channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11); 
channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key"); 
channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);

  1)声明normalExchange类型为direct的交换器、类型为fanout的myAe备份交换器;而且normalExchange的备份交换器为myAe(备份交换器建议使用fanout类型交换器)

  2)声明normalQueue队列,声明unrouteQueue队列;

  3)经过路由键normalKey绑定normalExchange与normalQueue,不适用路由键绑定unrouteQueue与myAe

 

        

相关文章
相关标签/搜索