Rabbitmq中文Java客户端API指南

Java客户端API指南

本指南涵盖了RabbitMQ Java客户端及其公共API。它假定使用最近的主要版本的客户端,读者熟悉基础知识html

该库的5.x版本系列须要JDK 8,用于编译和运行时。在Android上,这意味着只支持Android 7.0或更高版本。4.x版本系列支持7.0以前的JDK 6和Android版本。java

该库是开源的,在GitHub上开发,并在三重许可下android

· Apache公共许可证2.0git

· Mozilla公共许可证github

· GPL 2.0spring

这意味着用户能够考虑使用上述列表中的任何许可证进行许可。例如,用户能够选择Apache Public License 2.0并将该客户端包含到商业产品中。根据GPLv2许可的代码库能够选择GPLv2等。apache

还有一些 与Java客户端一块儿提供的命令行工具编程

客户端API在AMQP 0-9-1协议模型上进行了严格建模,并提供了更多的抽象以便于使用。后端

一个API参考(JavaDoc的)是单独提供的。api

概观

RabbitMQ Java客户端使用com.rabbitmq.client做为其顶层包。关键类和接口是:

· Channel:表示AMQP 0-9-1通道,并提供大部分操做(协议方法)。

· Connection:表明AMQP 0-9-1链接

· ConnectionFactory:构造链接实例

· Consumer:表明消息消费者

· DefaultConsumer:消费者经常使用的基类

· BasicProperties:消息属性(元数据)

· BasicProperties.Builder:建设者BasicProperties

协议操做可经过 Channel接口得到。Connection用于打开Channel,注册链接生命周期事件处理程序,并关闭再也不须要的链接。 Connection经过ConnectionFactory实例化,这就是您如何配置各类链接设置,如虚拟主机或用户名。

链接和频道

核心API类是Connection 和Channel,分别表明AMQP 0-9-1 connection 和Channel。它们一般在使用前进口:

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

链接到RabbitMQ

如下代码使用给定参数(host name, port number, etc)链接到RabbitMQ节点:

ConnectionFactory factory = new ConnectionFactory();

//  默认帐号密码|“guest,限于本地链接 

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

 

Connection conn = factory.newConnection();

全部这些参数都对本地运行的RabbitMQ节点具备合理的默认值。若是在建立链接以前属性保持未分配状态,将使用属性的默认值:

属性

默认值

Username

"guest"

Password

"guest"

Virtual host

"/"

Hostname

"localhost"

port

5672用于常规链接, 5671用于使用TLS的链接

或者,可使用URI

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");

Connection conn = factory.newConnection();

全部这些参数都对本地运行的RabbitMQ服务器有合理的默认值。

请注意,用户guest只能默认从本地主机链接。这是为了限制生产系统中众所周知的凭证使用。

使用Connection接口建立channel:

Channel channel = conn.createChannel();

如今可使用channel发送和接收消息,如后面的部分所述。

在服务器节点日志中 能够观察到成功和不成功的客户端链接事件。

断开与RabbitMQ的链接

要断开链接,只需关闭通道和链接:

channel.close();

conn.close();

请注意,关闭频道可能被认为是很好的作法,但在这里并非必须的 - 当底层链接关闭时,它将自动完成。

客户端断开事件能够在服务器节点日志中观察到

ConnectionChannel周期

Connection意味着周期较长。底层协议针对长时间运行的链接进行设计和优化。这意味着每一个操做打开一个新的链接,例如发布的消息是没必要要的,而且强烈不鼓励,由于它会引入大量的网络往返和开销。

Channel也意味着周期较长,但因为许多可恢复的协议错误会致使频道关闭,因此频道使用寿命可能会比链接频率短。每次操做关闭和打开新频道一般是没必要要的,但能够适当。若有疑问,请考虑重复使用channel第一。

Channel级异常(例如尝试从不存在的队列中消耗)将致使通道关闭。已关闭的频道不能再使用,而且不会再收到来自服务器的更多事件(如消息传递)。Channel级异常将由RabbitMQ记录并启动通道的关闭序列(见下文)。

