MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通信协议,该协议构建于TCP/IP协议之上,由IBM在1999年发布。MQTT最大的优势在于,能够以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通信协议,它在物联网、小型设备、移动应用等方面有较广泛的应用。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议轻量、简单、开放且易于实现,这些特点使它的适用范围非常广泛,在很多情况下都能使用,包括受限的环境,如机器与机器(M2M)通信和物联网(IoT)。它在通过卫星链路通信的传感器、偶尔拨号的医疗设备、智能家居以及一些小型化设备中已被广泛使用。
<!--
  Maven dependencies used by the MQTT example below.
  No <version> elements are declared here, so versions are expected to be supplied
  by the build's dependency management (e.g. a parent POM) - confirm in the full pom.xml.
-->

<!-- Spring Boot starter for Spring Integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<!-- Spring Integration stream module -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!-- Spring Integration MQTT module (provides the org.springframework.integration.mqtt.* classes used below) -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
inbound 配置属性
package com.chen.config;

import lombok.Data;
import lombok.ToString;

/**
 * Settings for the inbound (subscribing) side of the MQTT integration.
 *
 * <p>Lombok's {@code @Data} generates getters, setters, {@code equals}, {@code hashCode} and
 * {@code toString} for every field. The explicit {@code @ToString} below overrides the generated
 * {@code toString()} so that the password is left out and cannot end up in logs or error
 * messages when an instance is printed.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Data
@ToString(exclude = "password")
public class MqttInboundProperties {

    /** Broker URL handed to the inbound channel adapter. */
    private String url;

    /**
     * User name for the inbound connection.
     * NOTE(review): no code visible alongside this class reads this value - confirm it is used.
     */
    private String username;

    /**
     * Password for the inbound connection; deliberately excluded from {@code toString()}.
     * NOTE(review): no code visible alongside this class reads this value - confirm it is used.
     */
    private String password;

    /** MQTT client id handed to the inbound channel adapter. */
    private String clientId;

    /** Topics to subscribe to, as a single comma-separated string. */
    private String topics;
}
package com.chen.config;

import lombok.Getter;
import lombok.Setter;

/**
 * Settings for the outbound (publishing) side of the MQTT integration.
 *
 * <p>Lombok generates a getter and a setter for every field; nothing else is generated.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Getter
@Setter
public class MqttOutboundProperties {

    /** Broker URLs as a single comma-separated string. */
    private String urls;

    /** User name applied to the MQTT client factory. */
    private String username;

    /** Password applied to the MQTT client factory. */
    private String password;

    /** MQTT client id used by the outbound message handler. */
    private String clientId;

    /** Default topic used by the outbound message handler. */
    private String topic;
}
package com.chen.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Root configuration object bound to properties under the {@code com.mqtt} prefix.
 *
 * <p>Groups the inbound and outbound MQTT settings, i.e. {@code com.mqtt.inbound.*} and
 * {@code com.mqtt.outbound.*}. Both references stay {@code null} until they are set.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@ConfigurationProperties(prefix = "com.mqtt")
public class MqttProperties {

    /** Settings for the subscribing side. */
    private MqttInboundProperties inbound;

    /** Settings for the publishing side. */
    private MqttOutboundProperties outbound;

    // ---- getters ----

    /** @return the inbound settings, or {@code null} if none were set */
    public MqttInboundProperties getInbound() {
        return this.inbound;
    }

    /** @return the outbound settings, or {@code null} if none were set */
    public MqttOutboundProperties getOutbound() {
        return this.outbound;
    }

    // ---- setters ----

    /** @param inbound the inbound settings to store */
    public void setInbound(MqttInboundProperties inbound) {
        this.inbound = inbound;
    }

    /** @param outbound the outbound settings to store */
    public void setOutbound(MqttOutboundProperties outbound) {
        this.outbound = outbound;
    }
}
package com.chen.mqtt; import com.chen.config.MqttProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * @author: ChenJie * @date 2018/8/21 */ @Configuration @Slf4j public class MqttInboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound(MqttPahoClientFactory mqttPahoClientFactory) { String[] inboundTopics = mqttProperties.getInbound().getTopics().split(","); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getUrl(), mqttProperties.getInbound().getClientId(), mqttPahoClientFactory,inboundTopics); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("收到消息:"+(String) message.getPayload()); } }; } }
package com.chen.mqtt; import com.chen.config.MqttProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; /** * @author: ChenJie * @date 2018/8/21 */ @Configuration public class MqttOutboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MqttPahoClientFactory mqttClientFactory() { String[] serverURIs = mqttProperties.getOutbound().getUrls().split(","); DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); // factory.setServerURIs("tcp://192.168.10.100:1883", "tcp://host2:1883"); factory.setServerURIs(serverURIs); factory.setCleanSession(false); factory.setUserName(mqttProperties.getOutbound().getUsername()); factory.setPassword(mqttProperties.getOutbound().getPassword()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
package com.chen.service;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the
 * {@code mqttOutboundChannel} request channel.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a payload without setting a topic header.
     *
     * @param data the message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the {@link MqttHeaders#TOPIC} header set.
     *
     * @param topic   value for the topic header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String payload);

    /**
     * Sends a payload with the {@link MqttHeaders#TOPIC} and {@link MqttHeaders#QOS} headers set.
     *
     * @param topic   value for the topic header
     * @param qos     value for the QoS header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}
package com.chen.controller;

import com.chen.service.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint that forwards a request parameter to MQTT through {@link MqttGateway}.
 *
 * @author: ChenJie
 * @date 2018/8/21
 */
@RestController
public class MessageController {

    @Autowired
    MqttGateway mqttGateway;

    /**
     * Publishes the given text through the gateway's payload-only method.
     *
     * @param message text taken from the {@code message} request parameter
     * @return the literal string {@code "success"}
     */
    @RequestMapping(value = "/sendMsg")
    public String sendMsg(@RequestParam String message) {
        // NOTE(review): the prefix is spelled "defualt" in the original; it is part of the
        // published payload, so it is kept unchanged in case subscribers depend on it.
        String payload = "defualt-topic:" + message;
        mqttGateway.sendToMqtt(payload);
        return "success";
    }
}
/**
 * Application entry point. Enables binding of {@code MqttProperties} and starts the Spring
 * application.
 */
@EnableConfigurationProperties(MqttProperties.class)
@Configuration
@SpringBootApplication
public class MqttSpringbootApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(MqttSpringbootApplication.class, args);
    }
}