原文地址:RabbitMQ Tutorials - "Hello World!" - Javahtml
内容根据自身的理解有增删java
RabbitMQ是一个消息代理(broker):它接受并转发消息。git
三者一般不在同一个host里。github
由于这是一个简单的事例,因此忽略了许多 Java API 细节。api
以下图所示,“P”是生产者,“C”是消费者。中间是队列。数组
建立maven工程,添加相关依赖:amqp-client
,slf4j-api
,slf4j-simple
maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency>
We'll call our message publisher (sender) Send and our message consumer (receiver) Recv. The publisher will connect to RabbitMQ, send a single message, then exit.
生产者链接到RabbitMQ,发送发消息,而后推出。ide
public class Send { //建立queue的名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //建立一个broker的本地链接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); //建立channel Channel channel = connection.createChannel(); //声明一个queue(幂等),消息可发送至此 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //message以字节数组的方式传递 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); //最后关闭channel和connection channel.close(); connection.close(); } }
so unlike the publisher which publishes a single message, we'll keep it running to listen for messages and print them out.
消费者能够是一致处于监听状态,等待消息的到来。函数
Note that we declare the queue here, as well. Because we might start the consumer before the publisher, we want to make sure the queue exists before we try to consume messages from it.
这段描述可能会让读者误觉得必须先启动消费者,实际上是不必定的。由于生产者发布消息后,消息存放于 queue 中,消费者随时均可以来获取。文档里用的是we might start
,而不是we must start
。spa
public class Recv { //名字和生产者建立时保持一致 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //建立connection、channel和queue;这一部分和Send.java同样 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //DefaultConsumer是Cousumer的实现类,经过回调函数接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
使用rabbitmqctl list_queues
查看当前 queue 下的消息数量
$ rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... hello 2