原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.htmlhtml
RabbitMQ是一个接收并转发消息的消息代理。你能够把它当成是一座邮局,当你把想要发出的邮件放到信箱里以后,邮递员会把它运送到你指定的收件人那里。 RabbitMQ就是邮局、信箱和邮递员的集合。java
RabbitMQ和邮局的最大区别在于它处理的是消息——也就是二进制的数据,而非信件。数组
在RabbitMQ中使用了一些术语:缓存
注意生产者、消费者以及消息代理不须要部署在同一台主机上,并且大多数状况下它们也确实不在一块儿。另外,一个程序也不能同时当生产者和消费者。并发
在本章节咱们将编写两段Java程序:一个发送消息的生产者和一个接收消息并打印的消费者。咱们将仅着眼于那些须要用到的简单的东西而忽略一些细节。毕竟这只是一个入门的例子。异步
在下边这张图中,“P”是生产者,“C”是消费者。中间的方框是一个队列—— RabbitMQ为消费者保留的消息缓冲区。socket
咱们将消息发送者成为==Send==,将消息接收者称为==Recv==。消息发送者将链接到RabbitMQ并发送一条消息,而后退出。编码
在==Send.java==中,咱们须要导入一些类3d
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
设置类并命名队列:代理
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ... } }
而后咱们建立一个链接到Rabbitmq服务端:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { }
Connection类抽象了底层的socket链接,并帮咱们处理包括协议版本协商和受权的一系列事情。这里咱们链接的是本地的RabbitMQ,因此使用了localhost。若是咱们想要链接到其余机器的RabbitMQ,咱们须要将localhost改为对应的域名或者IP。
接下来咱们建立一个channel,咱们大多数的工做均可以经过这个类提供的API完成。注意咱们可使用try-with-resources语法来建立,由于==Connection==和==Channel==都实现了==java.io.Closeable==接口。这样咱们就不须要手动关闭它们了。
为了发送消息,咱们还须要声明一个队列,全部的这些代码都包裹在try-with-resources语句块中
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
声明队列的代码是幂等的——它只有在不存在的时候才会建立。它能够接收的消息为字节数组,因此你能够任意编码你的消息。
下边是Send.java的完整代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
消息发送不成功!
若是这是你第一次使用RabbitMQ而且没有看到"Sent"消息,你可能抓耳挠腮想要知道哪里出了问题。或许是RabbitMQ启动的时候磁盘空间不足(默认状况下至少须要200MB的剩余空间),因此它拒绝接收消息。检查RabbitMQ的日志文件查明缘由,并在须要的时候调低这个限制。这份配置文档会告诉你怎么设置disk_free_limit参数。
咱们的消费者不像生产者那样只须要发送一条消息,它须要一直监听来自RabbitMQ的消息,因此咱们须要保持它持续运行接收消息并打印。
==Recv.java==的代码与Send.java的代码很相近:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
多出来的DefaultConsumer类实现了 Consumer接口,咱们将会用它来接收来自RabbitMQ的消息。
相应的配置与生产者相同:咱们开启一个 connection和一个 channel,并声明咱们想要消费的队列。注意这里队列的名字必须与生产者中的队列名字相同。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); } }
注意这里咱们也声明了一个队列。由于咱们的消费者可能比生产者先启动,因此在咱们消费消息以前须要先确保队列存在。
为何咱们不使用try-with-resource语句来自动关闭channel和connection?由于若是这样作的话,咱们的程序会自动关闭关闭这些资源并退出!而实际上咱们须要这个程序持续运行以异步监听消息的到达。
以后咱们须要告诉RabbitMQ服务端将队列中的消息发送给咱们。因为RabbitMQ会将消息异步发送给咱们,因此咱们提供了一个回调方法来帮咱们缓存消息,直到咱们去处理这些消息。这就是DeliverCallback类所作的事情。
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
下边是Recv.java的完整代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }