这些教程介绍了使用RabbitMQ建立消息传递应用程序的基础知识。您须要安装RabbitMQ服务器才能完成教程html
RabbitMQ是一个消息代理:它接受和转发消息。你能够把它想象成一个邮局:当你把你想要发布的邮件放在邮箱里时,你能够肯定先生或女士邮递员最终将邮件发送给你的收件人。java
在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。git
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块 - 消息。github
英文原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.htmlspring
在本教程的这一部分中,咱们将用Java编写两个程序; 一个发送单个消息的生产者,以及接收消息并将其打印出来的消费者。api
咱们将详细介绍Java API中的一些细节,专一于这个很是简单的事情,以便开始使用。 这是一个消息传递的“Hello World”。数组
在下图中,“P”是咱们的生产者,“C”是咱们的消费者。 中间的盒子是一个队列 - RabbitMQ表明消费者保存的消息缓冲区。缓存
RabbitMQ 有许多不一样语言的RabbitMQ客户端。 这里咱们将使用RabbitMQ提供的Java客户端。服务器
下载RabbitMQ提供的Java客户端以及它的依赖(SLF4J API and SLF4J Simple)异步
将这些文件复制到工做目录中,并跟着教程复制Java文件。
Tips:
RabbitMQ Java客户端也位于中央Maven存储库中,其中包含groupId com.rabbitmq和artifactId amqp-client。
请注意SLF4J Simple对于教程来讲已经足够了,可是您应该在生产环境中使用像Logback 这样的完整日志库。
关于Maven 等其余下载方式请移步
Java Client
- On Maven Central: RabbitMQ Java client.
- Quick download: Binary | Source
- Javadoc
- All Java client downloads
- Older versions
咱们拥有Java客户端及其依赖关系,那么咱们接下来开始写代码。
因为是学习使用RabbitMQ,咱们这里使用 STS 来写这个项目。
1. 打开STS,新建一个名字叫作 RabbitMQ_HelloWorld_Sample 的 Java Project。
2. 新建一个叫作libs的文件夹,咱们将上面三个jar 复制到咱们的项目中,而后并添加依赖
完成后项目结构如图所示
3. 编写生产者类
咱们会打电话给咱们的消息发布者(发送者)发送和咱们的消息使用者(接收者)Recv。 发布者将链接到RabbitMQ,发送一条消息,而后退出
建立Send.java 文件,关于代码的讲解在注释里:
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { //设置消息队列的名称 private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws java.io.IOException, TimeoutException{ /** * 建立一个到RabbitMQ Server 的链接 * 链接抽象出套接字链接,并为咱们处理协议版本协商和身份验证等。 * 在这里,咱们链接到本地机器上的代理 - 所以是本地主机。 * 若是咱们想链接到另外一台机器上的代理,咱们只需在此指定其名称或IP地址。 * */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 接下来咱们建立一个Channel 对象,这是大部分用于完成任务的API驻留的地方。 * 要想发送出去,咱们必须声明一个队列来执行发送,那么咱们能够将消息发布到队列中: * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //声明一个队列是幂等的 - 只有当它不存在时才会被建立。 //消息内容是一个字节数组,因此你能够编码任何你喜欢的地方。 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //最后咱们关闭这些链接对象 channel.close(); connection.close(); } }
执行成功后你将看到这样的内容:
Tips: ,
若是这是您第一次使用RabbitMQ,而且您没有看到“已发送”消息,那么您可能会抓住您的头脑,想知道会出现什么问题?
也许代理启动时没有足够的可用磁盘空间(默认状况下它至少须要200 MB空闲空间),所以拒绝接受消息。
检查代理日志文件以确认并在必要时减小限制。 配置文件文档将告诉你如何设置disk_free_limit。
还有一种多是你的RabbitMQ 没有启动,执行下面命令再次尝试便可。
rabbitmq-service.bat start
4. 编写消费者
这就是咱们的出版商。 咱们的消费者推送来自RabbitMQ的消息,所以与发布单个消息的发布者不一样,咱们将继续运行以收听消息并将其打印出来。
建立一个Recv.java 文件,代码讲解在注释里面
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 消费者 * 额外的DefaultConsumer是一个实现Consumer接口的类,咱们将使用它来缓存由服务器推送给咱们的消息。 * */ public class Recv { private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException { /** * 设置与发布者相同; * 咱们打开一个链接和一个通道,并声明咱们将要使用的队列。 * 请注意,这与发送发布到的队列相匹配 * */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 请注意,咱们也在这里声明队列。 * 由于咱们可能会在发布者以前启动消费者,因此咱们但愿在咱们尝试使用消息以前确保队列已存在。 * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /** * 咱们即将告诉服务器将队列中的消息传递给咱们。 * 因为它会异步推送消息,所以咱们以对象的形式提供回调,该消息将缓冲消息,直到咱们准备好使用它们。 * 这是一个DefaultConsumer子类的做用。 * */ Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
执行成功后
咱们能够看到咱们的消费者收到了消息队列生产者刚发布的消息。
本篇完~