MQTT研究之EMQ:【eclipse的paho之java客户端使用注意事项】

这里,简单记录一下本身在最近项目中遇到的paho的心得,这里也涵盖EMQX的问题。服务器

 

1. cleanSessionsession

这个标识,是确保client和server之间是否持久化状态的一个标志,不论是client仍是server重启仍是链接断掉。下面是来自paho客户端源码的注释。并发

Sets whether the client and server should remember state across restarts and reconnects.
  • If set to false both the client and server will maintain state across restarts of the client, the server and the connection. As state is maintained:
    • Message delivery will be reliable meeting the specified QOS even if the client, server or connection are restarted.
    • The server will treat a subscription as durable.
  • If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
    • Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
    • The server will treat a subscription as non-durable

1)。 这个标志位,设置为true,那么,当链接断掉,例如,调用EMQX的接口踢掉链接,此时,即使重连上了(不管是经过autoconnect设置为true,仍是在connectonLost这个回调函数里面配置上重连的逻辑),MQTT客户端程序都是没法进行从新订阅数据的。这个行为,说明session里面保存了会话所采用的topic信息。函数

2)。这个标志位,设置为true,autoconnect设置为false,在connectLost这个回调函数里面,自行实现从新链接的逻辑,而且再次针对相同的topic和qos进行订阅的话,当链接被踢掉,这个时候,会从新链接上,而且也会订阅上数据,只是会出现很奇怪的现象,CPU占用率比链接断开前提升不少。 个人应用(订阅到数据后,对数据进行相应的逻辑处理,正常状况下,一条数据大概1~5ms处理完)压测环境下,链接未断前,1.3W的并发,CPU空闲率在40%左右,重连以后,CPU的空闲率只有10%左右,这个地方是个大坑,目前我尚未搞清楚究竟是什么缘由致使,如有人遇到相似问题同仁,请给我留言,告知可能的缘由。(个人paho是1.2.0版本,EMQX:V3.1.1)性能

3)。这个标志位,设置为false,autoconnect设置为false,在connectLost这个回调函数里面,自行实现重连的逻辑,可是不对topic进行从新订阅,即使链接断掉,从新链接上的话,依然会进行链接断开以前的业务逻辑,订阅到所需的数据,CPU的负荷也不会变大,基本和断开以前的状态持平。测试

 

下面配上connectLost这个回调函数(MqttCallback接口的一个方法)相关代码:this

