来了解RabbitMQ一个重要的概念:Exchange交换机java
蓝色框:客户端发送消息至交换机,经过路由键路由至指定的队列。 黄色框:交换机和队列经过路由键有一个绑定的关系。 绿色框:消费端经过监听队列来接收消息。git
Name
:交换机名称 Type
:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲) Durability
:是否须要持久化,true为持久化 Auto Delete
:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange Internal
:当前Exchange是否用于RabbitMQ内部使用,默认为false Arguments
:扩展参数,用于扩展AMQP协议自定制化使用github
注意:Direct模式可使用RabbitMQ自带的Exchange:default Exchange,因此不须要将Exchange进行任何绑定(binding)操做,消息传递时,RouteKey必须彻底匹配才会被队列接收,不然该消息会被抛弃。面试
重点:routing key与队列queues 的key保持一致,便可以路由到对应的queue中。编程
生产端:服务器
/** * * @ClassName: Producer4DirectExchange * @Description: 生产者 * @author Coder编程 * @date2019年7月19日 下午22:15:52 * */ public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2建立Channel Channel channel = connection.createChannel(); //3 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //4 发送 String msg = "Coder编程 Hello World RabbitMQ 4 Direct Exchange Message ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
消费端:微信
/** * * @ClassName: Consumer4DirectExchange * @Description: 消费者 * @author Coder编程 * @date2019年7月19日 下午22:18:52 * */ public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { //建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //创建一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,若是没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
测试结果:app
注意须要routingKey保持一致。能够本身尝试修改routingkey,是否能收到消息。学习
注意:可使用通配符进行模糊匹配 符号 "#" 匹配一个或多个词 符号 "" 匹配很少很多一个词 例如:"log.#" 可以匹配到 "log.info.oa" "log." 只会匹配到 "log.error"测试
在一堆消息中,每一个不一样的队列只关心本身须要的消息。
生产端:
/** * * @ClassName: Producer4TopicExchange * @Description: 生产者 * @author Coder编程 * @date2019年7月19日 下午22:32:41 * */ public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2建立Channel Channel channel = connection.createChannel(); //3声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //4发送 String msg = "Coder编程 Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
消费端:
/** * * @ClassName: Consumer4TopicExchange * @Description: 消费者 * @author Coder编程 * @date2019年7月19日 下午22:37:12 * */ public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { //建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; //String routingKey = "user.*"; String routingKey = "user.*"; // 1 声明交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 3 创建交换机和队列的绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,若是没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
测试结果:
注意一个问题:须要进行解绑
生产端:
/** * * @ClassName: Producer4FanoutExchange * @Description: 生产者 * @author Coder编程 * @date2019年7月19日 下午23:01:16 * */ public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 建立Channel Channel channel = connection.createChannel(); //3 声明 String exchangeName = "test_fanout_exchange"; //4 发送 for(int i = 0; i < 10; i ++) { String msg = "Coder 编程 Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
消费端:
/** * * @ClassName: Consumer4FanoutExchange * @Description: 消费者 * @author Coder编程 * @date2019年7月19日 下午23:21:18 * */ public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { //建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不设置路由键 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,若是没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
测试结果:
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
timestamp、type、user_id、app_id、cluster_id
RabbitMQ的概念、安装与使用、管控台操做、结合RabbitMQ的特性、Exchange、Queue、Binding 、RoutingKey、Message进行核销API的讲解,经过本章的学习,但愿你们对RabbitMQ有一个初步的认识。
欢迎关注我的微信公众号:Coder编程 获取最新原创技术文章和免费学习资料,更有大量精品思惟导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识! 新建了一个qq群:315211365,欢迎你们进群交流一块儿学习。谢谢了!也能够介绍给身边有须要的朋友。
文章收录至 Github: https://github.com/CoderMerlin/coder-programming Gitee: https://gitee.com/573059382/coder-programming 欢迎关注并star~
参考文章:
《RabbitMQ消息中间件精讲》
推荐文章:
消息中间件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP协议!