(翻译)RabbitMQ Java Client教程(一)Hello World

原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.htmlhtml

介绍

RabbitMQ是一个接收并转发消息的消息代理。你能够把它当成是一座邮局,当你把想要发出的邮件放到信箱里以后,邮递员会把它运送到你指定的收件人那里。 RabbitMQ就是邮局、信箱和邮递员的集合。java

RabbitMQ和邮局的最大区别在于它处理的是消息——也就是二进制的数据,而非信件。数组

在RabbitMQ中使用了一些术语:缓存

  • 生产(producing)表示发送消息。一个发送消息的程序被称做生产者(producer)

  • 队列(queue)相似于RabbitMQ内部的邮箱。虽然消息能够在应用和RabbitMQ之间流转,可是它们只能保存在队列(queue)中。队列(queue)本质上是一个很大的消息缓冲区,它只会受到主机内存和磁盘容量的限制。多个生产者(producers)能够发送消息给同一个队列,同时多个消费者(consumers )也能够从同一个队列中接收消息。咱们用下边这张图表示一个队列:

  • 消费(consuming)表示接收消息。一个等待接收消息的程序被称做消费者(consumer

注意生产者、消费者以及消息代理不须要部署在同一台主机上,并且大多数状况下它们也确实不在一块儿。另外,一个程序也不能同时当生产者和消费者。并发

“Hello World”

在本章节咱们将编写两段Java程序:一个发送消息的生产者和一个接收消息并打印的消费者。咱们将仅着眼于那些须要用到的简单的东西而忽略一些细节。毕竟这只是一个入门的例子。异步

在下边这张图中,“P”是生产者,“C”是消费者。中间的方框是一个队列—— RabbitMQ为消费者保留的消息缓冲区。socket

发送消息

咱们将消息发送者成为==Send==,将消息接收者称为==Recv==。消息发送者将链接到RabbitMQ并发送一条消息,而后退出。编码

在==Send.java==中,咱们须要导入一些类3d

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置类并命名队列:代理

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}

而后咱们建立一个链接到Rabbitmq服务端:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

}

Connection类抽象了底层的socket链接,并帮咱们处理包括协议版本协商和受权的一系列事情。这里咱们链接的是本地的RabbitMQ,因此使用了localhost。若是咱们想要链接到其余机器的RabbitMQ,咱们须要将localhost改为对应的域名或者IP。

接下来咱们建立一个channel,咱们大多数的工做均可以经过这个类提供的API完成。注意咱们可使用try-with-resources语法来建立,由于==Connection==和==Channel==都实现了==java.io.Closeable==接口。这样咱们就不须要手动关闭它们了。

为了发送消息,咱们还须要声明一个队列,全部的这些代码都包裹在try-with-resources语句块中

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

声明队列的代码是幂等的——它只有在不存在的时候才会建立。它能够接收的消息为字节数组,因此你能够任意编码你的消息。

下边是Send.java的完整代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消息发送不成功!

若是这是你第一次使用RabbitMQ而且没有看到"Sent"消息,你可能抓耳挠腮想要知道哪里出了问题。或许是RabbitMQ启动的时候磁盘空间不足(默认状况下至少须要200MB的剩余空间),因此它拒绝接收消息。检查RabbitMQ的日志文件查明缘由,并在须要的时候调低这个限制。这份配置文档会告诉你怎么设置disk_free_limit参数。

接收消息

咱们的消费者不像生产者那样只须要发送一条消息,它须要一直监听来自RabbitMQ的消息,因此咱们须要保持它持续运行接收消息并打印。

==Recv.java==的代码与Send.java的代码很相近:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

多出来的DefaultConsumer类实现了 Consumer接口,咱们将会用它来接收来自RabbitMQ的消息。

相应的配置与生产者相同:咱们开启一个 connection和一个 channel,并声明咱们想要消费的队列。注意这里队列的名字必须与生产者中的队列名字相同。

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

注意这里咱们也声明了一个队列。由于咱们的消费者可能比生产者先启动,因此在咱们消费消息以前须要先确保队列存在。

为何咱们不使用try-with-resource语句来自动关闭channel和connection?由于若是这样作的话,咱们的程序会自动关闭关闭这些资源并退出!而实际上咱们须要这个程序持续运行以异步监听消息的到达。

以后咱们须要告诉RabbitMQ服务端将队列中的消息发送给咱们。因为RabbitMQ会将消息异步发送给咱们,因此咱们提供了一个回调方法来帮咱们缓存消息,直到咱们去处理这些消息。这就是DeliverCallback类所作的事情。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

下边是Recv.java的完整代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
相关文章
相关标签/搜索