本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客
微信公众号小白AI
或者网站 https://xiaobaiai.nethtml
[TOC]java
Spring Integration
提供入站(inbound)和出站(outbound)通道适配器,以支持MQTT消息协议。使用这两适配器,须要加入依赖:web
<!-- Maven --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.2.1.RELEASE</version> </dependency>
// Gradle compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"
当前的MQTT Integration实现使用的是Eclipse Paho MQTT客户端库。两个适配器的配置都是使用DefaultMqttPahoClientFactory
实现的。有关配置选项的更多信息,请参阅Eclipse Paho MQTT
文档定义。spring
建议配置
MqttConnectOptions
对象并将其注入工厂(factory),而不是在工厂自己里设置(不推荐使用)MQTT链接选项。
入站通道适配器由MqttPahoMessageDrivenChannelAdapter
实现。经常使用的配置项有:express
MqttMessageConverter
(可选)。默认状况下,默认的DefaultPaHomeMessageConverter
生成一条带有字符串有效负载的消息,其头部内容以下:编程
mqtt_topic
: 接收消息的主题mqtt_duplicate
: 若是消息是重复的,则为true
mqtt_qos
: 服务质量,你能够将DefaultPahoMessageConverter
声明为<bean />
并将payloadAsBytes
属性设置为true
,从而将DefaultPahoMessageConverter
返回有效负载中的原始byte[]
MessagingException
。从Spring 4.1版开始,能够省略URL
。相反,你能够在DefaultMqttPahoClientFactory
的server URIs
属性中提供服务器uri。例如,这样作容许链接到高可用(HA)集群。
从Spring 4.2.2
开始,当适配器成功订阅到主题了,MqttSubscribedEvent
事件就会被触发。当链接失败或者订阅失败,MqttConnectionFailedEvent
事件会被触发。这两个事件都可以被一个Bean经过实现ApplicationListener
而接收到。另外,名为recoveryInterval
的新属性控制适配器在失败后尝试从新链接的时间间隔。默认为10000毫秒(10秒)。小程序
@Component public class MQTTSubscribedListener implements ApplicationListener<MqttSubscribedEvent> { private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class); @Override public void onApplicationEvent(MqttSubscribedEvent event) { LOGGER.debug("Subscribed Success: " + event.getMessage()); } }
在版本Spring 4.2.3以前,当适配器中止时,客户端老是取消订阅。这是不正确的,由于若是客户端QOS大于0,咱们须要保持订阅处于活动状态,以便在下次启动时传递适配器中止时到达的消息。这还须要将客户机工厂上的cleanSession
属性设置为false。默认为true。从4.2.3版开始,若是cleanSession属性为false,则适配器不会取消订阅(默认状况下),这个默认行为能够经过在工厂上设置consumerCloseAction
属性来重写此行为。它能够有如下值:UNSUBSCRIBE_ALWAYS
、UNSUBSCRIBE_NEVER
和UNSUBSCRIBE_CLEAN
,最后一项(默认设置)仅在cleanSession属性为true时取消订阅。若要还原到4.2.3以前的行为,请始终使用“取消订阅”设置项。注意:从Spring 5.0开始,topic、qos和retained属性映射到
.RECEIVED_…headers
(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以免意外传播到(默认状况下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。segmentfault
public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload()); } }; }
从Spring4.1
开始,你能够经过编程更改适配器订阅的主题。Spring Integration提供了addTopic()
和removeTopic()
方法。添加主题时,能够选择指定QoS
值(默认是1)。你还能够经过向具备适当有效负载的<control bus/>
发送适当的消息来修改主题。示例:微信小程序
myMqttAdapter.addTopic('foo', 1)
中止和启动适配器对主题列表(topics设置项)没有影响(它不会还原到配置中的原始设置)。这些更改不会保留到应用程序上下文的生命周期以外。新的应用程序上下文将还原为配置的设置。bash
在适配器中止(或与代理断开链接)时更改主题列表(topics)将在下次创建链接时生效。
如下Spring Boot
应用程序显示了如何使用Java配置配置入站(inbound)适配器的示例:
@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); } }; } }
下面的Spring Boot
应用程序提供了使用Java DSL配置入站适配器的示例:
@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttInbound() { return IntegrationFlows.from( new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "topic1", "topic2");) .handle(m -> System.out.println(m.getPayload())) .get(); } }
出站通道适配器由MqttPahoMessageHandler
实现,MqttPahoMessageHandler
包装在ConsumerEndpoint
中。为了方便起见,可使用名称空间配置它。
从Spring 4.1开始,适配器支持异步发送操做,在确认交付以前避免阻塞。若是须要,能够发出应用程序事件以使应用程序确认传递。
如下列表显示出站通道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter" client-id="foo" url="tcp://localhost:1883" converter="myConverter" client-factory="clientFactory" default-qos="1" qos-expression="" default-retained="true" retained-expression="" default-topic="bar" topic-expression="" async="false" async-events="false" channel="target" />
Converter(MqttMessageConver,可选的),默认的DefaultPaHomeMessageConverter
可识别如下标题:
mqtt_topic
: 消息将发送到的主题mqtt_retained
: 若是要保留消息,则为truemqtt_qos
:消息服务质量headers[mqtt_qos]
。mqtt_retained
头,则使用它。若是提供了自定义转换器,则不使用它。headers[mqtt_retained]
mqtt_topic
头,则使用)headers['mqtt_topic']
MqttMessageSentEvent
。它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次链接客户端时递增)。当客户端库确认传递时,将发出MqttMessageDeliveredEvent
。它包含messageId、clientId和clientInstance,使传递与发送相关。任何ApplicationListener
或事件入站通道适配器均可以接收这些事件。请注意,MqttMessageDeliveredEvent
可能在MqttMessageSentEvent
以前收到。默认值为false注意,一样地,从Spring 4.1开始,能够省略URL。相反,能够在DefaultMqttPahoClientFactor
的server URIs
属性中提供服务器uri。例如,这容许链接到高可用(HA)集群。
下面的Spring Boot应用程序展现了如何使用Java配置配置出站适配器的示例:
@SpringBootApplication @IntegrationComponentScan public class MqttJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToMqtt("foo"); } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" }); options.setUserName("username"); options.setPassword("password".toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("testClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { void sendToMqtt(String data); } }
下面的Spring Boot应用程序提供了使用Java DSL配置出站适配器的示例:
@SpringBootApplication public class MqttJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(MqttJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow mqttOutboundFlow() { return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient")); } }
本文属于原创,转载注明出处,欢迎关注CSDNfreeape或微信小程序小白AI博客
微信公众号小白AI
或者网站 https://xiaobaiai.net