使用交换机(Exchanges队列(Queues 

客户端应用程序与协议的高级构建块交换和队列一块儿工做。这些必须在可使用以前进行声明。声明任何一种类型的对象只是确保其中一个名称存在,并在必要时建立它。

继续前面的例子,下面的代码声明了一个exchange和一个queue,而后将它们绑定在一块儿。

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, exchangeName, routingKey);

这将主动声明如下对象,这两个对象均可以经过使用其余参数进行定制。这里他们都没有任何特别的论点。

1. 一个持久的,非自动删除的“direct”类型的交换

2. 一个具备生成名称的非持久,独占,自动删除队列

上面的函数调用而后使用给定的路由密钥(routing key)将队列绑定到交换机。

请注意,当只有一个客户端想要使用它时,这将是一种典型的声明方式:它不须要知名的名称,没有其余客户端可使用它(独占),而且会自动清除(自动删除)。若是有几个客户想共享一个知名名称的队列,那么这个代码将是合适的:

channel.exchangeDeclare(exchangeName,“direct”,true);

channel.queueDeclare(queueName,true,false,false,null);

channel.queueBind(queueName,exchangeName,routingKey);

声明:

1. 一个持久的,非自动删除的“direct”类型的交换

2. 一个持久的,非独占,自动删除队列

许多Channel API方法被重载。这些便捷的ExchangeDeclare,queueDeclare和queueBind短格式 使用合理的默认值。还有更多的参数更多的表单,能够根据须要覆盖这些默认值,在须要的地方提供彻底控制。

这种“简单形式,长形式”模式在客户端API使用中使用。

一些常见操做还有一个“不等待”版本,不会等待服务器响应。例如,要声明一个队列并指示服务器不发送任何响应,请使用

channel.queueDeclareNoWait(queueName,true,false,false,null);

“不等待”版本更高效,但提供较低的安全保证,例如,它们更依赖于检测失败操做的心跳机制。若有疑问,请从标准版开始。只有在高拓扑(队列,绑定)流失的场景中才须要“无等待”版本。

队列或交换能够被明确删除:

channel.queueDelete("queue-name")

只有在队列为空时才能删除队列:

channel.queueDelete(“queue-name”,false,true)

或者若是没有使用(没有任何消费者):

channel.queueDelete(“queue-name”,true,false)

能够清除队列(删除全部消息):

channel.queuePurge(“队列名称”)

发布消息

要将消息发布到交易所,请按以下方式使用Channel.basicPublish:

byte[] messageBodyBytes = "Hello, world!".getBytes();

channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了进行良好的控制,您可使用重载的变体来指定强制标志,或使用预设的消息属性发送消息:

channel.basicPublish(exchangeName, routingKey, mandatory,

                    MessageProperties.PERSISTENT_TEXT_PLAIN,

                     messageBodyBytes);

这将发送带有交付模式2(持久性),优先级1和内容类型“text / plain”的消息。你可使用一个Builder类来构建你本身的消息属性对象,只要你喜欢就能够提供许多属性,例如:

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .contentType("text/plain")

               .deliveryMode(2)

               .priority(1)

               .userId("bob")

               .build()),

               messageBodyBytes);

本示例使用自定义标题发布消息:

Map<String, Object> headers = new HashMap<String, Object>();

headers.put("latitude",  51.5252949);

headers.put("longitude", -0.0905493);

 

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .headers(headers)

               .build()),

               messageBodyBytes);

本示例发布包含过时(expiration)的消息:

channel.basicPublish(exchangeName, routingKey,

             new AMQP.BasicProperties.Builder()

               .expiration("60000")

               .build()),

               messageBodyBytes);

咱们没有在这里说明全部的可能性。

请注意,BasicProperties是自动生成的持有AMQP的内部类。

Channel#basicPublish的 调用最终会阻止 资源驱动型警报生效。

通道Channels 和并发注意事项(线程安全)

做为一个经验法则,在线程之间共享Channel实例是须要避免的。应用程序应该更喜欢使用每一个线程的通道,而不是在多个线程之间 共享同一个通道

尽管通道上的某些操做能够安全地同时调用,但有些操做不会而且会致使不正确的帧交错,双重确认等。

