rabbitMQ是由erlang语言开发的,基于AMQP协议实现的消息队列。他是一种应用程序之间的通讯方法,在分布式系统开发中应用很是普遍。html
rabbitMq的有点:java
AMQP(advanced Message Queuing Protocol),是一个提供统一消息服务的应用标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件的产品不一样和开发语言不一样的限制。JMS和AMQP的区别在于:JMS是java语言专属的消息服务标准,他是在api层定义标准,而且只能用于java应用,而AMQP是在协议层定义的标准,是能够跨语言的。web
发送消息:spring
接受消息:api
若是不想本身下载,须要我这里的软件的,能够在下面评论邮箱,我私发给你。数组
1.安装erlang的环境,双击otp的运行程序,而后一路点击下一步(next)。浏览器
配置环境变量并发
在path中添加erlang的路径分布式
2.安装rabbitMq,双击rabbitmq的运行程序ide
安装完成以后在菜单页面能够看到
安装完RabbitMQ若是想要访问管理页面须要在rabbitmq的sbin目录中使用cmd执行:rabbitmq-plugins.bat enable rabbitmq_management(管理员身份运行此命令)添加可视化插件。
点击上图中的start/stop来开启/中止服务。而后在浏览器上输入地址查看,rabbitMq的默认端口是15672。默认的用户名和密码都是guest
若是安装失败,须要卸载重装的时候或者出现rabbitMq服务注册失败时,此时须要进入注册表清理erlang(搜索rabbitMQ,erlsrv将对应的项删除)
1.添加依赖
<!--添加rabbitMq的依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
2.生产者代码实现
package rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.net.ConnectException; import java.util.concurrent.TimeoutException; /** * @className: producer * @description: rabbitmq的生产者代码实现 * @author: charon * @create: 2021-01-03 23:10 */ public class Producer { /** * 声明队列名 */ private static final String QUEUE = "hello charon"; public static void main(String[] args) { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); // 建立通道 channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE, true, false, false, null); String message = "hello charon good evening"; // 发布消息(交换机,RoutingKey即队列名,额外的消息属性,消息内容) channel.basicPublish("", QUEUE, null, message.getBytes()); System.out.println("发送消息给mq:" + message); } catch (Exception e) { e.printStackTrace(); }finally { // 关闭资源 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
3.消费者代码实现
package rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: Consumer * @description: 消费者的代码实现 * @author: charon * @create: 2021-01-05 08:28 */ public class Consumer { /** * 声明队列名 */ private static final String QUEUE = "hello charon"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE, true, false, false, null); // 实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签 * @param envelope 信封,能够获取交换机等信息 * @param properties 消息属性 * @param body 消费内容,字节数组,能够转成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String exchange = envelope.getExchange(); // long deliveryTag = envelope.getDeliveryTag(); String message = new String(body,"utf-8"); System.out.println("收到的消息是:"+message); } }; // 消费消息(队列名,是否自动确认,消费方法) channel.basicConsume(QUEUE,true,defaultConsumer); } }
生产者将消息放入到队列中,消费者能够有多个,同时监听同一个队列。如上图,消费者c1,c2共同争抢当前消息队列的内容,谁先拿到谁负责消费消息,缺点是在高并发的状况下,默认会产品一个消息被多个消费者共同使用,能够设置一个锁开关,保证一条消息只能被一个消费者使用。
上面的代码,能够再添加一个消费者,这样就能够实现工做队列的工做模式。
2.Publish/Subscribe 发布订阅(共享资源)
X表明rabbitMq内部组件交换机,生产者将消息放入交换机,交换机发布订阅把消息发送到全部消息队列中,对应的消费者拿到消息进行消费,对比工做队列而言,发布订阅能够实现工做队列的功能,可是比工做队列更强大。
特色:
1.每一个消费者监听本身的队列
2.生产者将消息发送给Broker,由交换机将消息转发到绑定的此交换机的每一个队列,每一个绑定交换机的队列都将接收到消息;
生产者:
package rabbitmq.publish; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: Producer * @description: 发布订阅的生产者 * @author: charon * @create: 2021-01-07 22:02 */ public class Producer { /**邮件的队列*/ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; /**短信的队列*/ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /**交换机*/ public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); // 建立通道 channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式)) channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); // 绑定交换机(队列名称,交换机名称,routingKey(发布订阅设置为空)) channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); // 发送多条消息 for (int i = 0; i < 5; i++) { String message = "hello charon good evening by publish"; // 指定交换机(交换机,RoutingKey即队列名,额外的消息属性,消息内容) channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes()); System.out.println("发送消息给mq:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { // 关闭资源 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消费email的消费者:
package rabbitmq.publish; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: EmailConsumer * @description: 邮件的消息消费者 * @author: charon * @create: 2021-01-07 22:14 */ public class EmailConsumer { /**邮件的队列*/ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; /**短信的队列*/ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /**交换机*/ public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); // 实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签 * @param envelope 信封,能够获取交换机等信息 * @param properties 消息属性 * @param body 消费内容,字节数组,能够转成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String exchange = envelope.getExchange(); // long deliveryTag = envelope.getDeliveryTag(); String message = new String(body,"utf-8"); System.out.println("收到的email消息是:"+message); } }; // 消费消息(队列名,是否自动确认,消费方法) channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
消费短信的消费者:
package rabbitmq.publish; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: SmsConsumer * @description: * @author: charon * @create: 2021-01-07 22:17 */ public class SmsConsumer { /**邮件的队列*/ public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; /**短信的队列*/ public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; /**交换机*/ public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); // 实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签 * @param envelope 信封,能够获取交换机等信息 * @param properties 消息属性 * @param body 消费内容,字节数组,能够转成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("收到的短信消息是:"+message); } }; // 消费消息(队列名,是否自动确认,消费方法) channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
3.Routing 路由模式
生产者将消息发送给交换机按照路由判断,交换机根据路由的key,只能匹配上路由key的对应的消息队列,对应的消费者才能消费消息。
如上图,rabbitMq根据对应的key,将消息发送到对应的队列中,error通知将发送到amqp.gen-S9b上,由消费者c1消费。error,info,warning通知将发送到amqp.gen-Ag1上,由消费者c2消费。
特色:
1.每一个消费者监听本身的队列,而且设置路由key
2.生产者将消息发送给交换机,由交换机根据路由key来转发消息到指定的队列
生产者:
package rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: Producer * @description: 路由模式下的生成者 * @author: charon * @create: 2021-01-07 22:34 */ public class Producer { /**邮件的队列*/ public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email"; /**短信的队列*/ public static final String QUEUE_ROUTING_SMS = "queue_routing_sms"; /**交换机*/ public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; /** 设置email的路由key */ public static final String ROUTING_EMAIL = "routing_email"; /** 设置sms的路由key */ public static final String ROUTING_SMS = "routing_sms"; public static void main(String[] args) { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection(); // 建立通道 channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null); // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式)) channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); // 绑定交换机(队列名称,交换机名称,routingKey) channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL); channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS); // 发送多条消息 for (int i = 0; i < 5; i++) { String message = "hello charon good evening by routing --email"; // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容) channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes()); System.out.println("发送消息给mq:" + message); } // 发送多条消息 for (int i = 0; i < 5; i++) { String message = "hello charon good evening by routing --sms"; // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容) channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes()); System.out.println("发送消息给mq:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { // 关闭资源 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消费email的消费者:
package rabbitmq.routing; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: EmailConsumer * @description: 路由模式下的email消费者 * @author: charon * @create: 2021-01-07 22:40 */ public class EmailConsumer { /**邮件的队列*/ public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email"; /**交换机*/ public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; /** 设置email的路由key */ public static final String ROUTING_EMAIL = "routing_email"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null); channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); // 绑定队列并指明路由key channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL); // 实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签 * @param envelope 信封,能够获取交换机等信息 * @param properties 消息属性 * @param body 消费内容,字节数组,能够转成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("收到的email消息是:"+message); } }; // 消费消息(队列名,是否自动确认,消费方法) channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer); } }
消费短信的消费者:
package rabbitmq.routing; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @className: EmailConsumer * @description: 路由模式下的email消费者 * @author: charon * @create: 2021-01-07 22:40 */ public class SmsConsumer { /**邮件的队列*/ public static final String QUEUE_ROUTING_SMS = "queue_routing_sms"; /**交换机*/ public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"; /** 设置email的路由key */ public static final String ROUTING_SMS = "routing_sms"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置ip,端口,由于是本机,因此直接设置为127.0.0.1 connectionFactory.setHost("127.0.0.1"); // web端口默认为15672,通讯端口为5672 connectionFactory.setPort(5672); // 设置用户名和密码 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置虚拟ip,默认为/,一个rabbitmq的服务能够设置多个虚拟机,每一个虚拟机就至关于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数好比设置存活时间等) channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null); channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); // 绑定队列并指明路由key channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS); // 实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签 * @param envelope 信封,能够获取交换机等信息 * @param properties 消息属性 * @param body 消费内容,字节数组,能够转成字符串 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("收到的短信消息是:"+message); } }; // 消费消息(队列名,是否自动确认,消费方法) channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer); } }
4.Topic 主题模式
特色:
1.每一个消费者监听本身的队列,而且设置带通配符的routingkey
2.生产者将消息发送给broker,由交换机及根据路由key来转发消息到指定的队列
5.Header 转发器
取消了路由key,使用header中的key/value(键值对)来匹配队列。
6.RPC 远程调用
基于direct类型交换机实现。生产者将消息远程发送给rpc队列,消费者监听rpc消息队列的消息并消息,而后将返回结果放入到响应队列中,生产者监听响应队列中的消息,拿到消费者的处理结果,实现远程RPC远程调用。
参考文件:
https://www.cnblogs.com/Jeely/p/10784013.html
https://lovnx.blog.csdn.net/article/details/70991021