RabbitMQ之HelloWorld【译】

简介

RabbitMQ是一个消息代理,主要的想法很简单:它接收并转发消息。你能够把它当作一个邮局,当你发送邮件到邮筒,你相信邮差先生最终会将邮件投递给收件人。RabbitMQ在这个比喻里,是一个邮筒,邮局和一个邮递员。
RabbitMQ和邮局最大的不一样是,RabbitMQ不处理纸张,而是接收、存储和转发数据的二进制形式。
RabbitMQ和普通消息,使用的一些术语。
生产:意味着发送,发送消息的程序是生产者,生产者以下:
html

队列:是一个邮箱的名称。它在RabbitMQ里面。虽然消息流经RabbitMQ和应用程序,可是他们只能存储在队列里面。一个队列不受任何限制的约束,只要你喜欢你能够存储尽量多的东西,它本质上是一个无限的缓冲区。不少生产者能够发送消息到一个队列,不少消费者能够从一个队列获取数据。队列以下:
java

消费者:也有相似的含义接收,一个消费者是一个程序,主要是等待接收消息,消费者以下:
git

注意:生产者、消费者和代理不必定在一台机器上,事实上不少应用中也是如此。github

HelloWorld

下面咱们将用Java编写两个程序,生产者发送一个消息,消费者接收消息并打印出来。数组

在下图中,”P“是咱们的生产者,”C“是咱们的消费者,在中间的盒子是一个队列——即RabbitMQ维持的一个消息缓冲区。
缓存

The Java client library
RabbitMQ支持不少种协议,本教程使用AMQP 0-9-1,这是一个开发的通用的消息协议,RabbitMQ的客户端支持不少种语言,这里咱们将使用RabbitMQ提供的Java客户端。异步

下载客户端库包,并解压到工做目录:socket

$ unzip rabbitmq-java-client-bin-*.zip  
$ cp rabbitmq-java-client-bin-*/*.jar ./

固然,RabbitMQ也在maven中央仓库中:maven

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.1</version>
</dependency>

如今咱们有了Java客户端和依赖,咱们能够写一点代码了。ide

发送(Sending)

咱们称呼咱们的消息发送者为send,接受者为recv,发送者将链接到RabbitMQ,发送一条消息,而后退出。


在send.java中,咱们须要引入一些类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

建立这个类,并命名队列:

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

而后咱们建立一个到服务的链接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

这个链接虚拟化了socket链接,这里咱们在本地链接上代理,若是咱们想链接其余机器上的代理,咱们能够改变name和IP地址便可。
而后咱们建立了一个channel——提供了不少API供咱们获取东西的所在。

为了发送消息,咱们定义了一个队列,而后咱们能发布消息到队列上。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

定义一个幂等的队列——只能在未存在的状况下建立。消息体是一个字节数组。
最后咱们关闭通道和链接:

channel.close();
connection.close();

以上就是Send.java的全部代码。
tips:
send无论用!在发送消息以后,在RabbitMQ的后台并无看到发送消息,你可能认为是程序错了,也许是代理没有足够的空间了(默认是须要1G的),因此拒绝接受消息。检查代理的logfile确认缘由所在。而后去配置文件设置disk_free_limit便可。

接收(Receiving)

咱们的接受者从RabbitMQ拉取消息,因此不像发送一条消息的发送者,咱们必须保持监听,而后打印出来。


recv和send的import差很少:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumer实现了Consumer接口——咱们用来缓冲服务推给咱们的消息。
实现和sender差很少,咱们打开一个链接和通道,定义一个咱们即将消费的队列,和send发布的队列同样。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

这里咱们定义了一个队列,由于咱们也许会在sender以前启动receiver,咱们须要确认这个队列在咱们消费以前就已经存在了。
咱们让服务从队列里面给咱们传递消息,自从他将要异步的推送消息,咱们提供了一个缓存消息的回调方法,知道咱们去用他,这就是DefaultConsumer作的事情。

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

以上就是所有的Recv.java。

原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

代码地址:https://github.com/aheizi/hi-mq

相关:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任务队列
3.RabbitMQ之发布订阅
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主题(Topic)
6.RabbitMQ之远程过程调用(RPC)

相关文章
相关标签/搜索