RabbitMQ介绍

(一)RabbitMQ基本概念

  RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。RabbitMQ是 AMQP(高级消息队列协议)的标准实现。若是不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。html

RabbitMQ的结构图以下:java

一、几个概念说明:安全

Broker:简单来讲就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
Queue:消息队列载体,每一个消息都会被投入到一个或多个队列。
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。服务器

二、消息队列的使用过程大概以下:架构

(1)客户端链接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间创建好绑定关系。
(5)客户端投递消息到exchange。dom

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。异步

exchange也有几个类型,彻底根据key进行投递的叫作Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫作Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不须要key的,叫作Fanout交换机,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。ide

三、关联关系工具

从示意图能够看出消息生产者并无直接将消息发送给消息队列,而是经过创建与Exchange的Channel,将消息发送给Exchange,Exchange根据规则,将消息转发给指定的消息队列。消费者经过创建与消息队列相连的Channel,从消息队列中获取消息。post

这里谈到的Channel能够理解为创建在生产者/消费者和RabbitMQ服务器之间的TCP链接上的虚拟链接,一个TCP链接上能够创建多个Channel。 RabbitMQ服务器的Exchange对象能够理解为生产者发送消息的邮局,消息队列能够理解为消费者的邮箱。Exchange对象根据它定义的规则和消息包含的routing key以及header信息将消息转发到消息队列。channel下图中浅红色框起来的两块所示:

根据转发消息的规则不一样,RabbitMQ服务器中使用的Exchange对象有四种,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,若是定义Exchange时没有指定类型和名称, RabbitMQ将会为每一个消息队列设定一个Default Exchange,它的Routing Key是消息队列名称。

RabbitMQ Java Client的官网示例有6个,本篇只使用三个例程,分别是使用默认Default Exchange的消息生产/消费,使用Direct Exchange的消息生产/消费,以及RPC方式的消息生产/消费。

为了测试方便,咱们新定义了一个virutal host,名字是test_vhosts,定义了两个用户rabbitmq_producer和rabbitmq_consumer, 设置其user_tag为administrator(能够进行远程链接), 为它们设置了访问test_vhosts下全部资源的权限。

建立virutal host,在Admin-->Virtual Hosts(右侧的导航栏上)打开:

建立用户:

为用户设置权限:(在用户列表上点击某个用户进入设置页面)

 

 

使用默认Default Exchange的消息生产/消费

咱们定义一个生产者程序,一个消费者程序。

生产者程序代码以下:

package com.gl365.payment.util.rabbitmq.demo1;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //建立与RabbitMQ服务器的TCP链接  
            connection  = factory.newConnection();  
            channel = connection.createChannel();  
            channel.queueDeclare("firstQueue", true, false, false, null);  
            String message = "First Message";             
            channel.basicPublish("", "firstQueue", null, message.getBytes());  
            System.out.println("Send Message is:'" + message + "'");              
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

关于生产者的代码有几点说明:

1) RabbitMQ Java Client示例提供的ConnectionFactory属性设置的代码只有一句:

factory.setHost("localhost");  

这句代码表示使用rabbitmq服务器默认的virutal host(“/”),默认的用户guest/guest进行链接,可是若是这段代码运行在远程机器上时, 将由于guest用户不能用于远程链接RabbitMQ服务器而运行失败,上面提供的代码是能够进行创建远程链接的代码。

2)Channel创建后,调用Channel.queueDeclare方法建立消息队列firstQueue。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,  
                 Map<String, Object> arguments) throws IOException; 

这个方法的第二个参数durable表示创建的消息队列是不是持久化(RabbitMQ重启后仍然存在,并非指消息的持久化),第三个参数exclusive 表示创建的消息队列是否只适用于当前TCP链接,第四个参数autoDelete表示当队列再也不被使用时,RabbitMQ是否能够自动删除这个队列。 第五个参数arguments定义了队列的一些参数信息,主要用于Headers Exchange进行消息匹配时。

 

3)生产者发送消息使用Channel.basicPublish方法。

