背景知识spring
RabbitMQ性能优化
RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是解耦的,互相不知道对方的存在。服务器
RPC架构
Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。并发
如何使用 RabbitMQ 实现 RPC?负载均衡
使用 RabbitMQ 实现 RPC,相应的角色是由生产者来做为客户端,消费者做为服务端。异步
但 RPC 调用通常是同步的,客户端和服务器也是紧密耦合的。即客户端经过 IP/域名和端口连接到服务器,向服务器发送请求后等待服务器返回响应信息。分布式
但 MQ 的生产者和消费者是彻底解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 看成中间件实现一次双向的消息传递:ide
客户端和服务端便是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。微服务
具体实现
MQ部分的定义
请求信息的队列
咱们须要一个队列来存放请求信息,客户端向这个队列发布请求信息,服务端消费该队列处理请求。该队列不须要复杂的路由规则,直接使用 RabbitMQ 默认的 direct exchange 来路由消息便可。
响应信息的队列
存放响应信息的队列不该只有一个。若是存在多个客户端,不能保证响应信息被发布请求的那个客户端消费到。因此应为每个客户端建立一个响应队列,这个队列应该由客户端来建立且只能由这个客户端使用并在使用完毕后删除,这里可使用 RabbitMQ 提供的排他队列(Exclusive Queue):
channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())
而且要保证队列名惟一,声明队列时名称设为空 RabbitMQ 会生成一个惟一的队列名。
exclusive 设为 true 表示声明一个排他队列,排他队列的特色是只能被当前的链接使用,而且在链接关闭后被删除。
一个简单的 demo(使用 pull 机制)
咱们使用一个简单的 demo 来了解客户端和服务端的处理流程。
发布请求
咱们在声明队列时为每个客户端声明了独有的响应队列,那服务器在发布响应时如何知道发布到哪一个队列呢?其实就是客户端须要告诉服务端将响应发布到哪一个队列,RabbitMQ 提供了这个支持,消息体的 Properties 中有一个属性 reply_to 就是用来标记回调队列的名称,服务器须要将响应发布到 reply_to 指定的回调队列中。
解决了这个问题以后咱们就能够编写客户端发布请求的代码了:
// 定义响应回调队列 String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue(); // 设置回调队列到 Properties AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(replyQueueName) .build(); String request = "request"; // 发布请求 channel.basicPublish("", "rpc_queue", properties, request.getBytes());
Direct reply-to:
RabbitMQ 提供了一种更便捷的机制来实现 RPC,不须要客户端每次都定义回调队列,客户端发布请求时将 replyTo 设为 amq.rabbitmq.reply-to ,消费响应时也指定消费 amq.rabbitmq.reply-to ,RabbitMQ 会为客户端建立一个内部队列
消费请求
接下来是服务端处理请求的部分,接收到请求后通过处理将响应信息发布到 reply_to 指定的回调队列:
// 服务端 Consumer 的定义 public class RpcServer extends DefaultConsumer { public RpcServer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); String response = (msg + " Received"); // 获取回调队列名 String replyTo = properties.getReplyTo(); // 发布响应消息到回调队列 this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes()); } } ... // 启动服务端 Consumer channel.basicConsume("rpc_queue", true, new RpcServer(channel));
接收响应
客户端如何接收服务器的响应呢?有两种方式:1.轮询的去 pull 回调队列中的消息,2.异步的消费回调队列中的消息。咱们在这里简单实现第一种方案。
GetResponse getResponse = null; while (getResponse == null) { getResponse = channel.basicGet(replyQueueName, true); } String response = new String(getResponse.getBody());
一个简单的基于 RabbitMQ 的 RPC 模型已经实现了,但这个 demo 并不实用,由于客户端每次发送完请求都要同步的轮询等待响应消息,只能每次处理一个请求。RabbitMQ 的 pull 模式效率也比较低。
实现一个完备可用的 RPC 模式须要作的工做还有不少,要处理的关键点也比较复杂,有句话叫不要重复造轮子,spring 已经实现了一个完备可用的 RPC 模式的库,接下来咱们来了解一下。顺便在此给你们推荐一个Java架构方面的交流学习群:698581634,进群便可获取Java架构师资料:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系,群里必定有你须要的资料,你们赶忙加群吧。
Spring Rabbit 中的实现
和上面 demo 的 pull 模式一次只能处理一个请求相对应的:如何异步的接收响应并处理多个请求呢?关键点就在于咱们须要记录请求和响应并将它们关联起来,RabbitMQ 也提供了支持,Properties 中的另外一个属性 correlation_id 用来标识一个消息的惟一 id。
参考 spring-rabbit 中的 convertSendAndReceive 方法的实现,为每一次请求生成一个惟一的 correlation_id :
private final AtomicInteger messageTagProvider = new AtomicInteger(); ... String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet()); ... message.getMessageProperties().setCorrelationId(messageTag);
并使用一个 ConcurrentHashMap 来维护 correlation_id 和响应信息的映射:
private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>(); ... final PendingReply pendingReply = new PendingReply(); this.replyHolder.put(correlationId, pendingReply);
PendingReply 中有一个 BlockingQueue 存放响应信息,在发送完请求信息后调用 BlockingQueue 的 pull 方法并设置超时时间来获取响应:
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply
);
}
在获取响应后不论结果如何,都会将 PendingReply 从 replyHolder 中移除,防止 replyHolder 中积压超时的响应消息:
try { reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag); } finally { this.replyHolder.remove(messageTag); ... }
响应信息是什么时候如何被放到这个 BlockingQueue 中的呢?看一下 RabbitTemplate 接收消息的地方:
public void onMessage(Message message) { String messageTag; if (this.correlationKey == null) { // using standard correlationId property messageTag = message.getMessageProperties().getCorrelationId(); } else { messageTag = (String) message.getMessageProperties() .getHeaders().get(this.correlationKey); } // 存在 correlation_id 才认为是RPC的响应信息,不存在时不处理 if (messageTag == null) { logger.error("No correlation header in reply"); return; } // 从 replyHolder 中取出 correlation_id 对应的 PendingReply PendingReply pendingReply = this.replyHolder.get(messageTag); if (pendingReply == null) { if (logger.isWarnEnabled()) { logger.warn("Reply received after timeout for " + messageTag); } throw new AmqpRejectAndDontRequeueException("Reply received after timeout"); } else { restoreProperties(message, pendingReply); // 将响应信息 add 到 BlockingQueue 中 pendingReply.reply(message); } }
以上的 spring 代码隐去了不少额外部分的处理和细节,只关注关键的部分。
至此一个完整可用的由 RabbitMQ 做为中间件实现的 RPC 模式就完成了。
总结
服务端
服务端的实现比较简单,和通常的 Consumer 的区别只在于须要将请求回复到 replyTo 指定的 queue 中并带上消息标识 correlation_id 便可
服务端的一点小优化:
超时的处理是由客户端来实现的,那服务端有没有能够优化的地方呢?
答案是有的:若是咱们的服务端处理比较耗时,如何判断客户端是否还在等待响应呢?
咱们可使用 passive 参数去检查 replyTo 的 queue 是否存在,由于客户端声明的是内部队列,客户端若是断掉连接了这个 queue 就不存在了,这时服务端就无需处理这个消息了。
客户端
客户端承担了更多的工做量,包括:
好在 spring 已经实现了一套完备可靠的代码,咱们在清楚了流程和关键点以后,能够直接使用 spring 提供的 RabbitTemplate ,无需本身实现。
使用 MQ 实现 RPC 的意义
经过 MQ 实现 RPC 看起来比客户端和服务器直接通信要复杂一些,那咱们为何要这样作呢?或者说这样作有什么好处:
原文连接:https://mp.weixin.qq.com/s/40SIlST9JNgBvq276ERWTA?utm_source=tuicool&utm_medium=referral
全文完