←←←←←←←←←←←← 快!点关注java
让咱们展现如何使用Spring Cloud Stream来设计事件驱动的微服务。首先,Spring Cloud Stream首先有什么好处?由于Spring AMPQ提供了访问AMPQ工件所需的一切。若是您不熟悉Spring AMPQ,请查看此repo,其中包含许多有用的示例。那么为何要使用Spring Cloud Stream ......?git
让咱们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。如今让咱们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您须要建立SimpleMessageListenerContainer并在代码中链接基础结构。但这有不少样板代码。使用Spring Cloud Stream,您能够将AMPQ配置分离到属性文件:程序员
spring.cloud.stream.bindings.citiesChannel.destination=streamInput spring.cloud.stream.bindings.citiesChannel.group=cities spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities spring.cloud.stream.bindings.personsChannel.destination=streamInput spring.cloud.stream.bindings.personsChannel.group=persons spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
在类路径上使用RabbitMQ Binder,每一个目标都映射到TopicExchange。在示例中,我建立了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。spring
spring.cloud.stream.bindings.citiesChannel.destination = streamInput spring.cloud.stream.bindings.personsChannel.destination = streamInput
如今您须要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将得到消息。这是有道理的,由于您能够轻松扩展消费者。服务器
所以,让咱们建立两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道架构
# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land. spring.cloud.stream.bindings.citiesChannel.group=cities # Durable subscription, of course. spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true # AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings). # Only messages with routingKey = 'cities' will land here. spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities spring.cloud.stream.bindings.personsChannel.group=persons spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
好的,到目前为止我建立了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。less
<destination>.<group>是Spring Cloud Stream约定的队列命名,如今让咱们将它链接到Spring Integration:微服务
package com.example.spring.cloud.configuration; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** \* Created by tomask79 on 30.03.17. */ public interface SinkRabbitAPI { String INPUT_CITIES = "citiesChannel"; String INPUT_PERSONS = "personsChannel"; @Input(SinkRabbitAPI.INPUT_CITIES) SubscribableChannel citiesChannel(); @Input(SinkRabbitAPI.INPUT_PERSONS) SubscribableChannel personsChannel(); }
Spring Boot启动时加载这个属性测试
package com.example.spring.cloud; import com.example.spring.cloud.configuration.SinkRabbitAPI; import com.example.spring.cloud.configuration.SourceRabbitAPI; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableBinding({SinkRabbitAPI.class}) public class StreamingApplication { public static void main(String\[\] args) { SpringApplication.run(StreamingApplication.class, args); } }
在此以后,咱们能够建立消费者从绑定的消息通道中的队列接收消息:ui
import com.example.spring.cloud.configuration.SinkRabbitAPI; import com.example.spring.cloud.configuration.SourceRabbitAPI; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** \* Created by tomask79 on 30.03.17. */ @Service public class ProcessingAMPQEndpoint { @StreamListener(SinkRabbitAPI.INPUT_CITIES) public void processCity(final String city) { System.out.println("Trying to process input city: "+city); } @StreamListener(SinkRabbitAPI.INPUT_PERSONS) public void processPersons(final String person) { System.out.println("Trying to process input person: "+person); } }
RabbitMQ绑定器和代理配置
Spring Cloud Stream如何知道在哪里寻找消息中间件?若是在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)链接到RabbitMQ服务器。若是您的消息中间件配置在不一样端口,则须要配置属性:
spring: cloud: stream: bindings: ... binders: rabbitbinder: type: rabbit environment: spring: rabbitmq: host: rabbitmq port: 5672 username: XXX password: XXX
测试消息消费
Started StreamingApplication in 6.513 seconds (JVM running for 6.92) Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd
您一般但愿在进入DLX交换以前再次尝试接收消息。首先,让咱们配置Spring Cloud Stream尝试从新发送失败消息的次数:
spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6
这意味着若是从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试从新发送六次。让咱们试试,首先让咱们修改接收端点以模拟接收崩溃:
@StreamListener(SinkRabbitAPI.INPUT_PERSONS) public void processPersons(final String person) { System.out.println("Trying to process input person: "+person); throw new RuntimeException(); }
若是我如今尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:
Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Trying to process input person: sfsdfsdfsd Retry Policy Exhausted at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover (RejectAndDontRequeueRecoverer.java:45) ~\[spring-rabbit-1.7.0.RELEASE.jar! /:na\] at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc
建议将Spring Cloud Stream 用于事件驱动的MicroServices,由于它能够节省时间,并且您不须要为Java中的AMPQ基础架构编写样板代码。
秃顶程序员的不易,看到这里,点了关注吧! 点关注,不迷路,持续更新!!!