# MQTT settings consumed by the MQTT configuration class.
spring:
  mqtt:
    # Connection settings shared by the producer and the consumer.
    client:
      username: 用户名
      password: 密码
      serverURIs: tcp://ip:port # broker address(es); separate multiple URIs with commas
      clientId: client0001 # alternative: ${random.value}
      keepAliveInterval: 30
      connectionTimeout: 30
    # Defaults applied to outbound (published) messages.
    producer:
      defaultQos: 1
      defaultRetained: true
      defaultTopic: defaultTopicName
    # Settings for the inbound (subscribing) adapter.
    consumer:
      defaultQos: 1
      completionTimeout: 30000
      consumerTopics: topic1,topic2 # topics to subscribe to; separate multiple topics with commas
/* Client */

/**
 * Builds the Paho connection options from the configured client settings.
 *
 * @return options carrying the user name, password, server URIs,
 *         keep-alive interval and connection timeout
 */
@Bean
public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // Credentials.
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    // Broker endpoints and timing.
    options.setServerURIs(serverURIs);
    options.setKeepAliveInterval(keepAliveInterval);
    options.setConnectionTimeout(connectionTimeout);
    return options;
}

/**
 * Creates the Paho client factory configured with {@link #getMqttConnectOptions()}.
 *
 * @return the client factory used by the MQTT producer and consumer beans
 */
@Bean
public MqttPahoClientFactory getMqttClientFactory() {
    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.setConnectionOptions(getMqttConnectOptions());
    return clientFactory;
}
/**
 * Channel that carries messages to be published to MQTT.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel outboundChannel() {
    return new DirectChannel();
}

/**
 * Message handler that publishes everything arriving on the outbound channel to MQTT.
 *
 * @return the handler, set to asynchronous sending with the configured default
 *         topic, retained flag and QoS
 */
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler getMqttProducer() {
    MqttPahoMessageHandler producer =
            new MqttPahoMessageHandler(clientId, getMqttClientFactory());
    producer.setAsync(true);
    // Defaults used when a message does not carry its own topic/retained/QoS headers.
    producer.setDefaultTopic(defaultTopic);
    producer.setDefaultRetained(defaultRetained);
    producer.setDefaultQos(defaultProducerQos);
    return producer;
}
/**
 * Messaging gateway whose calls are sent to {@code MqttConfig.OUTBOUND_CHANNEL}.
 */
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {

    /**
     * Sends a payload without topic or QoS headers.
     *
     * @param data the message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the {@code MqttHeaders.TOPIC} header set.
     *
     * @param topic   value of the topic header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload with the {@code MqttHeaders.TOPIC} and {@code MqttHeaders.QOS} headers set.
     *
     * @param topic   value of the topic header
     * @param qos     value of the QoS header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}
/**
 * Test endpoint that forwards a request parameter to {@link MqttSender#sendToMqtt(String)}.
 */
@RestController
public class TestController {

    @Autowired
    private MqttSender mqttSender;

    /**
     * Handles {@code /send} by passing {@code data} to the MQTT gateway.
     *
     * <p>The handler is public: a private handler method cannot be overridden by a
     * class-based proxy, so if this controller is ever proxied the call would run on
     * the proxy instance, where {@code mqttSender} has not been injected.
     *
     * @param data the payload to send, bound from the request
     */
    @RequestMapping("/send")
    public void send(String data) {
        mqttSender.sendToMqtt(data);
    }
}
/**
 * Channel that receives messages arriving from MQTT.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel inboundChannel() {
    return new DirectChannel();
}

/**
 * Inbound adapter that subscribes to the configured topics and forwards received
 * messages to {@link #inboundChannel()}.
 *
 * @return the adapter, configured with the completion timeout, default converter and QoS
 */
@Bean
public MessageProducer getMqttConsumer() {
    // The producer in this configuration connects with the bare clientId. A broker
    // disconnects an existing connection when another one arrives with the same
    // client ID, so the consumer must identify itself with a distinct ID.
    String consumerClientId = clientId + "_consumer";
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(
                    consumerClientId, getMqttClientFactory(), consumerTopics);
    adapter.setCompletionTimeout(completionTimeout);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(defaultConsumerQos);
    adapter.setOutputChannel(inboundChannel());
    return adapter;
}
/**
 * Logs every message delivered on {@code MqttConfig.INBOUND_CHANNEL}.
 */
@Component
public class MqttConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class);

    /**
     * Handler bound to the inbound channel; logs the received topic and payload.
     *
     * @return a handler that reads the {@code MqttConfig.RECEIVED_TOPIC_KEY} header
     *         and writes an info-level log entry
     */
    @Bean
    @ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            Object topicHeader = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY);
            String topic = topicHeader.toString();
            String payload = message.getPayload().toString();
            LOGGER.info("[{}]主题接收到消息:{}", topic, payload);
        };
    }
}
注意事项
@ServiceActivator 和 @MessagingGateway 中绑定的 Channel 名,需与返回 MessageChannel 的 Bean 的方法名相同:
如发布者绑定的 Channel 名为 outboundChannel,则需要有对应的方法,如下:
/**
 * Channel bean; its method name is the channel name that the gateway and
 * service activator bind to.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel outboundChannel() {
    MessageChannel channel = new DirectChannel();
    return channel;
}
参考
完整代码:GitHub