RabbitMQ是一个消息代理,主要的想法很简单:它接收并转发消息。你能够把它当作一个邮局,当你发送邮件到邮筒,你相信邮差先生最终会将邮件投递给收件人。RabbitMQ在这个比喻里,是一个邮筒,邮局和一个邮递员。
RabbitMQ和邮局最大的不一样是,RabbitMQ不处理纸张,而是接收、存储和转发数据的二进制形式。
RabbitMQ和普通消息,使用的一些术语。
生产:意味着发送,发送消息的程序是生产者,生产者以下:
html
队列:是一个邮箱的名称。它在RabbitMQ里面。虽然消息流经RabbitMQ和应用程序,可是他们只能存储在队列里面。一个队列不受任何限制的约束,只要你喜欢你能够存储尽量多的东西,它本质上是一个无限的缓冲区。不少生产者能够发送消息到一个队列,不少消费者能够从一个队列获取数据。队列以下:
java
消费者:也有相似的含义接收,一个消费者是一个程序,主要是等待接收消息,消费者以下:
git
注意:生产者、消费者和代理不必定在一台机器上,事实上不少应用中也是如此。github
下面咱们将用Java编写两个程序,生产者发送一个消息,消费者接收消息并打印出来。数组
在下图中,”P“是咱们的生产者,”C“是咱们的消费者,在中间的盒子是一个队列——即RabbitMQ维持的一个消息缓冲区。
缓存
The Java client library
RabbitMQ支持不少种协议,本教程使用AMQP 0-9-1,这是一个开发的通用的消息协议,RabbitMQ的客户端支持不少种语言,这里咱们将使用RabbitMQ提供的Java客户端。异步
下载客户端库包,并解压到工做目录:socket
$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./
固然,RabbitMQ也在maven中央仓库中:maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.1</version> </dependency>
如今咱们有了Java客户端和依赖,咱们能够写一点代码了。ide
咱们称呼咱们的消息发送者为send,接受者为recv,发送者将链接到RabbitMQ,发送一条消息,而后退出。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
建立这个类,并命名队列:
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
而后咱们建立一个到服务的链接:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
这个链接虚拟化了socket链接,这里咱们在本地链接上代理,若是咱们想链接其余机器上的代理,咱们能够改变name和IP地址便可。
而后咱们建立了一个channel——提供了不少API供咱们获取东西的所在。
为了发送消息,咱们定义了一个队列,而后咱们能发布消息到队列上。
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
定义一个幂等的队列——只能在未存在的状况下建立。消息体是一个字节数组。
最后咱们关闭通道和链接:
channel.close(); connection.close();
以上就是Send.java的全部代码。
tips:
send无论用!在发送消息以后,在RabbitMQ的后台并无看到发送消息,你可能认为是程序错了,也许是代理没有足够的空间了(默认是须要1G的),因此拒绝接受消息。检查代理的logfile确认缘由所在。而后去配置文件设置disk_free_limit便可。
咱们的接受者从RabbitMQ拉取消息,因此不像发送一条消息的发送者,咱们必须保持监听,而后打印出来。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer实现了Consumer接口——咱们用来缓冲服务推给咱们的消息。
实现和sender差很少,咱们打开一个链接和通道,定义一个咱们即将消费的队列,和send发布的队列同样。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
这里咱们定义了一个队列,由于咱们也许会在sender以前启动receiver,咱们须要确认这个队列在咱们消费以前就已经存在了。
咱们让服务从队列里面给咱们传递消息,自从他将要异步的推送消息,咱们提供了一个缓存消息的回调方法,知道咱们去用他,这就是DefaultConsumer作的事情。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是所有的Recv.java。
原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
代码地址:https://github.com/aheizi/hi-mq
相关:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任务队列
3.RabbitMQ之发布订阅
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主题(Topic)
6.RabbitMQ之远程过程调用(RPC)