AMQP:Advanced Message Queuing Protocol,高级消息协议。服务器
RabbitMQ就是AMQP协议的erlang实现,AMQP的模型架构和RabbitMQ的模型架构是同样的,生产者将消息送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey与绑定时的BindingKey相匹配时,消息即被存入相应的队列之中,消费者能够订阅相应的队列来获取消息。架构
AMQP协议自己包括三层:ide
❤ Module Layer:位于协议的最高层,主要定义了一些供客户端调用的命令,客户端能够利用这些命令实现本身的业务逻辑。例如:客户端可使用Queue.Declare命令声明一个队列或者使用Basic.Consum订阅消费一个队列中的消息。fetch
❤ Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务器的应答返回给客户端,主要为客户端与服务器之间的通讯提供可靠性同步机制和错误处理。ui
❤ Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。spa
AMQP说到底仍是一个通讯协议,通讯协议都会涉及报文交互,从low-level层面举例来讲,AMQP自己是应用层的协议,其填充于TCP协议层的数据部分,而从high-level层面来讲,AMQP是经过协议命令交互的。AMQP协议能够看做是一系列结构化命令的集合,这里的命令表明一种操做,相似于Http中的方法(GET、POST、PUT、DELETE等)。code
为了更好的说明AMQP协议命令的流转过程,下面经过代码的方式来解释:blog
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("joe"); factory.setPassword("123456"); //建立链接 Connection connection = factory.newConnection(); //建立信道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); String message = "Hello Rabbitmq"; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); //关闭资源 channel.close(); connection.close();
当客户端与Broker(RabbitMQ服务器)创建链接的时候,会调用factory.newConnection方法,这个方法会进一步封装成Protocol Header的报文头发送给Broker,以此来通知Broker本次交互采用的是AMQP协议,紧接着Broker返回Connection.Start来创建链接,在链接的过程当中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-OK、Connection.Open/.Open-ok这6个命令的交互。队列
当客户端调用connection.createChannel方法准备开启信道的时候,其包装Channel.Open命令发送给Broker,等待Channel.Open-Ok命令。资源
当客户端发送消息的时候,须要调用channel.basicPublish方法,对应的AMQP命令为Basic.Publish,注意这个命令和前面涉及的命令略有不一样,这个命令还包括了Content Header和Content Body。Content Header里面包含的是消息体的属性,例如:投递模式、优先级等,而Content Body包含消息体的自己。
当客户端发送完消息须要关闭资源时,涉及Channel.Close/.Close-Ok与Connection.Close/.Close-Ok的命令交互。详细的流转过程以下图2-10所示:
仍是经过代码的方式来了解流转的过程:
Address[] addresses = new Address[]{new Address(IP_ADDRESS,PORT)}; //链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("joe"); factory.setPassword("123456"); //创建链接 Connection connection = factory.newConnection(addresses); final Channel channel = connection.createChannel();//建立信道 channel.basicQos(64); //消费消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("receive message : " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,consumer); //关闭资源 channel.close(); connection.close();
消费者客户端一样须要与Broker创建链接,与生产者客户端同样,协议交互一样涉及Connection.Start/.Start-Ok、Connection.Tune/.Tune-Ok和Connection.Open/.Open-Ok等。
紧接着就是在Connection之上创建channel,和以前的生产者同样协议涉及Channel.Open/Open-Ok。
若是在消费以前调用了Channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能“保持”的未确认的消息数(即预取个数),那么协议流转就会涉及Basix.Qos/.Qos-Ok这两个AMQP命令。
在真正的消费以前,消费者客户端须要向Broker发送Basic.Consume命令(即调用channel.basicConsume方法)将Channel设置为接收模式,以后Broker回执Basic.Consume-Ok以告诉消费者客户端准备好消费消息。紧接着Broker向消费者客户端推送消息(Push),即Basic.Deliver命令,有意思的是这个和Basic.Publish命令同样会携带Content Header 和Content Body。
消费者接收到消息并正确消费后,向Broker发送确认,即Basic.Ack命令。
在消费者中止消费的时候,主动关闭链接,这点和生产者是同样的,涉及到Channel.Close/Channel-Ok和Connection.Close/.Close-Ok。
下图2-11是消费者详细的流转过程:
下图是一些其余的AMQP命令:
参考:《RabbitMQ实战指南》 朱忠华 编著;