Spring Boot 参考指南(消息传递)

32. 消息传递

Spring框架为与消息传递系统集成提供了普遍的支持,从使用JmsTemplate简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了相似的特性集。Spring Boot还为RabbitTemplate和RabbitMQ提供自动配置选项,Spring WebSocket原生包括对STOMP消息的支持,Spring Boot经过启动器和少许的自动配置支持这一点,Spring Boot还支持Apache Kafka。html

32.1 JMS

javax.jms.ConnectionFactory接口提供了建立javax.jms.Connection与JMS代理交互的标准方法,尽管Spring须要一个ConnectionFactory来处理JMS,你一般不须要本身直接使用它,而是能够依赖于更高级别的消息传递抽象。(详见Spring Framework参考文档的相关部分。)Spring Boot还自动配置发送和接收消息所需的基础设施。java

32.1.1 ActiveMQ支持

ActiveMQ在类路径上可用时,Spring Boot还能够配置ConnectionFactory,若是代理存在,则会自动启动和配置嵌入式代理(只要没有经过配置指定代理URL)。git

若是你使用 spring-boot-starter-activemq,那么将提供链接或嵌入ActiveMQ实例所需的依赖项,与JMS集成的Spring基础设施也是同样。

ActiveMQ配置由在spring.activemq.*中的外部配置属性控制,例如,你能够在application.properties中声明如下部分:github

spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret

你还能够经过向org.apache.activemq:activemq-pool添加一个依赖项来共享JMS资源并相应地配置PooledConnectionFactory,以下例所示:spring

spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
有关更多支持的选项,请参见 ActiveMQProperties,你还能够注册一个任意数量的实现 activemqconnectionfactorycustomzer的bean进行更高级的自定义。

默认状况下,ActiveMQ建立一个目的地,若是它还不存在,那么目的地将根据它们提供的名称解析。apache

32.1.2 Artemis支持

Spring Boot能够自动配置ConnectionFactory,当它检测到在类路径上可用的Artemis时,若是代理存在,则自动启动和配置嵌入式代理(除非模式属性已显式设置),所支持的模式是embedded(要明确地说明须要一个嵌入式代理,若是代理在类路径上不可用,就会出现错误)和native(使用netty传输协议链接到代理),在配置后者时,Spring Boot使用默认设置配置链接到运行在本地机器上的代理的ConnectionFactoryjson

若是你使用 spring-boot-starter-artemis,则提供了链接到现有的Artemis实例的必要依赖项,以及与JMS集成的Spring基础设施,添加 org.apache.activemq:artemis-jms-server到你的应用程序以容许你使用嵌入式模式。

Artemis配置由spring.artemis.*的外部配置属性控制,例如,你能够在application.properties中声明如下部分:bootstrap

spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret

在嵌入代理时,你能够选择是否启用持久性,并列出应该可用的目的地,能够将它们指定为逗号分隔的列表,以使用默认选项建立它们,或者能够定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration的bean,分别用于高级队列和主题配置。segmentfault

有关更多受支持的选项,请参阅ArtemisPropertiesapi

不涉及JNDI查找,目的地根据它们的名称进行解析,使用Artemis配置中的name属性或经过配置提供的名称。

32.1.3 使用JNDI ConnectionFactory

若是你正在应用服务器中运行应用程序,Spring Boot试图使用JNDI定位JMS ConnectionFactory,默认状况下,检查java:/JmsXAjava:/XAConnectionFactory位置,若是须要指定替代位置,可使用spring.jms.jndi-name属性,以下例所示:

spring.jms.jndi-name=java:/MyConnectionFactory

32.1.4 发送消息

Spring的JmsTemplate是自动配置的,你能够将其自动链接到你本身的bean中,以下面的示例所示:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    @Autowired
    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

}
JmsMessagingTemplate能够以相似的方式注入,若是定义了 DestinationResolverMessageConverter bean,则它将自动关联到自动配置的 JmsTemplate

32.1.5 接收消息

当出现JMS基础设施时,可使用@JmsListener注解任何bean,以建立监听器端点,若是没有定义JmsListenerContainerFactory,则会自动配置默认工厂,若是定义了DestinationResolverMessageConverter bean,则它将自动关联到默认工厂。

默认状况下,默认工厂是事务性的,若是你运行的基础设施中存在JtaTransactionManager,那么它默认与侦听器容器相关联,若是没有,则启用sessionTransacted标志。在后一个场景中,你能够经过在监听器方法(或委托)上添加@Transactional,将本地数据存储事务与接收消息的处理相关联,这确保在本地事务完成以后,传入消息获得确认,这还包括发送在相同JMS会话上执行的响应消息。

如下组件在someQueue目的地上建立监听器端点:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
有关更多细节,请参见 @EnableJms的Javadoc

若是你须要建立更多的JmsListenerContainerFactory实例,或者但愿重写默认的实例,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer,你可使用它来初始化一个DefaultJmsListenerContainerFactory,其设置与自动配置的工厂相同。

例如,下面的示例公开了另外一个使用特定MessageConverter的工厂:

@Configuration
static class JmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory());
        factory.setMessageConverter(myMessageConverter());
        return factory;
    }

}

而后你能够在任何@JmsListener注解的方法中使用工厂,以下所示:

32.2 AMQP

高级消息队列协议(AMQP)是面向消息的中间件的一种平台无关的、有线级别的协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发,Spring Boot为经过RabbitMQ使用AMQP提供了一些方便,包括spring-boot-starter-amqp“启动器”。

