介绍
RabbitMQ是做为一个消息代理中间件,其设计的目的很简单:收集消息而后转发消息。你能够把它当成一个邮局:当你发送邮件到邮箱时你确定相信邮递员确定会把这封邮件送到收件人手上。在这个比喻中,RabbitMQ实际充当了邮箱、邮局以及邮递员的角色。
与邮局最大的不一样是,RabbitMQ它不处理纸张文件,而是负责接收,储存以及转发二进制数据message。
在详细介绍以前,咱们首先了解一些RabbitMQ的术语:html
消息队列:消息队列存在RabbitMQ server端中;虽然消息流经你的应用程序和RabbitMQ server,但他们只能够储存在RabbitMQ server队列中。队列是不受任何限制的,它能够储存你发送的全部消息,由于本质上它就是一个无限缓冲区。能够多个生产者同时发送消息给一个队列,同时 也能够多个消费者从一个队列接收数据。如下图来表示消息队列:数组
消费者:咱们称大部分时间都在等待接收消息的程序端为消费者;如下图来表示消费者:”Hello world“
在本节咱们将用java编写两个程序:一个生产者用来发送消息以及一个消费者用来接收消息而且打印出来。在实现过程当中咱们主要关注如何实现功能而对于java API的细节一笔带过。
在下面这个图中,”P”表明生产者,”C”表明消费者,在中间的盒子表示队列(一个RabbitMQ用来保存消息的缓冲区)。
基于java的客户端:
RabbitMQ遵循AMQP协议(高级消息队列协议)–这是一个开放的、通用的消息协议。如今网上已存在不少基于的不一样语言实现的AMQP客户端。在本例中咱们使用java语言的客户端。maven引入以下:缓存
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-client</artifactId>
<version>1.3.0</version>
</dependency>
生产者
在这个例子中,咱们分别以Send和Recv来表示生产者、消费者。Send主要用来链接RabbitMQ server及发送一个消息,以后退出。
下面是Send.class实现:服务器
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { //--------------1-------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //--------------2----------------- channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //---------------3---------------- channel.close(); connection.close(); } }
代码的第一部分主要建立一个与RabbitMQ服务器的链接,该连接抽象了socket操做,负责判断协议版本和身份认证等等。另外由于咱们把消息 代理broker搭建在本地,因此链接的地址是localhost。若是你想链接到其余机器的代理broker,你能够指定它的域名或者IP地址。
在第二部分,咱们建立了一个通道;通道为咱们提供了不少API用于完成消息的发送。好比,咱们能够声明一个队列用来发送消息以及把消息发布到声明的队列。
值得注意的是,声明一个队列是幂等的,只有在队列不存在的时候才建立。另外因为消息的内容是二进制的字节数组,因此收到数据后你能够编码成任意你须要的数据。
最后,也就是第三部分,咱们须要关掉通道channel以及链接。
发送失败
若是你是第一次使用RabbitMQ,那么当你没有看到发送的消息的时候,你可能会挠头到底想问题出在了哪呢?没关系,有多是broker(代理,之后 统称broker)没有足够的磁盘空间(默认状况下它至少须要1Gb的剩余空间)所以拒绝接收任何消息。你能够查看broker的日志文件来确认而且减小 一下限制。地址http://www.rabbitmq.com/configure.html#config-items里说明了如何设置 disk_free_limit.
异步
消费者
下面实现咱们的消费者,它主要用来持续监听RabbitMQ推送消息而且打印。
下面是Recv.class的实现:socket
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { //-------------1------------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //--------------2------------------- QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
QueueingConsumer类主要用来缓存RabbitMQ服务器推送的消息。
创建Recv的过程跟send相似;首先建立一个到服务器的链接和一个通道channel(后面统一叫channel),以及定义一个咱们将要消费的队列。注意队列必须与send的匹配,也就是名字要同样。
在第一部分,咱们声明了一个队列;另外因为咱们可能在sender运行前启动了Recv,所以咱们要确保在咱们消费消息前队列已经存在。
在第二部分中咱们告诉服务器要把QUEUE_NAME队列中的消息推送给咱们。由于服务器是异步推送消息给咱们,所以咱们须要提供一个回调对象QueueingConsumer来缓存消息直到被程序消费掉。
QueueingConsumer.nextDelivery()会堵塞直到RabbitMQ服务器推送消息过来。maven