void basicPublish(String exchange, String routingKey,   
 BasicProperties props, byte[] body) throws IOException; 

第一个参数exchange是消息发送的Exchange名称,若是没有指定,则使用Default Exchange。 第二个参数routingKey是消息的路由Key,是用于Exchange将消息路由到指定的消息队列时使用(若是Exchange是Fanout Exchange,这个参数会被忽略), 第三个参数props是消息包含的属性信息。RabbitMQ的消息属性和消息体是分开的,不像JMS消息那样同时包含在javax.jms.Message对象中,这一点须要特别注意。 第四个参数body是RabbitMQ消息体。 咱们这里调用basicPublish方法发送消息时,props参数为null,于是咱们发送的消息是非持久化消息,若是要发送持久化消息,咱们须要进行以下设置:

AMQP.BasicProperties props =  
                    new AMQP.BasicProperties("text/plain",  
                            "UTF-8",  
                            null,  
                            2,  
                            0, null, null, null,  
                            null, null, null, null,  
                            null, null);  
 channel.basicPublish("", "firstQueue", props, message.getBytes());  

定义props时的参数2表示消息的类型为持久化消息。 运行生产者程序后,咱们能够执行rabbitmqctl命令查看队列消息,咱们看到firstQueue队列有一条消息。

消费者代码以下:

package com.gl365.payment.util.rabbitmq.demo1;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ConsumerApp {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbitmq_consumer");
            factory.setPassword("rabbitmq_consumer");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" Consumer have received '" + message + "'");
                }
            };
            channel.basicConsume("firstQueue", true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

消费者代码中,创建Connection,Channel的代码和生产者程序相似。它主要定义了一个Consumer对象,这个对象重载了DefaultCustomer类 的handleDelivery方法:

void handleDelivery(String consumerTag,  
                        Envelope envelope,  
                        AMQP.BasicProperties properties,  
                        byte[] body) 
 

handleDelivery方法的第一个参数consumerTag是接收到消息时的消费者Tag,若是咱们没有在basicConsume方法中指定Consumer Tag,RabbitMQ将使用随机生成的Consumer Tag(以下图所示)

 

第二个参数envelope是消息的打包信息,包含了四个属性:

1._deliveryTag,消息发送的编号,表示这条消息是RabbitMQ发送的第几条消息,咱们能够看到这条消息是发送的 第一条消息。

2._redeliver,重传标志,确认在收到对消息的失败确认后,是否须要重发这条消息,咱们这里的值是false,不须要重发。

3._exchange,消息发送到的Exchange名称,正如咱们上面发送消息时同样,exchange名称为空,使用的是Default Exchange。

4._routingKey,消息发送的路由Key,咱们这里是发送消息时设置的“firstQueue”。

第三个参数properties就是上面使用basicPublish方法发送消息时的props参数,因为咱们上面设置它为null,这里接收到的properties 是默认的Properties,只有bodySize,其余全是null。

第四个参数body是消息体.

咱们这里重载的handleDelivery方法仅仅打印出了生产者发送的消息内容,实际使用时能够转发给后台程序进行处理。

在Consumer对象定义后,咱们调用了Channel.basicConsume方法将Consumer与消息队列绑定,不然Consumer没法从消息队列获取消息。

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException  

basicConsume方法的第一个参数是Consumer绑定的队列名,第二个参数是自动确认标志,若是为true,表示Consumer接受到消息后,会自动发确认消息(Ack消息)给消息队列,消息队列会将这条消息从消息队列里删除,第三个参数就是Consumer对象,用于处理接收到的消息。

若是咱们想让消费者接收到消息后对消息进行手动确认(Manual Ack),咱们须要对代码进行两处改动:

1)在调用basicConsume方法时,将autoAck属性设置为false。

channel.basicConsume("firstQueue", false, consumer);  

2)在handleDelivery方法中调用Channel.basicAck方法,发送手动确认消息给消息队列。

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  
                        throws IOException  
{  
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);  
}  

basicAck方法有两个参数,第一个参数deliverTag是消息的发送编号,第二个参数multiple是消息确认方式,若是值为true,表示对消息队列里全部编号小于或等于当前消息编号的未确认消息进行手动确认,若是为false,表示仅确认当前消息。

