不少开发人员说,将应用程序切换到异步处理很复杂。由于他们有一个自然须要同步通讯的Web应用程序。在这篇文章中,我想介绍一种方法来达到异步通讯的目的:使用一些众所周知的库和工具来设计他们的系统。 下面的例子是用Java编写的,但我相信它更多的是基本原理,同一个应用程序能够用任何语言来从新写。java
所需的工具和库:git
一个用Spring MVC编写的Web应用程序并运行在Tomcat上。 它所作的只是将一个字符串发送到一个队列中 (异步通讯的开始) 并等待另外一个队列中的消息做为HTTP响应发送回来。github
首先,咱们须要定义几个依赖项,而后等待Spring Boot执行全部必要的自动配置。web
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.thedeanda</groupId>
<artifactId>lorem</artifactId>
</dependency>
</dependencies>
复制代码
@SpringBootApplication
public class BlockingApplication {
public static void main(String[] args) {
SpringApplication.run(BlockingApplication.class, args);
}
@RestController
public static class MessageController {
private final RabbitTemplate rabbitTemplate;
public MessageController(CachingConnectionFactory connectionFactory) {
this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}
@GetMapping("invoke")
public String sendMessage() {
Message response = rabbitTemplate.sendAndReceive("uppercase", null, request());
return new String(response.getBody());
}
private static Message request() {
Lorem LOREM = LoremIpsum.getInstance();
String name = LOREM.getFirstName() + " " + LOREM.getLastName();
return new Message(name.getBytes(), new MessageProperties());
}
}
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("localhost:5672");
factory.setUsername("admin");
factory.setPassword("admin");
return factory;
}
}
复制代码
第二个应用程序仅仅是一个等待消息的RabbitMQ的消费端,将拿到的字符串转换为大写,而后将此结果发送到输出队列中。spring
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
复制代码
@SpringBootApplication
public class ServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ServiceApplication.class, args);
}
public static class MessageListener {
public String handleMessage(byte[] message) {
Random rand = new Random();
// Obtain a number between [0 - 49] + 50 = [50 - 99]
int n = rand.nextInt(50) + 50;
String content = new String(message);
try {
Thread.sleep(n);
} catch (InterruptedException e) {
e.printStackTrace();
}
return content.toUpperCase();
}
}
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("localhost:5672");
factory.setUsername("admin");
factory.setPassword("admin");
return factory;
}
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setConcurrentConsumers(20);
container.setMaxConcurrentConsumers(40);
container.setQueueNames("uppercase_messages");
container.setMessageListener(new MessageListenerAdapter(new MessageListener()));
return container;
}
}
复制代码
程序启动并首次调用sendMessage()方法后,咱们能够看到Spring AMQP支持自动建立了一个新的回复队列并等待来自咱们的服务应用程序的响应。bash
2019-05-12 17:23:21.451 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2019-05-12 17:23:21.457 INFO 4574 --- [nio-8080-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-VF-iqD9rLEuljIBstbCI1A identity=10e58093] started
复制代码
若是咱们在消费端应用程序中查看消息,咱们能够看到Spring自动传播有关回复队列的信息以及**相关ID,**用于将其传递回Web应用程序以便可以将请求和响应配对在一块儿。服务器
这就是发生魔术的地方。 固然,若是您想使其更复杂,您能够在协做中包含更多服务,而后将Web应用程序的最终响应放入与自动生成的队列不一样的队列中, 该队列只具备正确的关联ID。 另外,不要忘记设置合理的超时。app
这个解决方案还有一个很大的缺点 - 应用程序吞吐量。 我故意这样作,以便我能够跟进这篇文章,进一步深刻调查AsyncProfiler
! 可是目前,咱们使用Tomcat做为主HTTP服务器,默认为200个线程,这意味着咱们的应用程序没法同时处理200多条消息,由于咱们的服务器线程正在等待RabbitMQ 回复队列的响应,直到有消息进入或发生超时。dom
感谢您阅读本文,敬请关注后续内容! 若是您想本身尝试一下,请查看个人GitHub存储库。异步