在共享通道上同时发布可能会致使连线上的帧错误交错,触发链接级别的协议异常并由代理当即关闭链接。所以它须要在应用程序代码中进行明确的同步(Channel#basicPublish必须在关键部分中调用)。在线程之间共享频道也会干扰发布商确认。最好避免在共享通道上同时发布,例如经过使用每一个线程的通道。

可使用通道池来避免在共享通道上同时发布:一旦线程完成一个通道的处理,它就会将其返回到池中,从而使该通道可用于另外一个线程。通道池能够被认为是一个特定的同步解决方案。建议使用现有的共享库来代替自行开发的解决方案。例如,Spring AMQP 具备即用型通道池功能。

通道消耗资源,在大多数状况下,应用程序在同一个JVM进程中不多须要超过几百个开放通道。若是咱们假设应用程序对每一个通道都有一个线程(由于不该该同时使用通道),那么单个JVM的数千个线程已经有至关可观的开销,这多是能够避免的。此外,一些快速发布商能够轻松地使网络接口和代理节点饱和:发布涉及的工做量少于路由,存储和传递消息的工做量。

要避免的典型反模式是为每一个发布的消息打开一个频道。渠道应该是至关长寿的,打开一个新渠道是一个网络往返,这使得这种模式很是低效。

在一个线程中使用并在共享通道上的另外一个线程中发布多是安全的。

服务器推送的交付(请参见下面的部分)与保证每通道排序被保留的保证同时进行分派。调度机制使用java.util.concurrent.ExecutorService,每一个链接一个。能够提供一个自定义执行程序,该自定义执行程序将由使用 ConnectionFactory#setSharedExecutor设置程序的单个ConnectionFactory生成的全部链接共享。

使用手动确认时,重要的是要考虑哪些线程进行确认。若是它与接收交付的线程不一样(例如Consumer#handleDelivery 委托交付处理到另外一个线程),则将multiple参数设置为true进行确认是不安全的,将致使双重确认,并所以致使通道级协议异常关闭频道。一次确认一条消息多是安全的。

经过订阅接收消息(“推送API”)

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

接收消息的最有效方式是使用Consumer 界面设置订阅。消息将在到达时自动发送,而不是必须明确要求。

在调用与Consumer相关的API方法时 ,我的订阅始终由其消费者标签引用。消费者标签是消费者标识符,能够是客户端或服务器生成的。要让RabbitMQ生成节点范围的惟一标记,请使用Channel#basicConsume覆盖,该覆盖不会接收使用者标记参数,也不会传递消费者标记的空字符串,并使用Channel#basicConsume返回的值。消费者标签用于取消消费者。

不一样的消费者实例必须具备不一样的消费者标签。强烈建议在链接上重复使用消费者标签,而且可能会致使自动链接恢复问题,并在监控消费者时混淆监控数据。

实现Consumer的最简单方法是为便利类DefaultConsumer建立子类。该子类的一个对象能够经过basicConsume 调用来设置订阅:

boolean autoAck = false;

channel.basicConsume(queueName, autoAck, "myConsumerTag",

     new DefaultConsumer(channel) {

         @Override

         public void handleDelivery(

String consumerTag,

                        Envelope envelope,

                        AMQP.BasicProperties properties,

                        byte[] body)

             throws IOException

         {

             String routingKey = envelope.getRoutingKey();

             String contentType = properties.getContentType();

             long deliveryTag = envelope.getDeliveryTag();

             //(处理消息组件在这里...) 

             channel.basicAck(deliveryTag, false);

         }

     });

在这里,由于咱们指定了autoAck = false,确认传递给消费者的消息,最简单的 方法是在handleDelivery方法中完成,如图中所示。

更复杂的消费者将须要覆盖更多的方法。特别是,handleShutdownSignal 当通道和链接关闭被调用,handleConsumeOk传递消费者标签的任何其余回调到以前消费者被调用。

消费者也能够分别实现 handleCancelOk和handleCancel 方法来通知显式和隐式取消。

您可使用 Channel.basicCancel明确取消特定的消费者:

channel.basicCancel(consumerTag);

经过消费者标签。

就像出版商同样,为消费者考虑并发危害安全也很重要。

对消费者的回调被调度到与实例化其通道的线程分离的线程池中 。这意味着消费者能够安全地调用Connection或Channel上的阻塞方法 ,例如 Channel#queueDeclare或 Channel#basicCancel。

每一个通道都有本身的调度线程。对于每一个 频道一个消费者最多见的使用状况,这意味着消费者不支持其余消费者。若是每一个频道有多个消费者,请注意,长时间运行的消费者可能会阻止向该频道上的 其余消费者发送回调 。

有关并发性和并发性危害安全性的其余主题,请参阅并发注意事项(线程安全性)部分。

检索单个消息(“Pull API”)

要显式检索消息,请使用 Channel.basicGet。返回的值是GetResponse的一个实例,从中能够提取标题信息(属性)和消息正文:

boolean autoAck = false ;

GetResponse response = channel.basicGet(queueName, autoAck);

if (response == null) {

     //没有检索到消息。

} else {

    AMQP.BasicProperties props = response.getProps();

    byte[] body = response.getBody();

    long deliveryTag = response.getEnvelope().getDeliveryTag();

    ...

 

    

而且因为上面的autoAck = false,您还必须调用Channel.basicAck来确认您已成功接收消息:

    ...

    channel.basicAck(method.deliveryTag,false); //确认收到消息 

}

处理不可路由的消息

若是消息发布时设置了“强制(mandatory)”标志,但没法路由,代理会将其返回给发送客户端(经过AMQP.Basic.Return 命令)。

通知这样的回报,客户能够实现ReturnListener 接口并调用Channel.addReturnListener。若是客户端还没有配置特定通道的返回侦听器,则相关的返回消息将被静默放弃。

channel.addReturnListener(new ReturnListener() {

    public void handleReturn(int replyCode,

                                  String replyText,

                                  String exchange,

                                  String routingKey,

                                  AMQP.BasicProperties properties,

                                  byte[] body)

    throws IOException {

        ...

    }

});

例如,若是客户端发布的消息的“mandatory”标志设置为未绑定到队列的“direct”类型的交换,则会调用返回监听器。

关机协议

客户端关机过程概述

AMQP 0-9-1链接和通道共享相同的通常方法来管理网络故障,内部故障和明确的本地关闭。

AMQP 0-9-1链接和通道具备如下生命周期状态:

· open:对象已准备好使用

· closing:对象已明确通知本地关闭,已向任何支持的下层对象发出关闭请求,而且正在等待其关闭过程完成

· closed:对象已收到来自任何底层对象的全部关闭完成通知,所以已关闭

这些对象老是处于关闭状态,不管致使关闭的缘由如应用程序请求,内部客户端库故障,远程网络请求仍是网络故障。

AMQP链接和通道对象具备如下与关机相关的方法:

· addShutdownListener(ShutdownListener listener)和 removeShutdownListener(ShutdownListener listener)来管理任何侦听器,当对象转换到关闭(closing)状态时将会触发这些侦听器 。请注意,将ShutdownListener添加到已关闭的对象将当即触发侦听器

· getCloseReason(),以容许调查对象关闭的缘由

· isOpen(),用于测试对象是否处于打开状态

· close(int closeCode,String closeMessage),以显式通知要关闭的对象

监听的简单用法以下所示:

import com.rabbitmq.client.ShutdownSignalException;import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {

    public void shutdownCompleted(ShutdownSignalException cause)

    {

        ...

    }

});

关于关机状况的信息

能够经过显式调用getCloseReason() 方法或使用ShutdownListener类的服务中的cause参数(ShutdownSignalException cause) 方法来检索 ShutdownSignalException,其中包含关于关闭缘由的全部可用信息。

该ShutdownSignalException类提供方法来分析关机的缘由。经过调用isHardError()方法,咱们能够得到有关链接或通道错误的信息,getReason()以AMQP方法的形式返回有关缘由的信息 -AMQP.Channel.Close或 AMQP.Connection.Close(若是缘由是库中的某个异常(例如网络通讯故障),则返回null,在这种状况下,可使用getCause()检索异常。

public void shutdownCompleted(ShutdownSignalException cause){

  if (cause.isHardError())

  {

    Connection conn = (Connection)cause.getReference();

    if (!cause.isInitiatedByApplication())

    {

      Method reason = cause.getReason();

      ...

    }

    ...

  } else {

    Channel ch = (Channel)cause.getReference();

    ...

  }

}

  

原子性和使用isOpen()方法

不建议在生产代码中使用通道和链接对象 的isOpen()方法,由于方法返回的值取决于关闭缘由的存在。如下代码说明了竞争条件的可能性:

public void brokenMethod(Channel channel){

    if (channel.isOpen())

    {

 //下面的代码依赖于处于打开状态的通道。//可是没有在信道状态变化的可能性ISOPEN()和basicQos(1)呼叫之间// 

       ...

        channel.basicQos(1);

    }

}  

相反,咱们一般应该忽略这种检查,并简单地尝试所需的行动。若是在代码的执行过程当中链接的通道关闭,则会引起ShutdownSignalException异常,指示对象处于无效状态。咱们还应该捕获 由SocketException引发的IOException,当代理意外关闭链接时,或者在代理启动clean close时发生ShutdownSignalException。

public  void  validMethod (Channel channel) {

     try {

        ...

        channel.basicQos( 1);

    } catch(ShutdownSignalException sse){

         //可能检查频道是否被关闭

        //当咱们开始操做时,

        //关闭它的缘由 

        ...

    } catch(IOException ioe){

         //检查链接关闭的缘由 

        ...

    }

}

高级链接选项

消费者线程池

消费者线程(请参阅下面的接收)默认状况下会自动分配到新的ExecutorService线程池中。若是须要更大的控制权,请在newConnection()方法上 提供ExecutorService,以便使用此线程池。下面是一个例子,其中提供了比一般分配的更大的线程池:

 ExecutorService es = Executors.newFixedThreadPool(20);

  Connection conn = factory.newConnection(es);

不管执行人及的ExecutorService类中的java.util.concurrent包。

当链接关闭时,默认的ExecutorService 将被shutdown(),但用户提供的 ExecutorService(如上面的es) 不会被shutdown()。提供定制ExecutorService的客户端必须确保它最终关闭(经过调用其shutdown() 方法),不然池的线程可能会阻止JVM终止。

同一个执行者服务能够在多个链接之间共享,或者在从新链接时被重复使用,可是在关闭后它不能使用()。

若是有证据代表消费者 回调处理中存在严重瓶颈,则应仅考虑使用此功能。若是没有消费者回调执行,或者不多,默认分配绰绰有余。开销最小,而且分配的总线程资源是有界的,即便偶尔会出现一连串的消费者活动。

使用主机列表

能够将Address数组传递给newConnection()。的地址是简单地在一个方便的类com.rabbitmq.client包与主机 和端口组件。例如:

 Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1), new Address(hostname2, portnumber2)};

  Connection conn = factory.newConnection(addrArr);