消费者代码执行后,咱们能够看到消费者程序的控制台输出了这条消息的内容,并且使用rabbitmqctl命令查看队列消息时,队列里的消息数为0。

使用Direct Exchange的消息生产/消费

使用Direct Exchange的生产者/消费者代码与Default Exchange比较相似,不过生产者程序的代码须要添加建立Direct Exchange和 将Exchange和消息队列绑定的代码,具体添加和修改的代码以下:

package com.gl365.payment.util.rabbitmq.demo2;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //建立与RabbitMQ服务器的TCP链接  
            connection  = factory.newConnection();  
            channel = connection.createChannel(); 
            channel.exchangeDeclare("directExchange", "direct"); channel.queueDeclare("directQueue", true, false, false, null); channel.queueBind("directQueue", "directExchange", "directMessage");  
            String message = "First Direct Message";  
               
            channel.basicPublish("directExchange", "directMessage", null, message.getBytes());  
            System.out.println("Send Direct Message is:'" + message + "'");
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

首先咱们调用Channel.exchangeDeclare方法建立名为“directExchange”的Direct Exchange。

Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable) throws IOException 

exchangeDeclare方法的第一个参数exchange是exchange名称,第二个参数type是Exchange类型,有“direct”,“fanout”,“topic”,“headers”四种,分别对应RabbitMQ的四种Exchange。第三个参数durable是设置Exchange是否持久化( 即在RabbitMQ服务器重启后Exchange是否仍存在,若是没有设置,默认是非持久化的)

建立“directQueue”消息队列后,咱们再调用Channel.queueBind方法,将咱们建立的Direct Exchange和消息队列绑定。

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;  

queueBind方法第一个参数queue是消息队列的名称,第二个参数exchange是Exchange的名称,第三个参数routingKey是消息队列和Exchange之间绑定的路由key,咱们这里绑定的路由key是“directMessage”。从Exchange过来的消息,只有routing key为“directMessage”的消息会被转到消息队列“directQueue”,其余消息将不会被转发,下面将证明这一点。

运行ProducerApp程序,使用rabbitmq_producer用户登陆管理页面,咱们能够看到名为“directExchange”的Direct Exchange被建立出来。

消息队列directQueue与它绑定,routing key为directMessage。

消息队列directQueue里有一条消息

咱们修改ProducerApp的程序,将消息的routing key改成“indirectMessage”

package com.gl365.payment.util.rabbitmq.demo2;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //建立与RabbitMQ服务器的TCP链接  
            connection  = factory.newConnection();  
            channel = connection.createChannel(); 
            channel.exchangeDeclare("directExchange", "direct");  
            channel.queueDeclare("directQueue", true, false, false, null);  
            channel.queueBind("directQueue", "directExchange", "directMessage");  
            //String message = "First Direct Message";  
            String message = "First Indirect Message";  
            channel.basicPublish("directExchange", "indirectMessage", null, message.getBytes());  
            System.out.println("Send Indirect Message is:'" + message + "'"); 
            
            //channel.basicPublish("directExchange", "indirectQueue", null, message.getBytes());  
            //System.out.println("Send Direct Message is:'" + message + "'");
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

再次运行程序后,打开管理页面,咱们看到“directQueue”队列里仍然只有一条消息。

 

咱们向Exchange发送的第二条消息因为和绑定的routing key不一致,没有被转发到“directQueue”消息队列,说明被RabbitMQ丢弃了

 

咱们经过管理界面再建立一个消息队列“indirectQueue”,在它和“directExchange”之间创建bind关系,routingkey为“indirectMessage” 。

再绑定一个

 

再次运行ProducerApp程序,咱们能够看到“directQueue”消息队列消息数还是1,但“indirectQueue”消息队列接收到了从Exchange转发来的消息。

 

使用RPC方式的消息生产/消费

RPC方式的消息生产和消费示意图以下:

 

 

在这种方式下,生产者和消费者之间的消息发送/接收流程以下:

