本章咱们来一次快速入门RabbitMQ——生产者与消费者。须要构建一个生产端与消费端的模型。什么意思呢?咱们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。
咱们的消费端进行监听RabbitMQ,当发现队列中有消息后,就进行消费。html
本次整合主要采用SpringBoot框架,须要对SpringBoot的使用有必定了解。java
咱们来看下大概步骤:mysql
这个链接工厂须要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。
Channel是咱们RabbitMQ全部消息进行交互的关键。git
/** * * @ClassName: ConnectionUtils * @Description: 链接工具类 * @author Coder编程 * @date 2019年6月21日 上午22:28:22 * */ public class ConnectionUtils { public static Connection getConnection() throws IOException, TimeoutException { //定义链接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("127.0.0.1"); //端口 factory.setPort(5672);//amqp协议 端口 相似与mysql的3306 //设置帐号信息,用户名、密码、vhost factory.setVirtualHost("/vhost_cp"); factory.setUsername("user_cp"); factory.setPassword("123456"); // 经过工程获取链接 Connection connection = factory.newConnection(); return connection; } }
/** * * @ClassName: Producer * @Description: 生产者 * @author Coder编程 * @date 2019年7月30日 上午21:04:43 * */ public class Producer { public static void main(String[] args) throws Exception { System.out.println("Producer start..."); //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 经过connection建立一个Channel Channel channel = connection.createChannel(); //3 经过Channel发送数据 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes()); } //4 记得要关闭相关的链接 channel.close(); connection.close(); } }
/** * * @ClassName: Consumer * @Description: 消费端 * @author Coder编程 * @date 2019年7月30日 上午21:08:12 * */ public class Consumer { public static void main(String[] args) throws Exception { System.out.println("Consumer start..."); //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2经过connection建立一个Channel Channel channel = connection.createChannel(); //3声明(建立)一个队列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); //4建立消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5设置Channel channel.basicConsume(queueName, true, queueingConsumer); while(true){ //6 获取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); //Envelope envelope = delivery.getEnvelope(); } } }
channel.queueDeclare(queueName, true, false, false, null);
第一个参数:queuename:队列的名称
第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel能够去监听,其余channel都不可以监听。目的就是为了保证顺序消费。
第四个参数:autoDelete:队列若是与Exchange未绑定,则自动删除
第五个参数:arguments:扩展参数github
channel.basicConsume(QUEUE_NAME, true, consumer);
第二个参数 autoAck:自动签收消息面试
(1)启动消费端
sql
(2)查看管控台
编程
能够看到已经有一个链接,一个信道,一个消费者等信息了。微信
能够看到信道目前的状态是空闲状态。框架
队列中多了test001队列。
关于管控台的介绍能够看这篇文章:消息中间件——RabbitMQ(四)命令行与管控台的基本操做!
(3)运行生产端
能够看到生产端发送完消息以后停下了,消费端迅速接收到了消息。也能够继续经过管控台观察消费的状况。
(4) 问题
注意:
这里面可能有一个问题:为何要先启动消费端呢?
由于在消费端建立的队列,咱们必需要有队列,才可以发送消息。
另外一个问题:在生产端代码中:
channel.basicPublish("", "test001", null, msg.getBytes());
并无设置exchange,只设置了队列名称,消费端却依然可以消费到消息,这是为何呢?
答:发消息的必定要指定Exchange,若是不指定Exchange或者Exchange为空的话,它会默认走第一个
它的路由规则:将相同命名的队列Queue的消息路由过去,若是路由不过去,将会把消息删除。
欢迎关注我的微信公众号:Coder编程
获取最新原创技术文章和免费学习资料,更有大量精品思惟导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识!
新建了一个qq群:315211365,欢迎你们进群交流一块儿学习。谢谢了!也能够介绍给身边有须要的朋友。
文章收录至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
欢迎关注并star~
参考文章:
https://www.cnblogs.com/myJavaEE/p/6665166.html
《RabbitMQ消息中间件精讲》
推荐文章:
消息中间件——RabbitMQ(二)各大主流消息中间件综合对比介绍!