32.2.1 RabbitMQ支持

RabbitMQ是一个轻量级的、可靠的、可伸缩的、可移植的消息代理,基于AMQP协议,Spring使用RabbitMQ经过AMQP协议进行通讯。

RabbitMQ配置由spring.rabbitmq.*的外部配置属性控制,例如,你能够在application.properties中声明如下部分:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret

若是上下文中存在ConnectionNameStrategy bean,那么它将自动用于命名由自动配置的ConnectionFactory建立的链接。有关更多受支持的选项,请参阅RabbitProperties

有关详细信息,请参阅 RabbitMQ使用的协议AMQP

32.2.2 发送消息

Spring的AmqpTemplateAmqpAdmin是自动配置的,你能够将它们自动链接到你本身的bean中,以下面的示例所示:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;
    private final AmqpTemplate amqpTemplate;

    @Autowired
    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

}
RabbitMessagingTemplate能够以相似的方式注入,若是定义了 MessageConverter bean,它会自动关联到自动配置的 AmqpTemplate

若是有必要,任何定义为bean的org.springframework.amqp.core.Queue自动用于在RabbitMQ实例上声明相应的队列。

重试操做,能够对AmqpTemplate启用重试(例如,若是代理链接丢失了),默认状况下禁用重试。

32.2.3 接收消息

当Rabbit基础设施存在时,可使用@RabbitListener对任何bean进行注解,以建立监听器端点,若是没有定义RabbitListenerContainerFactory,则会自动配置默认的SimpleRabbitListenerContainerFactory,你可使用spring.rabbitmq.listener.type属性切换到直接容器。若是定义了MessageConverterMessageRecoverer bean,则它将自动与默认工厂相关联。

如下示例组件在someQueue队列上建立监听器端点:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
有关更多细节,请参见 @EnableRabbit的Javadoc

若是你须要建立更多的RabbitListenerContainerFactory实例,或者你想要覆盖缺省值,Spring Boot提供了一个SimpleRabbitListenerContainerFactoryConfigurer和一个DirectRabbitListenerContainerFactoryConfigurer,你可使用它们初始化一个SimpleRabbitListenerContainerFactory,以及一个DirectRabbitListenerContainerFactory,其设置与自动配置使用的工厂相同。

选择哪一种容器类型并不重要,这两个bean经过自动配置公开。

例如,下面的configuration类公开另外一个使用特定MessageConverter的工厂:

@Configuration
static class RabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(myMessageConverter());
        return factory;
    }

}

而后你能够在任何@RabbitListener注解的方法中使用工厂,以下所示:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory="myFactory")
    public void processMessage(String content) {
        // ...
    }

}

你能够启用重试来处理监听器抛出异常的状况,默认状况下,使用RejectAndDontRequeueRecoverer,可是能够定义本身的MessageRecoverer,当重试结束时,消息将被拒绝,若是将代理配置为这样作,则消息将被删除或路由到死信交换,默认状况下,重试被禁用。

重要
默认状况下,若是重试被禁用而且监听器抛出异常,该递送被无限期地重试,你能够用两种方式修改此行为:将 defaultRequeueRejected属性设置为 false,以便尝试零重复发送,或者抛出 AmqpRejectAndDontRequeueException来通知消息应该被拒绝,后者是在启用重试并达到最大提交尝试次数时使用的机制。

32.3 Apache Kafka支持

经过提供spring-kafka项目的自动配置来支持Apache Kafka

Kafka配置由spring.kafka.*中的外部配置属性控制,例如,你能够在application.properties中声明如下部分:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
要在启动时建立主题,请添加一个类型 NewTopic的bean,若是主题已经存在,则忽略bean。

有关更多受支持的选项,请参阅KafkaProperties

32.3.1 发送消息

Spring的KafkaTemplate是自动配置的,你能够直接在你本身的bean中自动链接它,以下面的示例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}
若是定义了一个 RecordMessageConverter bean,它将自动关联到自动配置的 KafkaTemplate

32.3.2 接收消息

当存在Apache Kafka基础设施时,任何bean均可以使用@KafkaListener进行注解,以建立监听器端点,若是没有定义KafkaListenerContainerFactory,默认设置为使用spring.kafka.listener.*中定义的键,此外,若是定义了一个RecordMessageConverter bean,它将自动关联到默认的工厂。

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

32.3.3 附加的Kafka属性

自动配置所支持的属性显示在附录A中,通用的应用程序属性。注意,在大多数状况下,这些属性(连字符或驼峰式大小写)直接映射到Apache Kafka *属性,有关详细信息,请参阅Apache Kafka文档。

前几个属性同时适用于生产者和消费者,可是若是你但愿对每一个属性使用不一样的值,能够在生产者或消费者级别指定,Apache Kafka设计具备高、中或低重要性的属性,Spring Boot自动配置支持全部重要属性、一些选定的中属性和低属性,以及任何没有默认值的属性。

Kafka支持的属性中只有一部分是能够经过KafkaProperties类得到的,若是你但愿为生产者或消费者配置不受直接支持的其余属性,请使用如下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth

这将设置通用的prop.oneKafka属性为first(适用于生产者、消费者和管理员),prop.two 管理员属性为secondprop.three消费者属性为third而且prop.four生产者属性为fourth

你还能够配置Spring Kafka JsonDeserializer,以下所示:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

一样,能够禁用JsonSerializer在header中发送类型信息的默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。

下一篇:使用RestTemplate调用REST服务

相关文章
相关标签/搜索