将尝试链接到hostname1:portnumber1,而且若是没法链接到hostname2:portnumber2。返回的链接是数组中的第一个成功(不抛出 IOException)。这彻底等同于重复设置工厂的主机和端口,每次都调用factory.newConnection(),直到其中一个成功。

若是还提供了ExecutorService(使用表单factory.newConnection(es,addrArr)),则线程池将与(第一个)成功链接相关联。

若是您想要更多地控制主机链接到,请参阅 对服务发现的支持

使用AddressResolver接口进行服务发现

从版本3.6.6开始,可让AddressResolver的实现 在建立链接时选择链接的位置:

  Connection conn = factory.newConnection(addressResolver);

该AddressResolver接口是这样的:

  public interface AddressResolver {

    List<Address> getAddresses() throws IOException;

  }

就像主机列表同样,返回的第一个地址将首先尝试,而后第二个地址返回,若是客户端没法链接到第一个地址,依此类推。

若是还提供了ExecutorService(使用表单factory.newConnection(es,addressResolver)),则线程池将与(第一个)成功链接相关联。

该AddressResolver是实现定制服务发现逻辑,这是一个动态的基础设施特别有用的理想场所。结合自动恢复功能,客户端能够自动链接到第一次启动时还没有达到的节点。亲和性和负载平衡是其中可使用自定义AddressResolver的其余场景。