1)生产者在发送消息的同时,将返回消息的消息队列名(replyTo中指定)以及消息关联Id(correlationId)附带在消息Properties中发送给消费者。

2)消费者在接收到消息,处理完成后,将结果做为返回消息发送到replyTo指定的返回消息队列中,同时附带接收消息中的corrleationId, 以便让生产者接收到到返回消息后,根据corrleationId确认是针对1)中发送消息的返回消息,若是correlationId确认一致,则将返回消息 取出,进行后续处理。

示意图中的生产者和消费者在发送消息时使用的都是Default Exchange,咱们接下来的程序作一点改动,使用Direct Exchange。

在咱们的程序中,生产者发送一个数字给消费者,消费者接收到消息后,计算这个数字的阶乘结果,返回给生产者。 生产者程序的主要代码以下:

[java]  view plain  copy
 
  1.   //建立RPC发送消息的Direct Exchange,消息队列和绑定关系。  
  2.   channel.exchangeDeclare("rpcSendExchange", "direct",true);  
  3.   channel.queueDeclare("rpcSendQueue", true, false, false, null);  
  4.   channel.queueBind("rpcSendQueue", "rpcSendExchange", "rpcSendMessage");  
  5.   
  6.   //创建RPC返回消息的Direct Exchange, 消息队列和绑定关系           
  7.   channel.exchangeDeclare("rpcReplyExchange", "direct",true);  
  8.   channel.queueDeclare("rpcReplyQueue", true, false, false, null);  
  9.   channel.queueBind("rpcReplyQueue", "rpcReplyExchange", "rpcReplyMessage");  
  10.   
  11.   //建立接收RPC返回消息的消费者,并将它与RPC返回消息队列相关联。  
  12.   QueueingConsumer replyCustomer = new QueueingConsumer(channel);  
  13.   channel.basicConsume("rpcReplyQueue", true,replyCustomer);  
  14.   
  15.   String number = "10";  
  16.   
  17.   //生成RPC请求消息的CorrelationId  
  18.   String correlationId = UUID.randomUUID().toString();  
  19.   //在RabbitMQ消息的Properties中设置RPC请求消息的CorrelationId以及  
  20.   //ReplyTo名称(咱们这里使用的是Exchange名称,  
  21.   //而不是消息队列名称)  
  22.   BasicProperties props = new BasicProperties  
  23.                       .Builder()  
  24.                       .correlationId(correlationId)  
  25.                       .replyTo("rpcReplyExchange")  
  26.                       .build();  
  27.   
  28.   System.out.println("The send message's correlation id is:" + correlationId);              
  29.   channel.basicPublish("rpcSendExchange", "rpcSendMessage", props, number.getBytes());  
  30.   
  31.   String response = null;  
  32.   
  33.   while(true)  
  34.   {  
  35.           //从返回消息中取一条消息  
  36.    Delivery delivery = replyCustomer.nextDelivery();  
  37.    //若是消息的CorrelationId与发送消息的CorrleationId一致,表示这条消息是  
  38.           //发送消息对应的返回消息,是阶乘运算的计算结果。  
  39.           System.out.println("The received reply message's correlation id is:" + messageCorrelationId);  
  40.           String messageCorrelationId = delivery.getProperties().getCorrelationId();  
  41.    if (!Strings.isNullOrEmpty(messageCorrelationId) && messageCorrelationId.equals(correlationId))   
  42.           {  
  43.     response = new String(delivery.getBody());  
  44.     break;  
  45.    }  
  46.   }  
  47.   
  48.   //输出阶乘运算结果  
  49.   if(!Strings.isNullOrEmpty(response))  
  50.   {  
  51. System.out.println("Factorial(" + number + ") = " + response);  
  52.   }  
