在本文中,咱们将向您介绍Spring Cloud Stream
,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如RabbitMQ
、Apache Kafka
等)链接。html
Spring Cloud Stream
构建在现有Spring框架(如Spring Messaging
和Spring Integration
)之上。尽管这些框架通过了实战测试,工做得很是好,可是实现与使用的message broker
紧密耦合。此外,有时对某些用例进行扩展是困难的。java
Spring Cloud Stream
背后的想法是一个很是典型的Spring Boot
概念——抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
。这意味着您能够经过更改依赖项和配置文件来更改message broker
。能够在这里找到目前已经支持的各类消息代理。git
本文将使用RabbitMQ
做为message broker
。在此以前,让咱们了解一下broker
(代理)的一些基本概念,以及为何要在面向微服务的体系架构中须要它。github
在微服务体系架构中,咱们有许多相互通讯以完成请求的小型应用程序—它们的主要优势之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设咱们有一个Service-A
内部调用Service-B
和Service-C
来完成一个请求: 算法
是的,还会有其余组件,好比Spring Cloud Eureka
、Spring Cloud Zuul
等等,但咱们仍是专一关心这类架构的特有问题。spring
假设因为某种缘由Service-B
须要更多的时间来响应。也许它正在执行I/O操做
或长时间的DB事务
,或者进一步调用其它致使Service-B
变得更慢的服务,这些都使其没法更具效率。apache
如今,咱们能够启动更多的Service-B
实例来解决这个问题,这样很好,可是Service-A
其实是响应很快的,它须要等待Service-B
的响应来进一步处理。这将致使Service-A
没法接收更多的请求,这意味着咱们还必须启动Service-A
的多个实例。bash
另外一种方法解决相似状况的是使用事件驱动的微服务体系架构。这基本上意味着Service-A
不直接经过HTTP
调用Service-B
或Service-C
,而是将请求或事件发布给message broker
(消息代理)。Service-B
和Service-C
将成为message broker
(消息代理)上此事件的订阅者。 架构
与依赖HTTP调用的传统微服务体系架构相比,这有许多优势:并发
Service-A
不须要了解Service-B
和Service-C
。它只须要链接到message broker
并发布事件。事件如何进一步编排取决于代理设置。经过这种方式,Service-A
能够独立地运行,这是微服务的核心概念之一。高级消息队列协议(AMQP)
是RabbitMQ
用于消息传递的协议。虽然RabbitMQ
支持其余一些协议,可是AMQP
因为兼容性和它提供的大量特性而更受欢迎。
所以发布者将消息发布到RabbitMQ
中称为Exchange
(交换器)。Exchange
(交换器)接收消息并将其路由到一个或多个Queues
(队列)。路由算法依赖于Exchange
(交换器)类型和routing
(路由)key/header(与消息一块儿传递)。将Exchange
(交换器)链接到Queues
(队列)的这些规则称为bindings
(绑定)。
绑定能够有4种类型:
routing key
(路由键)将Exchange
(交换器)类型直接路由到特定的Queues
(队列)。Exchange
(交换器)中的全部Queues
(队列)。routing key
(路由键)匹配将消息路由到(0、1或更多)的Queues
(队列)。Topic
(主题)交换类型,可是它是基routing header
(路由头)而不是routing key
(路由键)来路由的。经过Exchange
(交换器)和Queues
(队列)发布和消费消息的整个过程是经过一个Channel
(通道)完成的。
有关路由的详细信息,请访问此连接。
咱们能够从这里下载并安装基于咱们的操做系统的二进制文件。
然而,在本文中,咱们将使用cloudamqp.com
提供的免费云安装。只需注册服务并登陆便可。
在主仪表板上单击建立新实例
:
而后给你的实例起个名字,而后进入下一步:
而后选择一个可用区:
最后,查看实例信息,点击右下角的建立实例
:
就是这样。如今在云中运行了一个RabbitMQ
实例。有关实例的更多信息,请转到您的仪表板并单击新建立的实例
:
咱们能够看到咱们能够访问RabbitMQ实例的主机,好比从咱们的项目链接所需的用户名和密码:
咱们将在Spring应用程序中使用AMQP URL
链接到这个实例,因此请在某个地方记下它。
您还能够经过单击左上角的RabbitMQ manager
来查看管理器控制台。这将采用它来管理的您的RabbitMQ
实例。
如今咱们的设置已经准备好了,让咱们建立咱们的服务:
RabbitMQ
使用Spring Initializr
建立一个脚手架项目。这将是咱们的producer
项目,咱们将使用REST
端点发布消息。
选择您喜欢的Spring Boot
版本,添加Web
和Cloud Stream
依赖项,生成Maven
项目:
注意:
请注意cloud-stream
依赖项。这也须要像RabbitMQ
、Kafka
等绑定器依赖项才能工做。
因为咱们将使用RabbitMQ
,添加如下Maven
依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
复制代码
或者,咱们也能够将二者结合起来使用spring-cloud-starter-stream-rabbit
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
复制代码
使用一样的方法,建立消费者项目,但仅使用spring-cloud-starter-stream-rabbit
依赖项。
如前所述,将消息从发布者传递到队列的整个过程是经过通道完成的。所以,让咱们建立一个HelloBinding
接口,其中包含咱们的消息机制greetingChannel
:
interface HelloBinding {
@Output("greetingChannel")
MessageChannel greeting();
}
复制代码
由于这将发布消息,因此咱们使用@Output
注解。方法名能够是咱们想要的任意名称,固然,咱们能够在一个接口中有多个Channel
(通道)。
如今,让咱们建立一个REST
,它将消息推送到这个Channel
(通道)
@RestController
public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}
@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}
复制代码
上面,咱们建立了一个ProducerController
类,它有一个MessageChannel
类型的属性 greet
。这是经过咱们前面声明的方法在构造函数中初始化的。
注意: 咱们能够用简洁的方式作一样的事情,可是咱们使用不一样的名称来让您更清楚地了解事物是如何链接的。
而后,咱们有一个简单的REST
接口,它接收PathVariable
的name
,并使用MessageBuilder
建立一个String
类型的消息。最后,咱们使用MessageChannel
上的.send()
方法来发布消息。
如今,咱们将在的主类中添加@EnableBinding
注解,传入HelloBinding
告诉Spring
加载。
@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
复制代码
最后,咱们必须告诉Spring
如何链接到RabbitMQ
(经过前面的AMQP URL
),并将greetingChannel
链接到一可用的消费者。
这两个都是在application.properties
中定义的:
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
复制代码
如今,咱们须要监听以前建立的通道greetingChannel
。让咱们为它建立一个绑定:
public interface HelloBinding {
String GREETING = "greetingChannel";
@Input(GREETING)
SubscribableChannel greeting();
}
复制代码
与生产者绑定的两个很是明显区别。由于咱们正在消费消息,因此咱们使用SubscribableChannel
和@Input
注解链接到greetingChannel
,消息数据将被推送这里。
如今,让咱们建立处理数据的方法:
@EnableBinding(HelloBinding.class)
public class HelloListener {
@StreamListener(target = HelloBinding.GREETING)
public void processHelloChannelGreeting(String msg) {
System.out.println(msg);
}
}
复制代码
在这里,咱们建立了一个HelloListener
类,在processHelloChannelGreeting
方法上添加@StreamListener
注解。这个方法须要一个字符串做为参数,咱们刚刚在控制台打印了这个参数。咱们还在类添加@EnableBinding
启用了HelloBinding
。
一样,咱们在这里使用@EnableBinding
,而不是主类,以便告诉咱们如何使用。
看看咱们的主类,咱们没有任何修改:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
复制代码
在application.properties
配置文件中,咱们须要定义与生产者同样的属性,除了修改端口以外
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
复制代码
让咱们同时启动生产者和消费者服务。首先,让咱们经过点击端点http://localhost:8080/greet/john
来生产消息。
在消费者日志中看到消息内容:
咱们使用如下命令启动另外一个消费者服务实例(在另外一个端口(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
复制代码
如今,当咱们点击生产者REST
端点生产消息时,咱们看到两个消费者都收到了消息:
这多是咱们在一些用例中想要的。可是,若是咱们只想让一个消费者消费一条消息呢?为此,咱们须要在application.properties
中建立一个消费者组。消费者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
复制代码
如今,再次在不一样的端口上运行消费者的2个实例,并经过生产者生产消息再次查看:
这一切也能够在RabbitMQ
管理器控制台看到:
在本文中,咱们解释了消息传递的主要概念、它在微服务中的角色以及如何使用Spring Cloud Stream
实现它。咱们使用RabbitMQ
做为消息代理,可是咱们也可使用其余流行的代理,比如Kafka
,只需更改配置和依赖项。
与往常同样,本文使用的示例代码能够在GitHub得到完整的源代码。
原文:stackabuse.com/spring-clou…
译者:李东