Java客户端随附如下实现(有关详细信息,请参阅javadoc):

1. DnsRecordIpAddressResolver:给定主机的名称,返回其IP地址(针对平台DNS服务器的分辨率)。这对于简单的基于DNS的负载平衡或故障转移颇有用。

2. DnsSrvRecordAddressResolver:给定服务的名称,返回主机名/端口对。搜索被实现为DNS SRV请求。当使用像HashiCorp Consul这样的服务注册表时,这可能颇有用 。

心跳超时

有关检测信号以及如何在Java客户端中配置它们的更多信息,请参阅Heartbeats指南

自定义线程工厂

诸如Google App Engine(GAE)等环境能够限制直接线程实例化。要在这样的环境中使用RabbitMQ Java客户端,有必要配置一个自定义的ThreadFactory,它使用适当的方法来实例化线程,例如GAE的ThreadManager。如下是Google App Engine的一个示例。

import com.google.appengine.api.ThreadManager;

ConnectionFactory cf = new ConnectionFactory();

cf.setThreadFactory(ThreadManager.backgroundThreadFactory();

支持Java非阻塞IO

Java客户端4.0版为Java非阻塞IO(又名Java NIO)带来实验性支持。NIO不必定比堵塞IO更快,它只是容许更容易地控制资源(在这种状况下,线程)。

在默认的阻塞IO模式下,每一个链接使用一个线程从网络套接字读取。使用NIO模式,您能够控制从网络套接字读写的线程数。

若是Java进程使用许多链接(数十或数百),请使用NIO模式。您应该使用比使用默认阻止模式更少的线程。经过设置适当的线程数量,您不该该尝试下降性能,特别是在链接不太忙时。

NIO必须明确启用:

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.useNio();

NIO模式能够经过NioParams类来配置:

connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));

NIO模式使用合理的默认值,但您可能须要根据您本身的工做负载进行更改。其中一些设置是:使用的IO线程总数,缓冲区大小,用于IO循环的服务执行程序,内存写入队列的参数(写请求在网络上发送以前已排队)。请阅读Javadoc了解详情和默认值。

从网络故障中自动恢复

链接恢复

客户端和RabbitMQ节点之间的网络链接可能会失败。RabbitMQ Java客户端支持链接和拓扑(队列,交换,绑定和使用者)的自动恢复。许多应用程序的自动恢复过程遵循如下步骤:

1. 从新链接

2. 还原链接侦听器

3. 从新开放频道

4. 还原通道侦听器

5. 恢复频道basic.qos设置,发行商确认和交易设置

拓扑恢复包括为每一个通道执行的如下操做

1. 从新申报交换机(除了预约义的)

2. 从新申报队列

3. 恢复全部绑定

4. 恢复全部消费者

从Java客户端的4.0.0版开始,默认状况下启用自动恢复(所以也是拓扑恢复)。

拓扑恢复依赖于实体(队列,交换,绑定,使用者)的每一个链接缓存。当链接声明一个队列时,它将被添加到缓存中。当它被删除或计划删除(例如,由于它被自动删除)它将被删除。这个模型有一些局限在下面。

要禁用或启用自动链接恢复,请使用factory.setAutomaticRecoveryEnabled(boolean) 方法。如下片断显示了如何显式启用自动恢复(例如,对于Java 4.0.0以前的客户端):

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

factory.setAutomaticRecoveryEnabled(true);

//链接会自动恢复 

Connection conn = factory.newConnection();

若是因为异常致使恢复失败(例如,RabbitMQ节点仍然没法访问),它将在固定时间间隔后重试(默认为5秒)。间隔能够配置:

ConnectionFactory factory = new ConnectionFactory();

//每10秒尝试恢复一次 

factory.setNetworkRecoveryInterval(10000);

当提供地址列表时,列表会被混淆,而且全部地址都会在下一个地址以后被尝试:

ConnectionFactory factory = new ConnectionFactory();

Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};

factory.newConnection(addresses);

什么时候会触发链接恢复?

自动链接恢复(若是启用)将由如下事件触发:

· I / O异常在链接的I / O循环中抛出

· 套接字读取操做超时

· 检测到错过的服务器心跳(超时)

· 链接的I / O循环中会引起任何其余意外的异常

以先发生者为准。

通道级别的异常不会触发任何形式的恢复,由于它们一般表示应用程序中存在语义问题(例如尝试从不存在的队列中使用)。

恢复监听器

能够在可恢复的链接和通道上注册一个或多个恢复监听器。当启用链接恢复时,由ConnectionFactory#newConnection和Connection#createChannel返回的 链接将 实现com.rabbitmq.client.Recoverable接口,提供两个具备至关描述性名称的方法:

· addRecoveryListener

· removeRecoveryListener

请注意,您目前须要将链接和频道投射到Recoverable 才能使用这些方法。

对发布的影响

链接断开时 使用Channel.basicPublish发布的消息将丢失。在链接恢复后,客户端不会将它们排队等待传递。为了确保发布的消息到达RabbitMQ应用程序须要使用Publisher确认 并考虑链接失败。

拓扑恢复

拓扑恢复涉及恢复交换,队列,绑定和消费者。当启用自动恢复功能时,它默认启用。所以,从Java客户端4.0.0开始,默认启用拓扑恢复。

若是须要,能够显式禁用拓扑恢复:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();

 //启用自动恢复(例如,先前的Java客户端4.0.0) 

factory.setAutomaticRecoveryEnabled(true);

//禁用拓扑恢复 

factory.setTopologyRecoveryEnabled(false);

故障检测和恢复限制

自动链接恢复具备许多应用程序开发人员须要注意的局限性和故意设计决策。

拓扑恢复依赖于实体(队列,交换,绑定,使用者)的每一个链接缓存。当链接声明一个队列时,它将被添加到缓存中。当它被删除或计划删除(例如,由于它被自动删除)它将被删除。这使得能够在不出现意外结果的状况下在不一样频道上声明和删除实体。这也意味着使用自动链接恢复的链接上的全部通道上的消费者标记(通道专用标识符)必须是惟一的。

当链接中断或丢失时,须要时间来检测。所以,库和应用程序都不知道有效的链接失败。在这个时间段内发布的任何消息都会像往常同样序列化并写入TCP套接字。他们只能经过发布商确认向代理商交付:经过AMQP 0-9-1进行发布彻底是异步设计。

当启用了自动恢复功能的链接检测到套接字或I / O操做错误时,缺省状况下会在可配置延迟5秒后进行恢复。这种设计假定即便大量的网络故障是短暂的而且一般很短暂,但它们不会当即消失。链接恢复尝试将以相同的时间间隔继续,直到成功打开新链接。

当链接处于恢复状态时,任何在其频道上尝试发布的内容都将被拒绝,并有异常。客户端当前不执行此类传出消息的任何内部缓冲。应用程序开发者有责任跟踪这些消息并在恢复成功时从新发布它们。 发布商确认是一种协议扩展,应该由发布商不能承受消息丢失的状况下使用。