消费者程序的主要代码以下:
[java] view plain copy
 
  1.  Consumer consumer = new DefaultConsumer(channel)  
  2.  {  
  3.     @Override  
  4.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException  
  5.     {  
  6.        //获取返回消息发送到的Exchange名称  
  7.        String replyExchange = properties.getReplyTo();  
  8.    
  9.        //设置返回消息的Properties,附带发送消息的CorrelationId.  
  10.        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()  
  11.                             .correlationId(properties.getCorrelationId())  
  12.                             .build();  
  13.    
  14.        String message = new String(body,"UTF-8");  
  15.        System.out.println("The received message is:" + message);  
  16.        System.out.println("The received message's correlation id is:" + properties.getCorrelationId());  
  17.    
  18.        //计算阶乘,factorial方法是计算阶乘的方法。  
  19.        int number = Integer.parseInt(message);  
  20.        String response = factorial(number);  
  21.    
  22.        //将阶乘消息发送到Reply Exchange  
  23.        this.getChannel().basicPublish(replyExchange, "rpcReplyMessage",replyProps, response.getBytes());  
  24.    }  
  25. };  
  26.    
  27. channel.basicConsume("rpcSendQueue", true, consumer);  

先运行生产者程序,发送请求消息到Send Exchange,而后等待消费者发送的返回消息。 
再启动消费者程序,计算阶乘并返回结果给Reply Exchange。 两个程序的控制台信息以下图所示
生产者程序控制台

消费者程序控制台

从控制台信息能够看出生产者端根据返回消息中包含的Correlation Id判断出这是发送消息对应的返回消息,获取了阶乘的计算结果。

这个例子只是简单的生产者和消费者之间的方法调用,实际使用时,咱们能够基于这个实例,实现更为复杂的操做。

 

RabbitMQ Client的重连机制

 

RabbitMQ Java Client提供了重连机制,不过在RabbitMQ Java Client 4.0版本以前,自动重连默认是关闭的。从Rabbit Client 4.0版本开始,自动重连默认是打开的。控制自动重连的属性是com.rabbitmq.client.ConnectionFactory类的automaticRecovery和topologyRecovery属性。

设置automaticRecovery属性为true时,会执行如下recovery:

1)Connection的重连。

2)侦听Connection的Listener的恢复。

3)从新创建在Connection基础上的Channel。

4)侦听Channel的Listener的恢复。

5)Channel上的设置,如basicQos,publisher confirm以及事务属性等的恢复。

当设置topologyRecovery属性为true时,会执行如下recovery:

1)exchange的从新定义(不包含预约义的exchange)

2)queue的从新定义(不包含预约义的queue)

3)binding的从新定义(不包含预约义的binding)

4)全部Consumer的恢复

咱们定义一个带auto recovery的消费者程序,咱们使用RabbitMQ Java Client 4.0.0版本,这个版本引入了AutorecoveringConnection和

AutorecoveringChannel类,能够添加RecoveryListener对Recovery过程进行监控。

 

[java] view plain copy
 
  1. public class RecoveryConsumerApp  
  2. {  
  3.     public static void main( String[] args ) throws IOException, TimeoutException {  
  4.             ConnectionFactory connectionFactory = new ConnectionFactory();  
  5.             ...................  
  6.    
  7.             AutorecoveringConnection connection = (AutorecoveringConnection)connectionFactory.newConnection();  
  8.             String originalLocalAddress =  
  9.                     connection.getLocalAddress() + ":" + connection.getLocalPort();  
  10.             System.out.println("The origin connection's local address is:" + originalLocalAddress);  
  11.    
  12.             AutorecoveringChannel  channel = (AutorecoveringChannel)connection.createChannel();  
  13.             System.out.println("The origin channel's channel number is:" + channel.getChannelNumber());  
  14.    
  15.             channel.exchangeDeclare("recoveryExchange", BuiltinExchangeType.DIRECT, false, true ,null);  
  16.             channel.queueDeclare("recoveryQueue", false, false, true,null);  
  17.             channel.queueBind("recoveryQueue", "recoveryExchange", "recoveryMessage");  
  18.    
  19.             connection.addRecoveryListener(new RecoveryListener() {  
  20.                 public void handleRecovery(Recoverable recoverable) {  
  21.                     System.out.println("Connection handleRecovery method is called");  
  22.                     AutorecoveringConnection recoveredConnection =  
  23.                             (AutorecoveringConnection)recoverable;  
  24.                     String recoveredLocalAddress =  
  25.                             recoveredConnection.getLocalAddress() + ":" + recoveredConnection.getLocalPort();  
  26.                     System.out.println("The recovered connection's local address is:" + recoveredLocalAddress);  
  27.                 }  
  28.    
  29.                 public void handleRecoveryStarted(Recoverable recoverable) {  
  30.                     System.out.println("Connection handleRecoveryStarted method is called");  
  31.                 }  
  32.             });  
  33.    
  34.             channel.addRecoveryListener(new RecoveryListener() {  
  35.                     public void handleRecovery(Recoverable recoverable) {  
  36.                         System.out.println("Channel handleRecovery method is called");  
  37.                         AutorecoveringChannel recoveryChannel =  
  38.                                 (AutorecoveringChannel)recoverable;  
  39.                         System.out.println("The recovered Channel's number is:" + recoveryChannel.getChannelNumber());  
  40.                     }  
  41.    
  42.                     public void handleRecoveryStarted(Recoverable recoverable) {  
  43.                         System.out.println("Channel handleRecoveryStarted method is called");  
  44.                     }  
  45.             });  
  46.    
  47.     }  
  48. }  

 