public void connectionLost(Throwable cause) {
        // 链接丢失后,通常在这里面进行重连
        System.out.println(">>>>>>>>>>>>>>>" + cause.getMessage());
        System.out.println("链接断开,能够作重连");
        for (int i = 0; i < 3; i++) {
            if(reconnect()) {
                break;
            }else{
                try {
                    Thread.sleep(i * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private boolean reconnect() {
        try {
            mqttClient.connect(mqttConnectOptions);
            Thread.sleep(100);
            if( mqttClient.isConnected() ) {
                //mqttClient.subscribe(this.topic, 0);
                return true;
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return  false;
    }

 

2.  EMQX的承压能力spa

网上标榜EMQX单节点处理能力多么牛逼,100W链接毫无压力,这个数值,其实呢,我以为要仔细看测试场景,单单看链接数,其实没有什么意义,要看生产者消费者都存在的状况下,还有数据流通这种场景,链接能力或者数据处理能力如何。 我不是说100W链接能力是虚构的,我是想说纯粹的链接其实没有多大的价值,由于EMQX是消息总线,只有链接,不存在数据流动,有多大意义呢?设计

仍是接着我上面的应用压测,咱们团队开发的一个规则引擎,1.6W的消息并发(4000设备,每一个设备每秒4条消息,固然是程序模拟出来的),规则引擎4C16G的服务器2台,每台跑3个实例,共享订阅两个EMQX节点(EMQX是集群),EMQX服务器配置4C16G。结果跑不了多久时间(1个小时不到,有时半个小时),就会出现EMQX平凡踢掉消费者链接的状况。rest

2019-09-19 14:22:42.710 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52388 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:22:55.746 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60404 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:08.131 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52394 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:14.374 [error] 72a25aca01164c8c8b4cf48451c4e316@10.95.198.25:52456 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:41.686 [error] cd56963bfb4e4c0c8275abe9a24078de@10.95.198.26:60462 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:23:52.638 [error] 4014030aed1642bba6ecec85debed172@10.95.198.26:60514 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:24:06.015 [error] dde7f075ab2d45fdabfd192b5c6a4a30@10.95.198.25:52496 [Connection] Shutdown exceptionally due to message_queue_too_long
2019-09-19 14:24:13.541 [error] 0ba45c9872464c609c150f156e3f2a7e@10.95.198.25:52474 [Connection] Shutdown exceptionally due to message_queue_too_long

针对这个问题,我咨询过青云的EMQ团队的工程师,也在Github上咨询过EMQX的维护者,都反馈说是消费者处理速度太慢,emq的消息队列消息堆积致使。现象如此,怎么解决呢,彷佛只能添加消费者服务,或者下降消息压力,EMQX可否提高性能呢?我以为EMQX如今共享订阅的能力不行,就这4000个链接投递消息,1.6W的并发,4000个topic,采用共享订阅的方式,性能感受不是很好,是咱们程序设计的有问题,仍是EMQX共享订阅性能真的有待提高?为何这么说能,咱们测试过非共享订阅,就是明确订阅某个指定topic。非共享订阅状况下,相同的服务器上,比共享订阅性能好不少不少(差很少一半)。。。(欢迎探讨)

 

从EMQX的配置中,针对上面这种消息队列太长的问题,emqx.conf的配置文件中有相关信息,参考下面这个错误找到了相关的配置参数,EMQX的官方参数解释或者支持真心跟不上,没有国外开源组织社区营造的好,这个须要努力。

2019-09-19 14:22:30.362 [error] f2ac199b0314449d822e150c8d51de93 crasher:
    initial call: emqx_session:init/1
    pid: <0.20141.1>
    registered_name: []
    exception exit: killed
      in function  emqx_session:handle_info/2 (src/emqx_session.erl, line 641)
      in call from gen_server:try_dispatch/4 (gen_server.erl, line 637)
      in call from gen_server:handle_msg/6 (gen_server.erl, line 711)
    ancestors: [emqx_session_sup,emqx_sm_sup,emqx_sup,<0.1386.0>]
    message_queue_len: 0
    messages: []
    links: [<0.1577.0>]
    dictionary: [{force_shutdown_policy,
                      #{max_heap_size => 838860800,message_queue_len => 8000}},
                  {deliver_stats,676193},
                  {'$logger_metadata$',
                      #{client_id => <<"f2ac199b0314449d822e150c8d51de93">>}}]
    trap_exit: true
    status: running
    heap_size: 6772
    stack_size: 27
    reductions: 69920965
  neighbours:

再看看emqx.conf的配置文件中,和这个queue相关的配置:

## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
## of queued MQTT messages of QoS 1 and 2.
##
## Numbers delimited by `|'. Zero or negative is to disable.
##
## Default:
##   - 8000|800MB on ARCH_64 system
##   - 1000|100MB on ARCH_32 sytem
## zone.external.force_shutdown_policy = 8000|800MB

有人会说,你能够将这里的消息数量调大点啊,没错,这个调一下是能够改善,可是不能根治问题,本身想一想吧,大点最多也就是对消息速率波动的韧性加大了,可是不能解决持续生成高于所谓的消费慢这种状况下的问题。 EMQ方说辞其实,在咱们的这个场景下,我是不那么认同的,为何这么说呢, 个人规则引擎消费日志里面显示,每条消息处理的时间并无变长,CPU的忙碌程度并无恶化, 添加共享订阅实例变多,EMQX性能降低了,我以为EMQX在共享订阅变多的状况下,对消费者端投递消息的速率或者效率降低了,可是呢,EMQX这个broker从消息生产者这边接收消息的能力没有改变,致使EMQX的消息队列消息积压,最终出现踢链接的policy得以执行。。。

 

还有一个问题,不知道细心的读者有没有发现,消费者这边消息消费的好好的,消息积压了,EMQX为什么要把消费者的链接给踢掉呢,为什么不是将生产者的链接踢掉呢?这个逻辑我以为有点不是很好理解,原本消息就积压了,是否是要加快消费才能缓解或者解除消息积压的问题?读者大家是如何理解的,也能够留言探讨!

相关文章
相关标签/搜索