MQ(Message Queue,消息队列)是一种应用系统之间的通讯方法。是经过读写出入队列的消息来通讯(RPC则是经过直接调用彼此来通讯的)。缓存
1.AMQP协议安全
在了解RabbitMQ以前,首先要了解AMQP协议,AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受不一样客户端/中间件产品,不一样开发语言等条件的限制。服务器
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。网络
1.1 AMQP的实现有:socket
OpenAMQ分布式
AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMS工具
Apache Qpid性能
Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NETui
Redhat Enterprise MRG编码
实现了AMQP的最新版本0-10,提供了丰富的特征集,好比彻底管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持C++、Ruby、Java、JMS、Python和.NET
RabbitMQ
一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台
AMQP
InfrastructureLinux下,包括Broker、管理工具、Agent和客户端
ØMQ
一个高性能的消息平台,在分布式消息网络可做为兼容AMQP的Broker节点,绑定了多种语言,包括Python、C、C++、Lisp、Ruby等
Zyre
是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力
1.2 如下是AMQP中的核心概念:
Broker
消息服务器的实体
虚拟主机(Virtual Host)
一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登陆到服务器以后,能够选择一个虚拟主机。每一个链接(包括全部channel)都必须关联至一个虚拟主机
交换器(Exchange)
服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
消息队列(Message Queue)
服务器中的实体,用来保存消息直到发送给消费者
生产者(Producer)
一个向交换器发布消息的客户端应用程序
消费者(Consumer)
一个从消息队列中请求消息的客户端应用程序
绑定器(Binding)
将交换器和队列链接起来,而且封装消息的路由信息
全部这些组件的属性各不相同,可是只有交换器和队列被命名。客户端能够经过交换器的名字来发送消息,也能够经过队列的名字收取信息。由于AMQ 协议没有一个通用的标准方法来得到全部组件的名称,因此客户端对队列和交换器的访问被限制在仅能使用熟知的或者只有本身知道的名字。
绑定器没有名字,它们的生命期依赖于所紧密链接的交换器和队列。若是这二者任意一个被删除掉,那么绑定器便失效了。这就说明,若要知道交换器和队列的名字,还须要设置消息路由。
消息是一个不透明的数据包,这些包有以下性质:
发送消息是一个很是简单的过程。客户端声明一个它想要发送消息的目的交换器,而后将消息传递给交换器。
接受消息的最简单办法是设置一个订阅。客户端须要声明一个队列,而且使用一个绑定器将以前的交换器和队列绑定起来,这样的话,订阅就设置完毕。
交换器的类型;
fanout交换器
不会解释任何东西:它只是将消息投递到全部绑定到它的队列中
direct交换器
将消息根据其routing-key属性投递到包含对应key属性的绑定器上
topic交换器
模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分红单词。这些单词之间用点隔开。它一样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key *.stock.#匹配routing-key usd.stcok和eur.stock.db,可是不匹配stock.nasdaq
header交换器
根据应用程序消息的特定属性进行匹配failover和system交换器当前RabbitMQ版本中均未实现
没有绑定器,哪怕是最简单的消息,交换器也不能将其投递到队列中,只能抛弃它。经过订阅一个队列,消费者可以从队列中获取消息,而后在使用事后将其在队列中删除。
不一样于队列的是,交换器有相应的类型,代表它们的投递方式(一般是在和绑定器协做的时候)。由于交换器是命名实体,因此声明一个已经存在的交换器, 可是试图赋予不一样类型是会致使错误。客户端须要删除这个已经存在的交换器,而后从新声明而且赋予新的类型。
交换器也有一些性质:
AMQP Broker都会对其支持的每种交换器类型(为每个虚拟主机)声明一个实例。这些交换器的命名规则是amq.前缀加上类型名。例如 amq.fanout。空的交换器名称等于amq.direct。对这个默认的direct交换器(也仅仅是对这个交换器),Broker将会声明一个绑定了系统中全部队列的绑定器。
这个特色告诉咱们,在系统中,任意队列均可以和默认的direct交换器绑定在一块儿,只要其routing-key等于队列名字。
默认绑定器的行为揭示了多绑定器的存在,将一个或者多个队列和一个或者多个交换器绑定起来。这使得能够将发送到不一样交换器的具备不一样routing key(或者其余属性)的消息发送到同一个队列中。
队列也有如下属性,这些属性和交换器所具备的属性相似。
这些性质能够用来建立例如排他和自删除的transient或者私有队列。这种队列将会在全部连接到它的客户端断开链接以后被自动删除掉 – 它们只是短暂地链接到Broker,可是能够用于实现例如RPC或者在AMQ上的对等通讯。
AMQP上的RPC是这样的:RPC客户端声明一个回复队列,惟一命名(例如用UUID19), 而且是自删除和排他的。而后它发送请求给一些交换器,在消息的reply-to字段中包含了以前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to做为routing key(以前提到过默认绑定器会绑定全部的队列到默认交换器)发送到默认交换器。注意仅仅是惯例而已。根据和RPC服务器的约定,它能够解释消息的任何属性(甚至数据体)来决定回复给谁。
队列也能够是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并非发送到这个队列的消息的一份拷贝,而是这些用户共享这队列中的一份数据,而后在使用完以后删除掉。
2. RabbitMQ
RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程当中,根据规则进行路由,缓存与持久化。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
几个概念说明(彻底遵循AMQP中的概念):
消息队列的使用过程大概以下:
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,彻底根据key进行投递的叫作Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫作Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不须要key的,叫作Fanout交换机,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。
RabbitMQ支持消息的持久化,消息队列持久化包括3个部分:
若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的。若是exchange和queue二者之间有一个持久化,一个非持久化,就不容许创建绑定。
RabbitMQ的特性:
下面经过生产者代码来解释一下RabbitMQ中涉及到的概念。
public class MsgSender { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException { /** * 建立链接链接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个频道 Channel channel = connection.createChannel(); // 指定一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送的消息 String message = "hello world!"; // 往队列中发出一条消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // 关闭频道和链接 channel.close(); connection.close(); } }
ConnectionFactory为Connection的制造工厂。
Connection是与RabbitMQ服务器的socket连接,它封装了socket协议及身份验证相关部分逻辑。
Channel是咱们与RabbitMQ打交道的最重要的一个接口,大部分的业务操做是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue(队列)是RabbitMQ的内部对象,用于存储消息,以下图所示:
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)能够从Queue中获取消息并消费。
队列是有Channel声明的,并且这个操做是幂等的。同名的队列屡次声明也只会建立一次。咱们发送消息就是想这个声明的队列里发送消息。
而后看一下消费者的代码:
public class MsgReceiver { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时建立队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 建立队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 指定消费队列 channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { // nextDelivery是一个阻塞方法(内部实现实际上是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
从上述代码中,咱们能够看到ConnectionFactory、Connection、Channel这三个对象都仍是会建立。而队列在消费者这里又声明了一遍。这是为了防止先启动消费者,当为消费者指定队列时,若是RabbitMQ服务器上未声明过队列,就会抛出IO异常。
队列消费者,用于监听队列中的消息。调用nextDelivery方法时,内部实现就是调用队列的take方法。该方法的做用:获取并移除此队列的头部,在元素变得可用以前一直等待(若是有必要)。说白了就是若是没有消息,就处于阻塞状态。