这个程序中Exchange, Queue都是非持久化而且自动删除的。 咱们为Connection和Channel分别添加了Recovery Listener匿名对象,

便于确认他们确实进行了Recovery操做。

启动程序后,咱们能够看到recoveryExchange和recoveryQueue都被建立出来,且Binding关系创建了。

 

链接的本地地址是0.0.0.0:8109,Channel编号是1

此时咱们关闭RabbitMQ服务器,再重启RabbitMQ服务器,咱们能够从控制台界面看到有链接超时的警告信

息以及重连信息。

从重连日志信息中咱们能够看出Channel的编号仍是1,可是Connection的本地地址已经变成了0.0.0.0:8470,证实进行了重连。

链接到recoveryQueue队列上的Consumer Tag也进行了恢复,并且Consumer Tag与以前的Consumer Tag一致,这是由于设置了

topologyRecovery属性为true。

咱们再在生产者程序中使用重连机制,依然使用Rabbit Java Client 4.0版本 生产者程序的片断以下:

 

[java] view plain copy
 
  1. <span style="font-size: 17.5px;">  </span>factory.setAutomaticRecoveryEnabled(true);  
  2.    factory.setNetworkRecoveryInterval(60000);  
  3.    factory.setTopologyRecoveryEnabled(true);  
  4.    
  5.    AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection();  
  6.    AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();     
  7.    //设置Channel为Publish Confirm模式  
  8.    channel.confirmSelect();  <span style="font-size: 17.5px;">  </span>  


 

登陆管理界面,咱们能够看到生产者创建的Channel是Confirm模式(图中Mode列用C表示)

咱们关掉RabbitMQ服务器,再重启RabbitMQ服务器,能够看到生产者Channel被恢复,可是本地端口号已经从13684变成了13874,

说明这是从新建立的Channel,建立的Channel仍然是Confirm模式,和最初的Channel一致。

若是咱们设置Channel为Transaction模式(调用Channel.txSelect()方法),重连后恢复的Channel的模式也仍然是Transaction模式。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的。若是exchange和queue二者之间有一个持久化,一个非持久化,就不容许创建绑定。

(2、基本概念介绍)

      AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,做为线路层协议,而不是API(例如JMS),AMQP 客户端可以无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个能够彼此协做的消息协议,而如今的目标则是为通用消息队列架构提供通用构建工具。所以,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有做为基本元素实现。反而经过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分,造成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如以前提到的发布/订阅,队列,事务以及流数据,而且添加了额外的特性,例如更易于扩展,基于内容的路由。

AMQP当中有四个概念很是重要

  1. virtual host,虚拟主机
  2. exchange,交换机
  3. queue,队列
  4. binding,绑定

一个虚拟主机持有一组交换机、队列和绑定。

