微信公众号:跟着老万学java
关注可了解更多的编程技巧。问题或建议,请公众号留言;html
今天给你们介绍下rabbitmq中很重要的一个功能,RPC调用。java
RPC,即Remote Procedure Call的简称,也就是远程过程调用,是一种经过网络从远程计算机上请求服务,而不须要了解底层网络的技术。好比两台服务器上的A和B两个应用,须要进行服务接口的相互调用,咱们就可使用RPC实现。好比常见的Java RMI、WebService、Dubbo均可以
实现RPC调用。git
rabbitmq实现的RPC调用主要是简单,不用管各类复杂的网络协议,客户端发送消息,消费者消费消息,反馈消息到回复队列Reply中,而后客户端获取反馈的结果。github
1、原理

流程说明:
一、对于一个RPC请求,客户端发送一条消息,该消息具备两个重要属性:replyTo(设置为仅为该请求建立的匿名互斥队列,答复队列)和correlationId(设置为每一个请求的惟一值)。web
二、该请求被发送到rpc_queue队列。spring
三、RPC工做程序(消息消费者)会监听该队列的消息。监听到有新的消息后,会根据消息执行响应的逻辑,而后将结果返回到消息中携带的replyTo指定的答复队列中。编程
四、客户端(消息生产者)等待答复队列中的数据,出现出现后,它会检查correlationId属性是否一致,若是匹配,则将响应结果返回给应用程序。服务器
2、rpc的三种调用方式
以后官网就针对使用Spring AMQP实现RPC调用给出了一个简单的 Tut6Server.java示例,但真心太简单,只能做为入门的参考demo。
以后分析经过查看rabbitTemplate.sendAndReceive()方法的源码,Spring AMQP支持3中RPC调用实现。
分别是:微信
一、doSendAndReceiveWithDirect 直接反馈
二、doSendAndReceiveWithFixed 使用固定队列答复
三、doSendAndReceiveWithTemporary 使用临时队列答复网络
根据源码,对这三种方式的排序不难看出,对三者的推荐顺序为:
doSendAndReceiveWithDirect 》 doSendAndReceiveWithFixed》doSendAndReceiveWithTemporary
直接反馈无疑是最快最资源消耗最少的,固定队列会声明指定的的队列用来接收答复,
而使用临时队列来接收答复是最消耗资源,性能也是最差的,由于队列的声明,创建,销毁会消耗大。
@Nullable
protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
if (!this.evaluatedFastReplyTo) {
synchronized(this) {
if (!this.evaluatedFastReplyTo) {
this.evaluateFastReplyTo();
}
}
}
if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData);
} else {
return this.replyAddress != null && !this.usingFastReplyTo ? this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData) : this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
}
}
3、代码实战
添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
生产者代码:
/**
* @program: rabbitmq
* @description: 交换器常量
* @author: laowan
* @create: 2019-06-13 17:36
**/
@Getter
public enum ExchangeEnum {
DIRECT_EXCHANGE("direct"),
FIXED_EXCHANGE("fixed"),
TMP_EXCHANGE("tmp");
private String value;
ExchangeEnum(String value) {
this.value = value;
}
}
/**
* @program: rabbitmq
* @description: 队列枚举
* @author: laowan
* @create: 2019-06-13 17:37
**/
@Getter
public enum QueueEnum {
//direct模式
DIRECT_REQUEST("direct.request", "direct"),
//固定队列应答模式
FIXED_REQUEST("fixed.request", "fixed"),
FIXED_RESPONSE("fixed.response", ""),
//临时模式 消息发送到的队列
TMP_REQUEST("tmp.request", "tmp")
;
/**
* 队列名称
*/
private String name;
/**
* 队列路由键
*/
private String routingKey;
QueueEnum(String name, String routingKey) {
this.name = name;
this.routingKey = routingKey;
}
}
/**
* @program: rpc-parent
* @description: direct rpc请求模式
* @author: laowan
* @create: 2020-04-09 18:05
**/
@Configuration
@Slf4j
public class DirectReplyConfig {
/**
* 注意bean的名称是由方法名决定的,因此不能重复
* @return
*/
@Bean
public Queue directRequest() {
return new Queue(QueueEnum.DIRECT_REQUEST.getName(), true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(ExchangeEnum.DIRECT_EXCHANGE.getValue());
}
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directRequest()).to(directExchange()).with(QueueEnum.DIRECT_REQUEST.getRoutingKey());
}
/**
* 当进行多个主题队列消费时,最好对每一个单独定义RabbitTemplate,以便将各自的参数分别控制
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate directRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
//这一步很是关键
template.setUseTemporaryReplyQueues(false);
template.setReplyAddress("amq.rabbitmq.reply-to");
// template.expectedQueueNames();
template.setUserCorrelationId(true);
//设置请求超时时间为10s
template.setReplyTimeout(10000);
return template;
}
}
DirectProducer 生产者代码
@Component
@Slf4j
public class DirectProducer {
@Autowired
private RabbitTemplate directRabbitTemplate;
public String sendAndReceive(String request) throws TimeoutException {
log.info("请求报文:{}" , request);
//请求结果
String result = null;
//设置消息惟一id
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//直接发送message对象
MessageProperties messageProperties = new MessageProperties();
//过时时间10秒,也是为了减小消息挤压的可能
messageProperties.setExpiration("10000");
messageProperties.setCorrelationId(correlationId.getId());
Message message = new Message(request.getBytes(), messageProperties);
StopWatch stopWatch = new StopWatch();
stopWatch.start("direct模式下rpc请求耗时");
Message response = directRabbitTemplate.sendAndReceive(ExchangeEnum.DIRECT_EXCHANGE.getValue(), QueueEnum.DIRECT_REQUEST.getRoutingKey(), message, correlationId);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());
if (response != null) {
result = new String(response.getBody());
log.info("请求成功,返回的结果为:{}" , result);
}else{
log.error("请求超时");
//为了方便jmeter测试,这里抛出异常
throw new TimeoutException("请求超时");
}
return result;
}
}
4、Fixed reply-to模式
Fixed 配置类
/**
* @program: rpc-parent
* @description: Fixed rpc请求模式
* @author: wanli
* @create: 2020-04-09 18:05
**/
@Configuration
@Slf4j
public class FixedReplyConfig {
@Bean
public Queue fixedRequest() {
return new Queue(QueueEnum.FIXED_REQUEST.getName(), true);
}
@Bean
public DirectExchange fixedExchange() {
return new DirectExchange(ExchangeEnum.FIXED_EXCHANGE.getValue());
}
@Bean
public Binding fixedBinding() {
return BindingBuilder.bind(fixedRequest()).to(fixedExchange()).with(QueueEnum.FIXED_REQUEST.getRoutingKey());
}
/**
* 注意,固定模式指定的应答队列 exclusive排他属性设置为true,且能自动删除
* @return
*/
@Bean
public Queue fixedResponseQueue() {
return new Queue(QueueEnum.FIXED_RESPONSE.getName(),false,true,true,new HashMap<>());
}
@Bean
public RabbitTemplate fixedRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
//设置固定的Reply 地址
template.setUseTemporaryReplyQueues(false);
template.setReplyAddress(QueueEnum.FIXED_RESPONSE.getName());
template.expectedQueueNames();
template.setUserCorrelationId(true);
//设置请求超时时间为10s
template.setReplyTimeout(10000);
return template;
}
@Bean
public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//这一步很是重要,固定队列模式要,必定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列
container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
container.setMessageListener(fixedRabbitTemplate(connectionFactory));
container.setConcurrentConsumers(100);
container.setConcurrentConsumers(100);
container.setPrefetchCount(250);
return container;
}
}
FixedProducer生产者
@Component
@Slf4j
public class FixedProducer {
@Autowired
private RabbitTemplate fixedRabbitTemplate;
public String sendAndReceive(String request) throws TimeoutException {
log.info("请求报文:{}" , request);
//请求结果
String result = null;
//设置消息惟一id
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//直接发送message对象
MessageProperties messageProperties = new MessageProperties();
//过时时间10秒
messageProperties.setExpiration("10000");
messageProperties.setCorrelationId(correlationId.getId());
Message message = new Message(request.getBytes(), messageProperties);
StopWatch stopWatch = new StopWatch();
stopWatch.start("fixed模式下rpc请求耗时");
Message response = fixedRabbitTemplate.sendAndReceive(ExchangeEnum.FIXED_EXCHANGE.getValue(), QueueEnum.FIXED_REQUEST.getRoutingKey(), message, correlationId);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());
if (response != null) {
result = new String(response.getBody());
log.info("请求成功,返回的结果为:{}" , result);
}else{
//为了方便jmeter测试,这里抛出异常
throw new TimeoutException("请求超时");
}
return result;
}
}
5、Temporary reply-to模式
/**
* @program: rpc-parent
* @description: Temporary应答模式
* @author: laowan
* @create: 2020-04-09 18:05
**/
@Configuration
@Slf4j
public class TmpReplyConfig {
@Bean
public Queue tmpRequest() {
return new Queue(QueueEnum.TMP_REQUEST.getName(), true);
}
@Bean
public DirectExchange tmpExchange() {
return new DirectExchange(ExchangeEnum.TMP_EXCHANGE.getValue());
}
@Bean
public Binding tmpBinding() {
return BindingBuilder.bind(tmpRequest()).to(tmpExchange()).with(QueueEnum.TMP_REQUEST.getRoutingKey());
}
@Bean
public RabbitTemplate tmpRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setUseTemporaryReplyQueues(true);
template.setUserCorrelationId(true);
//设置请求超时时间为10s
template.setReplyTimeout(10000);
return template;
}
}
TmpProducer生产者代码
@Component
@Slf4j
public class TmpProducer {
@Autowired
private RabbitTemplate tmpRabbitTemplate;
public String sendAndReceive(String request) throws TimeoutException {
log.info("请求报文:{}" , request);
//请求结果
String result = null;
//设置消息惟一id
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//直接发送message对象
MessageProperties messageProperties = new MessageProperties();
//过时时间10秒
messageProperties.setExpiration("10000");
messageProperties.setCorrelationId(correlationId.getId());
Message message = new Message(request.getBytes(), messageProperties);
StopWatch stopWatch = new StopWatch();
stopWatch.start("tmp模式下rpc请求耗时");
Message response = tmpRabbitTemplate.sendAndReceive(ExchangeEnum.TMP_EXCHANGE.getValue(), QueueEnum.TMP_REQUEST.getRoutingKey(), message, correlationId);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+":" + stopWatch.getTotalTimeMillis());
if (response != null) {
result = new String(response.getBody());
log.info("请求成功,返回的结果为:{}" , result);
}else{
log.error("请求超时");
//为了方便jmeter测试,这里抛出异常
throw new TimeoutException("请求超时");
}
return result;
}
}
生产者启动类:
@SpringBootApplication
@RestController
public class ProducerApplication {
@Autowired
DirectProducer directProducer;
@Autowired
FixedProducer fixedProducer;
@Autowired
TmpProducer tmpProducer;
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@GetMapping("/direct")
public String direct(String message) throws Exception {
return directProducer.sendAndReceive(message);
}
@GetMapping("/fixed")
public String fixed(String message) throws Exception {
return fixedProducer.sendAndReceive(message);
}
@GetMapping("/tmp")
public String tmp(String message) throws Exception {
return tmpProducer.sendAndReceive(message);
}
}
消费者基本相似,就附上DirectConsumer类的代码:
/**
* @program: rabbitmq
* @description: direct消费者
* @author: wanli
* @create: 2019-06-13 18:01
**/
@Component
@RabbitListener(queues = "direct.request")
@Slf4j
public class DirectConsumer {
@RabbitHandler
public String onMessage(byte[] message,
@Headers Map<String, Object> headers,
Channel channel) {
StopWatch stopWatch = new StopWatch("调用计时");
stopWatch.start("rpc调用消费者耗时");
String request = new String(message);
String response = null;
log.info("接收到的消息为:" + request);
//模拟请求耗时3s
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
response= this.sayHello(request);
log.info("返回的消息为:" + response);
stopWatch.stop();
log.info(stopWatch.getLastTaskName()+stopWatch.getTotalTimeMillis()+"ms");
return response;
}
public String sayHello(String name){
return "hello " + name;
}
}
6、压测
经过对/direct,/fixed,/tmp三个接口使用JMeter压测,线程数1000,时间1s,
屡次执行,比较发现:
direct和fixed的rpc方式调用的性能基本一致,差异不大,每分钟3500左右的并发
而tmp方式并发能力会弱会弱不少,大概3000并发左右。
并发请求时能够经过rabbitmq的管理界面明显看到tmp方式高并发时生成了很是多的临时队列。
性能:direct>=fixed>tmp,与以前根据源码和各自执行原理预期的执行性能基本一致
7、参数优化
生产者这边,在fix模式下,须要配置对应的SimpleMessageListenerContainer监听答复队列,能够适当增长消费者的并发数,而且提升每次抓取的消息数。
而且设置acknowledge-mode=auto自动ack。
@Bean
public SimpleMessageListenerContainer fixedListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//这一步很是重要,固定队列模式要,必定要主动设置 SimpleMessageListenerContainer监听容器,监听应答队列
container.setQueueNames(QueueEnum.FIXED_RESPONSE.getName());
container.setMessageListener(fixedRabbitTemplate(connectionFactory));
container.setConcurrentConsumers(100);
container.setConcurrentConsumers(100);
container.setPrefetchCount(250);
return container;
}
消费者这边,必定要注意设置消费者每次抓取的数量,若是每一个消息消费比较耗时,一次抓取太多,就容易致使抓取的这一批消息被这个消费者串行消费的时候出现超时状况。抓取太少,又会致使吞吐量下降。
这里我设置的是10,通过压测发如今高并发下,rpc响应出现延长,说明消费能力基本能知足。
#消费者的并发参数
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.concurrency=200
spring.rabbitmq.listener.simple.max-concurrency=500
#抓取参数很是关键,一次抓取的消息多了,消费速度一慢,就会形成响应延迟,抓取少了又会致使并发量低
spring.rabbitmq.listener.simple.prefetch=10
#能够不须要反馈
spring.rabbitmq.listener.simple.acknowledge-mode=none
7、问题
这里要吐槽一下,关于rabbitmq的RPC调用,网上的资料真到太少了,踩了很多坑。
坑一:
CORRECTION: The RabbitTemplate does not currently support Direct reply-to for sendAndReceive() operations; you can, however, specify a fixed reply queue (with a reply-listener). Or you can use rabbitTemplate.execute() with a ChannelCallback to consume the reply from that "queue" (and publish).
I have created a JIRA issue if you wish to track it.
1.4.1 and above now supports direct reply-to.
百度上找的资料太少,以后在google上找到上面的说明,大意是RabbitTemplate在sendAndReceive操做时不支持Direct reply-to调用
解决:
做为老鸟一枚,这里我就和他杠上了,恰恰不信这个邪,RabbitTemplate源码中明明能够搜索到'amq.rabbitmq.reply-to'相关判断以及doSendAndReceiveWithDirect的定义,怎么可能不支持?
坑二:
Broker does not support fast replies via 'amq.rabbitmq.reply-to'
Broker指的是咱们的rabbitmq的服务节点,不支持经过'amq.rabbitmq.reply-to'进行快速返回。
解决:
当前版本rabbitmq的Broker不支持经过'amq.rabbitmq.reply-to'进行快速返回,那么就升级broker的版本。
3.3.5版本不支持建立amq.rabbitmq.reply-to虚拟队列,那就升级到3.7.8版本。
坑三:
Caused by: java.lang.IllegalStateException: A listener container must not be provided when using direct reply-to
解决:
指定名为“amq.rabbitmq.reply-to”的反馈地址后,不能再调用expectedQueueNames方法
template.setUseTemporaryReplyQueues(false);
template.setReplyAddress("amq.rabbitmq.reply-to");
// template.expectedQueueNames();
template.setUserCorrelationId(true);
坑四:
压测过程当中,并发一高,就容易出现rpc调用超时的问题。
解决:
增长消费者的并发数,减少消费者每次抓取的消息数。
总结
有些东西,百度不会告诉你,要看官网;
有些东西,官网不会告诉你,要看源码;
有些东西,源码不会告诉你,只能根据原理实践推敲;
最后,推敲不出来,能够找老万
git源码地址:
https://github.com/StarlightWANLI/rabbitmq-rpc.git
更多精彩,关注我吧。

本文分享自微信公众号 - 跟着老万学java(douzhe_2019)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。