本篇博客围绕下面几个方面展开:java
Now, Let's begin !数组
1、代码前的理论热身服务器
咱们来看张图:ide
Publisher(生产者)生成消息,而后publish(发布)消息到exchange(路由器,也有资料翻译成交换机),而后根据路由规则将消息传递到Queue(队列),最终交由Consumer(消费者)进行消费处理。测试
这里的生产者和消费者都是咱们的应用,所以咱们的代码中要实现这两个部分。spa
中间的节点就是RabbitMQ 提供的内容,须要再生产者和消费者里面调用其接口来定义和使用这些节点。翻译
2、代码实例:Hello RabbitMQ调试
package com.sam.hello_rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Provider { //定义队列名 static String QUEUE_NAME = "helloRabbit"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { //1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); //2.为通道声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //3.发布消息 String msg = " hello rabbitmq, welcome to sam's blog."; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("provider send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //4.关闭链接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
在第2步中,channel.queueDeclare 用来建立队列,有5个参数:String queue, 队列名; boolean durable, 该队列是否须要持久化; boolean exclusive,该队列是否为该通道独占的(其余通道是否能够消费该队列); boolean autoDelete,该队列再也不使用的时候,是否让RabbitMQ服务器自动删除掉; Map<String, Object> arguments 其余参数。第3步中,channel.basicPublish 发布消息(用在生产者),有4个参数:String exchange, 路由器(有的资料翻译成交换机)的名字,即将消息发到哪一个路由器; String routingKey, 路由键,即发布消息时,该消息的路由键是什么; BasicProperties props, 指定消息的基本属性; byte[] body 消息体,也就是消息的内容,是字节数组。 可能你会疑惑,为何没有exchange呢?由于若是声明了队列,能够不声明路由器。code
2.接着来实现消费者,消费者实现和生产者过程差很少,可是在这里并无关闭链接和通道,是由于要消费者一直等待随时可能发来的消息。代码以下:blog
package com.sam.hello_rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class HelloConsumer { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明队列 channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null); System.out.println(" **** keep alive ,waiting for messages, and then deal them"); // 3.经过回调生成消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { //获取消息内容而后处理 String msg = new String(body, "UTF-8"); System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]"); } }; //4.消费消息 channel.basicConsume(Provider.QUEUE_NAME, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
在第4步中,channel.basicConsume 用来接收消息,用在消费者,有3个参数:String queue, 队列名字,即要从哪一个队列中接收消息; boolean autoAck, 是否自动确认,默认true; Consumer callback 消费者,即谁接收消息。
3、运行代码并调试问题
代码写好了,接下来进行测试,