RabbitMQ特性

使用默认的exchange

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

 

若是用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。咱们在建立一个queue的时候,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去服务器

在方法中的第一个参数是须要输入一个exchange。在RabbitMQ中,全部的消息都必需要经过exchange发送到各个queue里面去。发送者发送消息,其实也就是把消息放到exchange中去。而exchange知道应该把消息放到哪里去。在这个方法中,咱们没有输入exchange的名称,只是定义了一个空的echange,而在第二个参数routeKey中输入了咱们目标队列的名称。RabbitMQ会帮我定义一个默认的exchange,这个exchange会把消息直接投递到咱们输入的队列中,这样服务端只须要直接去这个定义了的队列中获取消息就能够了网络

 

信息确认

RabbitMQ有两种应答模式,自动和手动。这也是AMQP协议所推荐的。这在point-to-point和broadcast都是同样的。负载均衡

自动应答-当RabbitMQ把消息发送到接收端,接收端把消息出队列的时候就自动帮你发应答消息给服务。测试

手动应答-须要咱们开发人员手动去调用ack方法去告诉服务已经收到。大数据

文档推荐在大数据传输中,若是对个别消息的丢失不是很敏感的话选用自动应答比较理想,而对于那些一个消息都不能丢的场景,须要选用手动应答,也就是说在正确处理完之后才应答。若是选择了自动应答,那么消息重发这个功能就没有了。atom

点对点模式spa

也就是一发一接的模式,不适用发布/订阅这种广播模式设计

//autoAck 设置false,消费端挂掉,信息不会丢失,server会re-queue
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
 //向服务器发送应答
          channel.basicAck(envelope.getDeliveryTag(), false);

 

在RabbitMQ中,为了避免让消息丢失,它提供了消息应答的概念。当消费者获取到了一个消息之后,须要给RabbitMQ服务一个应答的消息,告知服务我已经收到或正确处理了该消息。那么RabbitMQ能够放心的在队列中删除该消息code

 

队列持久化

 

//durable 设置true,queue持久化,server重启,此queue不丢失
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

方法的第四的参数autoDelete,通常都会输入false。文档描述这个参数若是是true的话,意思是:若是这个queue再也不使用(没有被订阅)的话,server就会删除它。在个人测试过程当中,只要是链接改queue的全部接收者都断开链接的话,该queue就会被删除,即便里面还有没有处理的消息。RabbitMQ的重启也一样会删除他们。若是输入的是false,那与之相连的客户端都断开链接的话,服务是不会删除这个队列的,队列中的消息也就会存在。发送端在没有客户端链接的时候也能够把消息放入改队列,客户端起来的时候,就会获得这些消息。可是若是RabbitMQ服务重启的话,该队列就没有了,里面的消息天然也就没有了。server

第三个参数是exclusive,文档描述说,若是是true,那么申明这个queue的connection断了,那么这个队列就被删除了,包括里面的消息。

第二个参数durable,文档描述说,若是是true,则表明是一个持久的队列,那么在服务重启后,也会存在。由于服务会把持久化的queue存放在硬盘上,放服务重启的时候,会从新申明这个queue。固然必须是在autoDelete和exclusive都为false的时候。队列是能够被持久化,可是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,若是重启以前那个queue里面还有没有发出去的消息的话,重启以后那队列里面是否是还存在原来的消息,这个就要取决于发送者在发送消息时对消息的设置了(信息持久化)。

信息持久化

  //BasicProperties设置MessageProperties.PERSISTENT_TEXT_PLAIN,信息持久化
    channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "TaskQueue", properties, bytes);

 

DeliveryMode等于2就说明这个消息是persistent的。1是默认是,不是持久的。在接收者接收消息并处理的时候会出现各类各样的问题:抛出异常致使与RabbitMQ链接断开,程序挂掉,网络问题等等。每每在出现这些问题的时候咱们一般都但愿队列能保存这些消息,并在程序再次起来的时候可以从新处理,或若是是负载均衡的模式下,可以把这个消息从新分配给其余的同等的接受者来处理。这一样也是RabbitMQ对消息持久化的一种功能。这咱们在消息的传输控制中作详细的说明

消息的拒收

拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不该该由我处理,或者拒绝处理,扔掉。接收端在发送reject命令的时候能够选择是否要从新放回queue中。若是没有其余接收者监控这个queue的话,要注意一直无限循环发送的危险。

BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);

BasicReject方法第一个参数是消息的DeliveryTag,对于每一个Channel来讲,每一个消息都会有一个DeliveryTag,通常用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。

BasicReject一次只能拒绝接收一个消息,而BasicNack方法能够支持一次0个或多个消息的拒收,而且也能够设置是否requeue。

channel.BasicNack(3, true, false);

在第一个参数DeliveryTag中若是输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。

消息的QoS

QoS = quality-of-service, 顾名思义,服务的质量。一般咱们设计系统的时候不能彻底排除故障或保证说没有故障,而应该设计有完善的异常处理机制。在出现错误的时候知道在哪里出现什么样子的错误,缘由是什么,怎么去恢复或者处理才是真正应该去作的。在接收消息出现故障的时候咱们能够经过RabbitMQ重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式。

甚至QoS是在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只须要加以下代码:

channel.BasicQos(0, 1, false);

代码第一个参数是可接收消息的大小的,可是彷佛在客户端2.8.6版本中它必须为0,即便:不受限制。若是不输0,程序会在运行到这一行的时候报错,说尚未实现不为0的状况。第二个参数是处理消息最大的数量。举个例子,若是输入1,那若是接收一个消息,可是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。若是输入3,那么能够最多有3个消息不该答,若是到达了3个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端能够收到的消息最大数量。第三个参数则设置了是否是针对整个Connection的,由于一个Connection能够有多个Channel,若是是false则说明只是针对于这个Channel的。

这种数量的设置,也为咱们在多个客户端监控同一个queue的这种负载均衡环境下提供了更多的选择。

 //对服务器确认以前,一次只接受一条信息
channel.basicQos(1);

 

mandatory标志的做用

在生产者经过channel的basicPublish方法发布消息时,一般有几个参数须要设置,为此咱们有必要了解清楚这些参数表明的具体含义及其做用,查看Channel接口,会发现存在3个重载的basicPublish方法

1 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;  
2   
3 void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)  
4             throws IOException;  
5   
6 void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)  
7             throws IOException;  

 

他们共有的参数分别是:​ exchange:交换机名称​ routingKey:路由键​ props:消息属性字段,好比消息头部信息等等​ body:消息主体部分

mandatory和immediate是AMQP协议中basic.pulish方法中的两个标志位,它们都有当消息传递过程当中不可达目的地时将消息返回给生产者的功能。具体区别在于:

mandatory标志位

当mandatory标志位设置为true时,若是exchange根据自身类型和消息routeKey没法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。

immediate标志位

当immediate标志位设置为true时,若是exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的全部queue(一个或多个)都没有消费者时,该消息会经过basic.return方法返还给生产者。

归纳来讲,mandatory标志告诉服务器至少将该消息route到一个队列中,不然将消息返还给生产者;immediate标志告诉服务器若是该消息关联的queue上有消费者,则立刻将消息投递给它,若是全部queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

相关文章
相关标签/搜索