因为通道级别的异常致使通道关闭时,链接恢复不会启动。这种例外一般表示应用程序级别的问题。目前(library)没法就此状况作出明智的决定。

即便在链接恢复启动后,闭合通道也不会恢复。这包括明确关闭的通道和上面的通道级异常状况。

手动确认和自动恢复

当使用手动确认时,在消息传递和确认之间,到RabbitMQ节点的网络链接可能会失败。链接恢复后,RabbitMQ将重置全部通道上的交付标签。这意味着basic.ackbasic.nackbasic.reject 与旧的交付标签将致使通道异常。为了不这种状况,RabbitMQ Java客户端跟踪并更新交付标签,使它们在恢复之间单调增加。 Channel.basicAck, Channel.basicNack和 Channel.basicReject而后将调整后的交付标签转换为RabbitMQ使用的标签。带有陈旧交付标签的确认将不会发送。使用手动确认和自动恢复的应用程序必须可以处理从新投递。

渠道Channels生命周期和拓扑恢复

对于应用程序开发人员来讲,自动链接恢复应该尽量透明,这就是为何Channel实例保持不变,即便多个链接失败并在幕后恢复。从技术上讲,当自动恢复打开时,Channel实例充当代理或装饰器:他们将AMQP业务委托给实际的AMQP通道实现,并在其周围实施一些恢复机制。这就是为何当它建立了一些资源(队列,交换,绑定)以后不该该关闭通道,或者这些资源的拓扑恢复稍后会失败,由于通道已关闭。相反,应该在应用程序的生命周期中建立通道。

未处理的异常

与链接,通道,恢复和消费者生命周期相关的未处理异常委派给异常处理程序。异常处理程序是实现ExceptionHandler接口的任何对象 。默认状况下,使用DefaultExceptionHandler的一个实例。它将异常详细信息打印到标准输出。

可使用ConnectionFactory#setExceptionHandler覆盖处理程序 。它将用于工厂建立的全部链接:

ConnectionFactory factory = new ConnectionFactory();

cf.setExceptionHandler(customHandler);

异常处理程序应该用于异常记录。

Metrics 性能监控Metrics and monitoring

从版本4.0.0开始,客户端收集运行时指标(例如已发布消息的数量)。度量标准集合是可选的,并使用setMetricsCollector(metricsCollector)方法在ConnectionFactory级别进行设置 。此方法须要一个MetricsCollector实例,该实例在客户端代码的多个位置中调用。

客户端支持 Micrometer (截至版本4.3)和 Dropwizard Metrics开箱即用。

如下是收集的指标:

· 打开的链接数

· 开放频道的数量

· 已发布消息的数量

· 消费的消息数量

· 已确认消息的数量

· 被拒绝的信息数量

Micrometer和Dropwizard指标都提供计数,但也包括平均速率,最后五分钟速率等与消息相关的指标。他们还支持常见的监控和报告工具(JMX,Graphite,Ganglia,Datadog等)。有关更多详细信息,请参阅下面的专用章

请注意关于metrics collection的如下内容:

· 在使用Micrometer或Dropwizard指标时,不要忘记将适当的依赖关系(以Maven,Gradle或甚至JAR文件的形式)添加到JVM类路径。这些是可选的依赖关系,不会随Java客户端自动拖动。您可能还须要添加其余依赖项,具体取决于所使用的报告后端。

· metrics collection是可扩展的。鼓励为特定需求实施自定义 MetricsCollector。

· 所述MetricsCollector设置在ConnectionFactory,但能够在不一样的实例共享。

· metrics collection不支持事务。例如,若是在事务中发送确认而且事务被回滚,则确认在客户metrics中被计数(显然不是broker实体)。请注意,确认实际上发送给代理,而后经过事务回滚取消,所以客户端指标在发送确认方面是正确的。总而言之,不要将客户端指标用于关键业务逻辑,它们不能保证彻底准确。它们旨在简化关于正在运行的系统的推理并使操做更高效。

Micrometer 支持

您能够经过如下方式使用Micrometer 启用metrics collection :

ConnectionFactory connectionFactory = new ConnectionFactory();

MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();

connectionFactory.setMetricsCollector(metrics);

...

metrics.getPublishedMessages();//得到Micrometer的Counter对象

Micrometer支持 多种报告后端:Netflix Atlas,Prometheus,Datadog,Influx,JMX等。

您一般会将MeterRegistry的一个实例传递 给MicrometerMetricsCollector。这里是JMX的一个例子:

JmxMeterRegistry registry = new JmxMeterRegistry();

MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setMetricsCollector(metrics);