为何须要多个虚拟主机呢?由于RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。所以,若是须要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别建立一个虚拟主机。每个RabbitMQ服务器都有一个默认的虚拟主机/

何谓虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)

队列(Queues)是你的消息(messages)的终点,能够理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)链接到这个队列而且将其取走为止。不过,也能够将一个队列配置成这样的:一旦消息进入这个队列,此消息就被删除。

队列是由消费者(Consumer)经过程序创建的,不是经过配置文件或者命令行工具。这没什么问题,若是一个消费者试图建立一个已经存在的队列,RabbitMQ会直接忽略这个请求。所以咱们能够将消息队列的配置写在应用程序的代码里面。

而要把一个消息放进队列前,须要有一个交换机(Exchange)。

交换机(Exchange)能够理解成具备路由表的路由程序。每一个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。(例如,指明具备路由键 “X” 的消息要到名为timbuku的队列当中去。)

消费者程序(Consumer)要负责建立你的交换机。交换机能够存在多个,每一个交换机在本身独立的进程当中执行,所以增长多个交换机就是增长多个进程,能够充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,能够建立5个交换机来用5个核,另外3个核留下来作消息处理。相似的,在RabbitMQ的集群当中,你能够用相似的思路来扩展交换机一边获取更高的吞吐量。

交换机如何判断要把消息送到哪一个队列?你须要路由规则,即绑定(binding)。一个绑定就是一个相似这样的规则:将交换机“desert(沙漠)”当中具备路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列链接起来的路由规则。例如,具备路由键“audit”的消息须要被送到两个队列,“log-forever”和“alert-the-big-dude”。要作到这个,就须要建立两个绑定,每一个都链接一个交换机和一个队列,二者都是由“audit”路由键触发。在这种状况下,交换机会复制一份消息而且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。

交换机有多种类型。他们都是作路由的,可是它们接受不一样类型的绑定。为何不建立一种交换机来处理全部类型的路由规则呢?由于每种规则用来作匹配分子的CPU开销是不一样的。例如,一个“topic”类型的交换机试图将消息的路由键与相似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。若是你不须要“topic”类型的交换机带来的灵活性,你能够经过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢?

  Exchange

  1. Exchange Direct

     

    Exchange Fanout

    Exchange Topic

     


持久化

你花了大量的时间来建立队列、交换机和绑定,而后,服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面可是还没有处理的消息们呢?

若是你是用默认参数构造的这一切的话,那么,他们都灰飞烟灭了。RabbitMQ重启以后会干净的像个新生儿。你必须重作全部的一切,亡羊补牢,如何避免未来再度发生此类杯具?

队列和交换机有一个建立时候指定的标志durable。durable的惟一含义就是具备这个标志的队列和交换机会在重启以后从新创建,它不表示说在队列当中的消息会在重启后恢复。那么如何才能作到不仅是队列和交换机,还有消息都是持久的呢?

可是首先须要考虑的问题是:是否真的须要消息的持久化?若是须要重启后消息能够回复,那么它须要被写入磁盘。但即便是最简单的磁盘操做也是要消耗时间的。因此须要衡量判断。

当你将消息发布到交换机的时候,能够指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不一样,指定这个标志的方法可能不太同样。简单的说,就是将Delivery Mode设置成2,也就是持久的(persistent)便可。通常的AMQP库都是将Delivery Mode设置成1,也就是非持久的。因此要持久化消息的步骤以下:

  1. 将交换机设成 durable。
  2. 将队列设成 durable。
  3. 将消息的 Delivery Mode 设置成2 。

绑定(Bindings)怎么办?绑定没法在建立的时候设置成durable。没问题,若是你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。相似的,若是删除了某个队列或交换机(不管是否是durable),依赖它的绑定都会自动删除。

注意:

  • RabbitMQ 不容许你绑定一个非坚固(non-durable)的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。
  • 一旦建立了队列和交换机,就不能修改其标志了。例如,若是建立了一个non-durable的队列,而后想把它改变成durable的,惟一的办法就是删除这个队列而后重现建立。所以,最好仔细检查建立的标志。