Spring框架为与消息传递系统集成提供了普遍的支持,从使用JmsTemplate
简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了相似的特性集。Spring Boot还为RabbitTemplate
和RabbitMQ提供自动配置选项,Spring WebSocket原生包括对STOMP消息的支持,Spring Boot经过启动器和少许的自动配置支持这一点,Spring Boot还支持Apache Kafka。html
javax.jms.ConnectionFactory
接口提供了建立javax.jms.Connection
与JMS代理交互的标准方法,尽管Spring须要一个ConnectionFactory
来处理JMS,你一般不须要本身直接使用它,而是能够依赖于更高级别的消息传递抽象。(详见Spring Framework参考文档的相关部分。)Spring Boot还自动配置发送和接收消息所需的基础设施。java
当ActiveMQ在类路径上可用时,Spring Boot还能够配置ConnectionFactory
,若是代理存在,则会自动启动和配置嵌入式代理(只要没有经过配置指定代理URL)。git
若是你使用
spring-boot-starter-activemq
,那么将提供链接或嵌入ActiveMQ实例所需的依赖项,与JMS集成的Spring基础设施也是同样。
ActiveMQ配置由在spring.activemq.*
中的外部配置属性控制,例如,你能够在application.properties
中声明如下部分:github
spring.activemq.broker-url=tcp://192.168.1.210:9876 spring.activemq.user=admin spring.activemq.password=secret
你还能够经过向org.apache.activemq:activemq-pool
添加一个依赖项来共享JMS资源并相应地配置PooledConnectionFactory
,以下例所示:spring
spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=50
有关更多支持的选项,请参见
ActiveMQProperties,你还能够注册一个任意数量的实现
activemqconnectionfactorycustomzer
的bean进行更高级的自定义。
默认状况下,ActiveMQ建立一个目的地,若是它还不存在,那么目的地将根据它们提供的名称解析。apache
Spring Boot能够自动配置ConnectionFactory
,当它检测到在类路径上可用的Artemis时,若是代理存在,则自动启动和配置嵌入式代理(除非模式属性已显式设置),所支持的模式是embedded
(要明确地说明须要一个嵌入式代理,若是代理在类路径上不可用,就会出现错误)和native
(使用netty
传输协议链接到代理),在配置后者时,Spring Boot使用默认设置配置链接到运行在本地机器上的代理的ConnectionFactory
。json
若是你使用spring-boot-starter-artemis
,则提供了链接到现有的Artemis实例的必要依赖项,以及与JMS集成的Spring基础设施,添加org.apache.activemq:artemis-jms-server
到你的应用程序以容许你使用嵌入式模式。
Artemis配置由spring.artemis.*
的外部配置属性控制,例如,你能够在application.properties
中声明如下部分:bootstrap
spring.artemis.mode=native spring.artemis.host=192.168.1.210 spring.artemis.port=9876 spring.artemis.user=admin spring.artemis.password=secret
在嵌入代理时,你能够选择是否启用持久性,并列出应该可用的目的地,能够将它们指定为逗号分隔的列表,以使用默认选项建立它们,或者能够定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或org.apache.activemq.artemis.jms.server.config.TopicConfiguration
的bean,分别用于高级队列和主题配置。segmentfault
有关更多受支持的选项,请参阅ArtemisProperties。api
不涉及JNDI查找,目的地根据它们的名称进行解析,使用Artemis配置中的name
属性或经过配置提供的名称。
若是你正在应用服务器中运行应用程序,Spring Boot试图使用JNDI定位JMS ConnectionFactory
,默认状况下,检查java:/JmsXA
和java:/XAConnectionFactory
位置,若是须要指定替代位置,可使用spring.jms.jndi-name
属性,以下例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
Spring的JmsTemplate是自动配置的,你能够将其自动链接到你本身的bean中,以下面的示例所示:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final JmsTemplate jmsTemplate; @Autowired public MyBean(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } // ... }
JmsMessagingTemplate能够以相似的方式注入,若是定义了DestinationResolver
或MessageConverter
bean,则它将自动关联到自动配置的JmsTemplate
。
当出现JMS基础设施时,可使用@JmsListener
注解任何bean,以建立监听器端点,若是没有定义JmsListenerContainerFactory
,则会自动配置默认工厂,若是定义了DestinationResolver
或MessageConverter
bean,则它将自动关联到默认工厂。
默认状况下,默认工厂是事务性的,若是你运行的基础设施中存在JtaTransactionManager
,那么它默认与侦听器容器相关联,若是没有,则启用sessionTransacted
标志。在后一个场景中,你能够经过在监听器方法(或委托)上添加@Transactional
,将本地数据存储事务与接收消息的处理相关联,这确保在本地事务完成以后,传入消息获得确认,这还包括发送在相同JMS会话上执行的响应消息。
如下组件在someQueue
目的地上建立监听器端点:
@Component public class MyBean { @JmsListener(destination = "someQueue") public void processMessage(String content) { // ... } }
有关更多细节,请参见 @EnableJms的Javadoc。
若是你须要建立更多的JmsListenerContainerFactory
实例,或者但愿重写默认的实例,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer
,你可使用它来初始化一个DefaultJmsListenerContainerFactory
,其设置与自动配置的工厂相同。
例如,下面的示例公开了另外一个使用特定MessageConverter
的工厂:
@Configuration static class JmsConfiguration { @Bean public DefaultJmsListenerContainerFactory myFactory( DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory, connectionFactory()); factory.setMessageConverter(myMessageConverter()); return factory; } }
而后你能够在任何@JmsListener
注解的方法中使用工厂,以下所示:
高级消息队列协议(AMQP)是面向消息的中间件的一种平台无关的、有线级别的协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发,Spring Boot为经过RabbitMQ使用AMQP提供了一些方便,包括spring-boot-starter-amqp
“启动器”。
RabbitMQ是一个轻量级的、可靠的、可伸缩的、可移植的消息代理,基于AMQP协议,Spring使用RabbitMQ
经过AMQP协议进行通讯。
RabbitMQ配置由spring.rabbitmq.*
的外部配置属性控制,例如,你能够在application.properties
中声明如下部分:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=secret
若是上下文中存在ConnectionNameStrategy
bean,那么它将自动用于命名由自动配置的ConnectionFactory
建立的链接。有关更多受支持的选项,请参阅RabbitProperties。
有关详细信息,请参阅 RabbitMQ使用的协议AMQP
Spring的AmqpTemplate
和AmqpAdmin
是自动配置的,你能够将它们自动链接到你本身的bean中,以下面的示例所示:
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MyBean { private final AmqpAdmin amqpAdmin; private final AmqpTemplate amqpTemplate; @Autowired public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) { this.amqpAdmin = amqpAdmin; this.amqpTemplate = amqpTemplate; } // ... }
RabbitMessagingTemplate能够以相似的方式注入,若是定义了MessageConverter
bean,它会自动关联到自动配置的AmqpTemplate
。
若是有必要,任何定义为bean的org.springframework.amqp.core.Queue
自动用于在RabbitMQ实例上声明相应的队列。
重试操做,能够对AmqpTemplate
启用重试(例如,若是代理链接丢失了),默认状况下禁用重试。
当Rabbit基础设施存在时,可使用@RabbitListener
对任何bean进行注解,以建立监听器端点,若是没有定义RabbitListenerContainerFactory
,则会自动配置默认的SimpleRabbitListenerContainerFactory
,你可使用spring.rabbitmq.listener.type
属性切换到直接容器。若是定义了MessageConverter
或MessageRecoverer
bean,则它将自动与默认工厂相关联。
如下示例组件在someQueue
队列上建立监听器端点:
@Component public class MyBean { @RabbitListener(queues = "someQueue") public void processMessage(String content) { // ... } }
有关更多细节,请参见 @EnableRabbit的Javadoc。
若是你须要建立更多的RabbitListenerContainerFactory
实例,或者你想要覆盖缺省值,Spring Boot提供了一个SimpleRabbitListenerContainerFactoryConfigurer
和一个DirectRabbitListenerContainerFactoryConfigurer
,你可使用它们初始化一个SimpleRabbitListenerContainerFactory
,以及一个DirectRabbitListenerContainerFactory
,其设置与自动配置使用的工厂相同。
选择哪一种容器类型并不重要,这两个bean经过自动配置公开。
例如,下面的configuration类公开另外一个使用特定MessageConverter
的工厂:
@Configuration static class RabbitConfiguration { @Bean public SimpleRabbitListenerContainerFactory myFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(myMessageConverter()); return factory; } }
而后你能够在任何@RabbitListener
注解的方法中使用工厂,以下所示:
@Component public class MyBean { @RabbitListener(queues = "someQueue", containerFactory="myFactory") public void processMessage(String content) { // ... } }
你能够启用重试来处理监听器抛出异常的状况,默认状况下,使用RejectAndDontRequeueRecoverer
,可是能够定义本身的MessageRecoverer
,当重试结束时,消息将被拒绝,若是将代理配置为这样作,则消息将被删除或路由到死信交换,默认状况下,重试被禁用。
重要
默认状况下,若是重试被禁用而且监听器抛出异常,该递送被无限期地重试,你能够用两种方式修改此行为:将defaultRequeueRejected
属性设置为false
,以便尝试零重复发送,或者抛出AmqpRejectAndDontRequeueException
来通知消息应该被拒绝,后者是在启用重试并达到最大提交尝试次数时使用的机制。
经过提供spring-kafka
项目的自动配置来支持Apache Kafka。
Kafka配置由spring.kafka.*
中的外部配置属性控制,例如,你能够在application.properties
中声明如下部分:
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup
要在启动时建立主题,请添加一个类型
NewTopic
的bean,若是主题已经存在,则忽略bean。
有关更多受支持的选项,请参阅KafkaProperties。
Spring的KafkaTemplate
是自动配置的,你能够直接在你本身的bean中自动链接它,以下面的示例所示:
@Component public class MyBean { private final KafkaTemplate kafkaTemplate; @Autowired public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... }
若是定义了一个RecordMessageConverter
bean,它将自动关联到自动配置的KafkaTemplate
当存在Apache Kafka基础设施时,任何bean均可以使用@KafkaListener
进行注解,以建立监听器端点,若是没有定义KafkaListenerContainerFactory
,默认设置为使用spring.kafka.listener.*
中定义的键,此外,若是定义了一个RecordMessageConverter
bean,它将自动关联到默认的工厂。
@Component public class MyBean { @KafkaListener(topics = "someTopic") public void processMessage(String content) { // ... } }
自动配置所支持的属性显示在附录A中,通用的应用程序属性。注意,在大多数状况下,这些属性(连字符或驼峰式大小写)直接映射到Apache Kafka *属性,有关详细信息,请参阅Apache Kafka文档。
前几个属性同时适用于生产者和消费者,可是若是你但愿对每一个属性使用不一样的值,能够在生产者或消费者级别指定,Apache Kafka设计具备高、中或低重要性的属性,Spring Boot自动配置支持全部重要属性、一些选定的中属性和低属性,以及任何没有默认值的属性。
Kafka支持的属性中只有一部分是能够经过KafkaProperties
类得到的,若是你但愿为生产者或消费者配置不受直接支持的其余属性,请使用如下属性:
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth
这将设置通用的prop.one
Kafka属性为first
(适用于生产者、消费者和管理员),prop.two
管理员属性为second
,prop.three
消费者属性为third
而且prop.four
生产者属性为fourth
。
你还能够配置Spring Kafka JsonDeserializer
,以下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
一样,能够禁用JsonSerializer
在header中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。