RabbitMQ-Exchange交换器

交换器分类

RabbitMQ的Exchange(交换器)分为四类:java

  • direct(默认)
  • headers
  • fanout
  • topic

其中headers交换器容许你匹配AMQP消息的header而非路由键,除此以外headers交换器和direct交换器彻底一致,但性能却不好,几乎用不到,因此咱们本文也不作讲解。缓存

注意:fanout、topic交换器是没有历史数据的,也就是说对于中途建立的队列,获取不到以前的消息。ide

一、direct交换器

direct为默认的交换器类型,也很是的简单,若是路由键匹配的话,消息就投递到相应的队列,如图:性能

使用代码:channel.basicPublish("", QueueName, null, message)推送direct交换器消息到对于的队列,空字符为默认的direct交换器,用队列名称当作路由键。this

direct交换器代码示例spa

发送端:线程

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开链接时是否删除队列;参数五:消息其余参数】 channel.queueDeclare(config.QueueName, false, false, false, null); String message = String.format("当前时间:%s", new Date().getTime()); // 推送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其余属性-路由的headers信息;参数四:消息主体】 channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

接收端,持续接收消息:日志

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开链接时是否删除队列;参数五:消息其余参数】 channel.queueDeclare(config.QueueName, false, false, false, null); Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); // 消息正文 System.out.println("收到消息 => " + message); channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于当前id的消息】 } }; channel.basicConsume(config.QueueName, false, "", defaultConsumer);

接收端,获取单条消息code

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息确认

持续消息获取使用:basic.consume;单个消息获取使用:basic.get。orm

注意:不能使用for循环单个消息消费来替代持续消息消费,由于这样性能很低;

公平调度

当接收端订阅者有多个的时候,direct会轮询公平的分发给每一个订阅者(订阅者消息确认正常),如图:

消息的发后既忘特性

发后既忘模式是指接受者不知道消息的来源,若是想要指定消息的发送者,须要包含在发送内容里面,这点就像咱们在信件里面注明本身的姓名同样,只有这样才能知道发送者是谁。

消息确认

看了上面的代码咱们能够知道,消息接收到以后必须使用channel.basicAck()方法手动确认(非自动确认删除模式下),那么问题来了。

消息收到未确认会怎么样?

若是应用程序接收了消息,由于bug忘记确认接收的话,消息在队列的状态会从“Ready”变为“Unacked”,如图:

若是消息收到却未确认,Rabbit将不会再给这个应用程序发送更多的消息了,这是由于Rabbit认为你没有准备好接收下一条消息。

此条消息会一直保持Unacked的状态,直到你确认了消息,或者断开与Rabbit的链接,Rabbit会自动把消息改完Ready状态,分发给其余订阅者。

固然你能够利用这一点,让你的程序延迟确认该消息,直到你的程序处理完相应的业务逻辑,这样能够有效的防治Rabbit给你过多的消息,致使程序崩溃。

消息确认Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

channel.basicAck(long deliveryTag, boolean multiple)为消息确认,参数1:消息的id;参数2:是否批量应答,true批量确认小于次id的消息。

总结:消费者消费的每条消息都必须确认。

消息拒绝

消息在确认以前,能够有两个选择:

选择1:断开与Rabbit的链接,这样Rabbit会从新把消息分派给另外一个消费者;

选择2:拒绝Rabbit发送的消息使用channel.basicReject(long deliveryTag, boolean requeue),参数1:消息的id;参数2:处理消息的方式,若是是true,Rabbib会从新分配这个消息给其余订阅者,若是设置成false的话,Rabbit会把消息发送到一个特殊的“死信”队列,用来存放被拒绝而不从新放入队列的消息。

消息拒绝Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒绝

二、fanout交换器——发布/订阅模式

fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到全部附加到这个交换器的队列上。

好比用户上传了本身的头像,这个时候图片须要清除缓存,同时用户应该获得积分奖励,你能够把这两个队列绑定到图片上传的交换器上,这样当有第三个、第四个上传完图片须要处理的需求的时候,原来的代码能够不变,只须要添加一个订阅消息便可,这样发送方和消费者的代码彻底解耦,并能够垂手可得的添加新功能了。

和direct交换器不一样,咱们在发送消息的时候新增channel.exchangeDeclare(ExchangeName, "fanout"),这行代码声明fanout交换器。

发送端:

final String ExchangeName = "fanoutec"; // 交换器名称 Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器 String message = "时间:" + new Date().getTime(); channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

接受消息不一样于direct,咱们须要声明fanout路由器,并使用默认的队列绑定到fanout交换器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器 String queueName = channel.queueDeclare().getQueue(); // 声明队列 channel.queueBind(queueName, ExchangeName, ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); } }; channel.basicConsume(queueName, true, consumer);

fanout和direct的区别最多的在接收端,fanout须要绑定队列到对应的交换器用于订阅消息。

其中channel.queueDeclare().getQueue()为随机队列,Rabbit会随机生成队列名称,一旦消费者断开链接,该队列会自动删除。

注意:对于fanout交换器来讲routingKey(路由键)是无效的,这个参数是被忽略的。

三、topic交换器——匹配订阅模式

最后介绍的是topic交换器,topic交换器运行和fanout相似,可是能够更灵活的匹配本身想要订阅的信息,这个时候routingKey路由键就排上用场了,使用路由键进行消息(规则)匹配。

假设咱们如今有一个日志系统,会把全部日志级别的日志发送到交换器,warning、log、error、fatal,但咱们只想处理error以上的日志,要怎么处理?这就须要使用topic路由器了。

topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”做为分隔符,例如:com.mq.rabbit.error。

消费消息的时候routingKey可使用下面字符匹配消息:

  • "*"匹配一个分段(用“.”分割)的内容;
  • "#"匹配0和多个字符;

例如发布了一个“com.mq.rabbit.error”的消息:

能匹配上的路由键:

  • cn.mq.rabbit.*
  • cn.mq.rabbit.#
  • #.error
  • cn.mq.#
  • #

不能匹配上的路由键:

  • cn.mq.*
  • *.error
  • *

因此若是想要订阅全部消息,可使用“#”匹配。

注意:fanout、topic交换器是没有历史数据的,也就是说对于中途建立的队列,获取不到以前的消息。

发布端:

String routingKey = "com.mq.rabbit.error"; Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "topic"); // 声明topic交换器 String message = "时间:" + new Date().getTime(); channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 声明topic交换器 String queueName = channel.queueDeclare().getQueue(); // 声明队列 String routingKey = "#.error"; channel.queueBind(queueName, ExchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(routingKey + "|接收消息 => " + message); } }; channel.basicConsume(queueName, true, consumer);

扩展部分—自定义线程池

若是须要更大的控制链接,用户可本身设置线程池,代码以下:

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es);

其实看过源码的同窗可能知道,factory.newConnection自己默认也有线程池的机制,ConnectionFactory.class部分源码以下:

private ExecutorService sharedExecutor; public Connection newConnection() throws IOException, TimeoutException { return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); } public void setSharedExecutor(ExecutorService executor) { this.sharedExecutor = executor; }

其中this.sharedExecutor就是默认的线程池,能够经过setSharedExecutor()方法设置ConnectionFactory的线程池,若是不设置则为null。

用户若是本身设置了线程池,像本小节第一段代码写的那样,那么当链接关闭的时候,不会自动关闭用户自定义的线程池,因此用户必须本身手动关闭,经过调用shutdown()方法,不然可能会阻止JVM的终止。

官方的建议是只有在程序出现严重性能瓶颈的时候,才应该考虑使用此功能。