Dropwizard指标Metrics支持

您能够经过如下方式使用Dropwizard启用metrics collection :

ConnectionFactory connectionFactory = new ConnectionFactory();

StandardMetricsCollector metrics = new StandardMetricsCollector();

connectionFactory.setMetricsCollector(metrics);

...

metrics.getPublishedMessages();//得到Metrics的Meter对象

Dropwizard指标支持 多种报告后端:控制台,JMX,HTTP,Graphite,Ganglia等。

您一般会将MetricsRegistry的实例传递 给StandardMetricsCollector。这里是JMX的一个例子:

MetricRegistry registry = new MetricRegistry();

StandardMetricsCollector metrics = new StandardMetricsCollector(registry);

 

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setMetricsCollector(metrics);

 

JmxReporter reporter = JmxReporter

  .forRegistry(registry)

  .inDomain("com.rabbitmq.client.jmx")

  .build();

reporter.start();

          

Google App Engine上的RabbitMQ Java客户端

在Google App Engine上使用RabbitMQ Java客户端(GAE)须要使用自定义线程工厂,使用GAE的ThreadManager实例化线程(请参阅上文)。此外,有必要设置一个低心跳间隔(4-5秒),以免运行到低的InputStream上GAE读超时:

ConnectionFactory factory = new ConnectionFactory();

cf.setRequestedHeartbeat(5);

        

警告和限制

为了使拓扑恢复成为可能,RabbitMQ Java客户端维护已声明的队列,交换和绑定的缓存。缓存是按链接的。某些RabbitMQ功能使客户没法观察一些拓扑变化,例如,当因为TTL而删除队列时。RabbitMQ Java客户端尝试在最多见的状况下使缓存条目无效:

· 当队列被删除时。

· 交换被删除时。

· 当绑定被删除。

· 消费者在自动删除的队列上取消时。

· 当队列或交换机从自动删除的交易所解除锁定时。

可是,除了单个链接以外,客户端没法跟踪这些拓扑变化。依赖自动删除队列或交换机以及队列TTL(注意:不是消息TTL!)并使用自动链接恢复的应用程序应显式删除已知未使用或已删除的实体,以清除客户端拓扑高速缓存。这是经过促进通道#queueDelete, 通道#exchangeDelete,通道#queueUnbind和通道#exchangeUnbind 是幂等在RabbitMQ的3.3.x(删除的内容不是有不致使异常)。

RPC(请求/回复)模式:一个例子

为了方便编程,Java客户端API提供了一个类RpcClient,它使用临时答复队列经过AMQP 0-9-1 提供简单的RPC式通讯工具。

该类不会对RPC参数和返回值施加任何特定的格式。它只是提供了一种机制,使用特定的路由密钥向给定的交换机发送消息,并等待回复队列上的响应。

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(这个类如何使用AMQP 0-9-1的实现细节以下:请求消息是在 basic.correlation_id字段被设置为这个RpcClient实例惟一的值的状况下发送的,而且basic.reply_to被设置为回复队列。)

一旦建立了此类的实例,就可使用它经过使用如下任何方法发送RPC请求:

byte[] primitiveCall(byte[] message);

String stringCall(String message)

Map mapCall(Map message)

Map mapCall(Object[] keyValuePairs)

该primitiveCall方法传送原始字节数组做为请求和响应机构。方法stringCall是一个简单的primitiveCall简便包装器,它将消息体做为默认字符编码中的String实例处理。

该mapCall变种是有点更复杂的:它们编码java.util.Map包含普通的Java值到AMQP 0-9-1二进制表表示,和解码以一样的方式回应。(请注意,这里可使用哪些值类型有一些限制 - 请参阅javadoc了解详细信息。)

全部的marshalling/unmarshalling便利方法使用primitiveCall做为传输机制,并在其上提供一个包装层。

TLS支持

能够使用TLS加密客户端与代理之间的通讯 。客户端和服务器认证(又名同行认证)也被支持。如下是对Java客户端使用加密的最简单方法:

ConnectionFactory factory = new ConnectionFactory();

factory.setHost(“localhost”);

factory.setPort(5671);

factory.useSslProtocol();

      

请注意,客户端并未强制执行上述示例中的任何服务器身份验证(对等证书链验证)做为缺省值,使用TrustManager的 “信任全部证书” 。这对本地开发很方便,但容易发生中间人攻击,所以不推荐用于生产。要了解更多关于RabbitMQ中TLS支持的信息,请参阅TLS指南。若是您只想配置Java客户端(尤为是对等验证和信任管理器部分),请阅读TLS指南的相应部分

相关文章
相关标签/搜索