RabbitMQ实现即时通信竟然如此简单!

有时候咱们的项目中会用到即时通信功能,好比电商系统中的客服聊天功能,还有在支付过程当中,当用户支付成功后,第三方支付服务会回调咱们的回调接口,此时咱们须要通知前端支付成功。最近发现RabbitMQ能够很方便的实现即时通信功能,若是你没有特殊的业务需求,甚至能够不写后端代码,今天给你们讲讲如何使用RabbitMQ来实现即时通信!javascript

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通信协议,该协议构建于TCP/IP协议上。MQTT最大优势在于,能够以极少的代码和有限的带宽,为链接远程设备提供实时可靠的消息服务。html

ThirdPartyImage_ae54bd97.png

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。
  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各种支持MQTT协议的消息中间件均可以充当。
  • Topic(主题):能够理解为消息队列中的路由,订阅者订阅了主题以后,就能够收到发送到该主题的消息。
  • Payload(负载);能够理解为发送消息的内容。
  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 一、QoS 2三个等级,下面分别介绍下: QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复; QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生; QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

RabbitMQ启用MQTT功能,须要先安装然RabbitMQ而后再启用MQTT插件。前端

  • 首先咱们须要安装并启动RabbitMQ,对RabbitMQ不了解的朋友能够参考《花了3天总结的RabbitMQ实用技巧,有点东西!》;
  • 接下来就是启用RabbitMQ的MQTT插件了,默认是不启用的,使用以下命令开启便可;
rabbitmq-plugins enable rabbitmq_mqtt
  • 开启成功后,查看管理控制台,咱们能够发现MQTT服务运行在1883端口上了。

ThirdPartyImage_af39b4a1.png

MQTT客户端

咱们可使用MQTT客户端来测试MQTT的即时通信功能,这里使用的是MQTTBox这个客户端工具。java

ThirdPartyImage_8e80c3e9.png

  • 点击Create MQTT Client按钮来建立一个MQTT客户端;

ThirdPartyImage_d91430ae.png

  • 接下来对MQTT客户端进行配置,主要是配置好协议端口、链接用户名密码和QoS便可;

ThirdPartyImage_3f0d8691.png

  • 再配置一个订阅者,订阅者订阅testTopicA这个主题,咱们会向这个主题发送消息;

ThirdPartyImage_55cad197.png

  • 发布者向主题中发布消息,订阅者能够实时接收到。

ThirdPartyImage_15d5058b.png

前端直接实现即时通信

既然MQTTBox客户端能够直接经过RabbitMQ实现即时通信,那咱们是否是直接使用前端技术也能够实现即时通信?答案是确定的!下面咱们将经过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通信!git

  • 因为RabbitMQ与Web端交互底层使用的是WebSocket,因此咱们须要开启RabbitMQ的MQTT WEB支持,使用以下命令开启便可;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 开启成功后,查看管理控制台,咱们能够发现MQTT的WEB服务运行在15675端口上了;

ThirdPartyImage_b4e2b561.png

ThirdPartyImage_7a1a1d42.png

  • 实现的功能很是简单,一个单聊功能,须要注意的是配置好MQTT服务的访问地址为:ws://localhost:15675/ws
Title

目标Topic:发送消息:发送 清空github

``web

ThirdPartyImage_f92103e3.png

在SpringBoot中使用

没有特殊业务需求的时候,前端能够直接和RabbitMQ对接实现即时通信。可是有时候咱们须要经过服务端去通知前端,此时就须要在应用中集成MQTT了,接下来咱们来说讲如何在SpringBoot应用中使用MQTT。spring

  • 首先咱们须要在pom.xml中添加MQTT相关依赖;
org.springframework.integration    spring-integration-mqtt
  • 在application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;
rabbitmq:  mqtt:    url: tcp://localhost:1883    username: guest    password: guest    defaultTopic: testTopic
  • 编写一个Java配置类从配置文件中读取配置便于使用;
/** * MQTT相关配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig {    /**     * RabbitMQ链接用户名     */    private String username;    /**     * RabbitMQ链接密码     */    private String password;    /**     * RabbitMQ的MQTT默认topic     */    private String defaultTopic;    /**     * RabbitMQ的MQTT链接地址     */    private String url;}
  • 添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,经过MessageHandler来处理订阅消息;
/** * MQTT消息订阅者相关配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",                        mqttConfig.getDefaultTopic());        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //设置消息质量:0->至多一次;1->至少一次;2->只有一次        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message> message) throws MessagingException {                //处理订阅消息                log.info("handleMessage : {}",message.getPayload());            }        };    }}
  • 添加MQTT消息发布者相关配置;
/** * MQTT消息发布者相关配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { mqttConfig.getUrl()});        options.setUserName(mqttConfig.getUsername());        options.setPassword(mqttConfig.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());        return messageHandler;    }    @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }}
  • 添加MQTT网关,用于向主题中发送消息;
/** * MQTT网关,经过接口将数据传递到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {    /**     * 发送消息到默认topic     */    void sendToMqtt(String payload);    /**     * 发送消息到指定topic     */    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);    /**     * 发送消息到指定topic并设置QOS     */    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
  • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;
/** * MQTT测试接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT测试接口")@RestController@RequestMapping("/mqtt")public class MqttController {    @Autowired    private MqttGateway mqttGateway;    @PostMapping("/sendToDefaultTopic")    @ApiOperation("向默认主题发送消息")    public CommonResult sendToDefaultTopic(String payload) {        mqttGateway.sendToMqtt(payload);        return CommonResult.success(null);    }    @PostMapping("/sendToTopic")    @ApiOperation("向指定主题发送消息")    public CommonResult sendToTopic(String payload, String topic) {        mqttGateway.sendToMqtt(payload, topic);        return CommonResult.success(null);    }}
  • 调用接口向主题中发送消息进行测试;

ThirdPartyImage_cb4af000.png

  • 后台成功接收到消息并进行打印。
2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息

总结

消息中间件应用愈来愈普遍,不只能够实现可靠的异步通讯,还能够实现即时通信,掌握一个消息中间件仍是颇有必要的。若是没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件便可实现即时通信,有特殊需求的时候也可使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通信的一个好选择!后端

关注公众号:java宝典
aspringboot

相关文章
相关标签/搜索