有时候咱们的项目中会用到即时通信功能,好比电商系统中的客服聊天功能,还有在支付过程当中,当用户支付成功后,第三方支付服务会回调咱们的回调接口,此时咱们须要通知前端支付成功。最近发现RabbitMQ能够很方便的实现即时通信功能,若是你没有特殊的业务需求,甚至能够不写后端代码,今天给你们讲讲如何使用RabbitMQ来实现即时通信!javascript
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通信协议,该协议构建于TCP/IP协议上。MQTT最大优势在于,能够以极少的代码和有限的带宽,为链接远程设备提供实时可靠的消息服务。html
RabbitMQ启用MQTT功能,须要先安装然RabbitMQ而后再启用MQTT插件。前端
rabbitmq-plugins enable rabbitmq_mqtt
咱们可使用MQTT客户端来测试MQTT的即时通信功能,这里使用的是MQTTBox这个客户端工具。java
既然MQTTBox客户端能够直接经过RabbitMQ实现即时通信,那咱们是否是直接使用前端技术也能够实现即时通信?答案是确定的!下面咱们将经过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通信!git
rabbitmq-plugins enable rabbitmq_web_mqtt
Title
目标Topic:发送消息:发送 清空
github
``web
没有特殊业务需求的时候,前端能够直接和RabbitMQ对接实现即时通信。可是有时候咱们须要经过服务端去通知前端,此时就须要在应用中集成MQTT了,接下来咱们来说讲如何在SpringBoot应用中使用MQTT。spring
org.springframework.integration spring-integration-mqtt
rabbitmq: mqtt: url: tcp://localhost:1883 username: guest password: guest defaultTopic: testTopic
/** * MQTT相关配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig { /** * RabbitMQ链接用户名 */ private String username; /** * RabbitMQ链接密码 */ private String password; /** * RabbitMQ的MQTT默认topic */ private String defaultTopic; /** * RabbitMQ的MQTT链接地址 */ private String url;}
/** * MQTT消息订阅者相关配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //设置消息质量:0->至多一次;1->至少一次;2->只有一次 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { //处理订阅消息 log.info("handleMessage : {}",message.getPayload()); } }; }}
/** * MQTT消息发布者相关配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { mqttConfig.getUrl()}); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisherClient", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }}
/** * MQTT网关,经过接口将数据传递到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway { /** * 发送消息到默认topic */ void sendToMqtt(String payload); /** * 发送消息到指定topic */ void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); /** * 发送消息到指定topic并设置QOS */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
/** * MQTT测试接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT测试接口")@RestController@RequestMapping("/mqtt")public class MqttController { @Autowired private MqttGateway mqttGateway; @PostMapping("/sendToDefaultTopic") @ApiOperation("向默认主题发送消息") public CommonResult sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload); return CommonResult.success(null); } @PostMapping("/sendToTopic") @ApiOperation("向指定主题发送消息") public CommonResult sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic); return CommonResult.success(null); }}
2020-09-17 14:29:01.689 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 来自网页上的消息2020-09-17 14:29:06.101 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 来自网页上的消息2020-09-17 14:29:07.384 INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig : handleMessage : 来自网页上的消息
消息中间件应用愈来愈普遍,不只能够实现可靠的异步通讯,还能够实现即时通信,掌握一个消息中间件仍是颇有必要的。若是没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件便可实现即时通信,有特殊需求的时候也可使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通信的一个好选择!后端
关注公众号:java宝典
springboot