The alerting bot frequently reports warnings like the following:
<27>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Socket Error: 104
<31>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Removing timeout for next heartbeat interval
<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Socket closed when connection was open
<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Added: {'callback': <bound method SelectConnection._on_connection_start of <pika.adapters.select_connection.SelectConnection object at 0x7f74752525d0>>, 'only': None, 'one_shot': True, 'arguments': None, 'calls': 1}
<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Disconnected from RabbitMQ at xx_host:5672 (0): Not specified
<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 - - Processing 0:_on_connection_closed
<31>1 2018-xx-xxT06:59:03.040Z 660ece0ebaad admin/admin 14 - - Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f74752513f8>> for "0:_on_connection_closed"
Having a log message makes this easy. The first step is to find where the message is emitted. There are three places to look.
Not found.
root@660ece0ebaad:/# uwsgi --version
2.0.14
Checked out the source from GitHub and searched it: not found there either.
Run the following inside the container:
>>> import sys
>>> sys.path
['', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PILcompat', '/usr/lib/python2.7/dist-packages/gtk-2.0']
Grepping these directories finds the message in pika:
root@660ece0ebaad:/usr/local/lib/python2.7# grep "Socket Error" -R .
Binary file ./dist-packages/pika/adapters/base_connection.pyc matches
./dist-packages/pika/adapters/base_connection.py: LOGGER.error("Fatal Socket Error: %r", error_value)
./dist-packages/pika/adapters/base_connection.py: LOGGER.error("Socket Error: %s", error_code)
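When grep is not available in a container, the same search can be approximated from Python. This is a minimal sketch: `find_in_sys_path` is a hypothetical helper written for illustration, not part of pika or of the original investigation, and it only looks at `.py` files.

```python
import io
import os
import sys


def find_in_sys_path(needle, roots=None):
    """Return sorted paths of .py files under `roots` that contain `needle`.

    `roots` defaults to sys.path, mirroring the grep over those directories.
    """
    roots = sys.path if roots is None else roots
    hits = set()
    for root in roots:
        # An empty entry in sys.path means the current directory.
        root = root or "."
        if not os.path.isdir(root):
            continue
        for dirpath, _dirnames, filenames in os.walk(root):
            for name in filenames:
                if not name.endswith(".py"):
                    continue
                path = os.path.join(dirpath, name)
                try:
                    with io.open(path, encoding="utf-8", errors="ignore") as fh:
                        if needle in fh.read():
                            hits.add(os.path.abspath(path))
                except (IOError, OSError):
                    continue  # unreadable file: skip it
    return sorted(hits)
```

Calling `find_in_sys_path("Socket Error")` in the container would be expected to list pika's `base_connection.py`, as the grep output does.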
Confirm the pika version:
>>> import pika
>>> pika.__version__
'0.10.0'
The code shows that the number after "Socket Error" is an errno code. Looking it up confirms what it means: the peer sent a RST.
>>> import errno
>>> errno.errorcode[104]
'ECONNRESET'
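The lookup above can be wrapped so that a log line's numeric code is turned into both its symbolic name and the OS message. This is a small sketch; `describe_errno` is a hypothetical helper name. Note that errno numbers are platform specific: 104 is ECONNRESET on Linux, which is where this container runs.

```python
import errno
import os


def describe_errno(code):
    """Return 'NAME: message' for an errno value on the current platform."""
    name = errno.errorcode.get(code, "UNKNOWN")
    return "%s: %s" % (name, os.strerror(code))
```

Passing the symbolic constant, as in `describe_errno(errno.ECONNRESET)`, avoids hard-coding a number that differs between operating systems.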
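The first suspicion was a wrong rabbitmq server address, since a port that nobody is listening on does answer with a RST. Checking the address ruled this out.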
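The wrong-address hypothesis can also be checked against the error code itself. When a RST answers the initial SYN, the client sees ECONNREFUSED from `connect`, not ECONNRESET; ECONNRESET shows up on a connection that was already established. A minimal sketch that demonstrates the first case on the loopback interface (`probe_unused_port` is a hypothetical helper, and it assumes nothing grabs the freed port in between):

```python
import errno
import socket


def probe_unused_port():
    """Connect to a local port with no listener and return the errno."""
    # Bind to port 0 so the OS picks a free port, then close without listening.
    placeholder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    placeholder.bind(("127.0.0.1", 0))
    port = placeholder.getsockname()[1]
    placeholder.close()

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.settimeout(2.0)
    try:
        # connect_ex returns the error code instead of raising.
        return client.connect_ex(("127.0.0.1", port))
    finally:
        client.close()
```

A result equal to `errno.ECONNREFUSED` is consistent with the conclusion above: error 104 in the alert points at an established connection being reset, not at an unreachable port.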
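The next suspicion was that the connection had timed out and been closed without the client being notified. The rabbitmq server log contains a large number of entries like these: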
=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9753.18> (172.17.0.19:27542 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
--
=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9768.18> (172.17.0.19:27544 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
The netstat check below prints nothing: every connection between the rabbitmq server and the admin docker container has already been closed.
root@xxxxxxx:/home/dingxinglong# netstat -nap | grep 5672 | grep "172.17.0.19"
So why does the rabbitmq server kick out the connections created by pika? The pika docstring says:
:param int heartbeat_interval: How often to send heartbeats. Min between this value and server's proposal will be used. Use 0 to deactivate heartbeats and None to accept server's proposal.
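We did not pass a heartbeat interval, so in theory the server's proposal of 60s should be used. In practice, the client never sent a single heartbeat, so the next step was to keep reading the code.
Print statements confirmed that the HeartbeatChecker object was created successfully and that it created its timer successfully, but the timer callback was never invoked.
Following the code further down: we use BlockingConnection, and the docstring of its add_timeout says: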
def add_timeout(self, deadline, callback_method):
    """Create a single-shot timer to fire after deadline seconds. Do not
    confuse with Tornado's timeout where you pass in the time you want to
    have your callback called. Only pass in the seconds until it's to be
    called.

    NOTE: the timer callbacks are dispatched only in the scope of
    specially-designated methods: see
    `BlockingConnection.process_data_events` and
    `BlockingChannel.start_consuming`.

    :param float deadline: The number of seconds to wait to call callback
    :param callable callback_method: The callback method with the signature
        callback_method()
Timers fire only from inside process_data_events, and we never call it, so the client heartbeat was never triggered. The simple fix is to turn heartbeats off.
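The mechanism is easier to see in isolation. The class below is a toy model, not pika's implementation: `PolledTimers` and its method names are made up for illustration. It shows how a timer that is registered correctly still never fires unless the caller enters the method that dispatches timers.

```python
import time


class PolledTimers(object):
    """Toy model: timers are only dispatched from process_events()."""

    def __init__(self):
        self._timers = []  # list of (fire_at, callback) pairs

    def add_timeout(self, deadline, callback):
        """Register `callback` to run `deadline` seconds from now."""
        self._timers.append((time.time() + deadline, callback))

    def process_events(self):
        """Run every callback whose deadline has passed; return how many ran."""
        now = time.time()
        due = [t for t in self._timers if t[0] <= now]
        self._timers = [t for t in self._timers if t[0] > now]
        for _fire_at, callback in due:
            callback()
        return len(due)
```

With this model, a heartbeat callback added via `add_timeout` stays pending for as long as the program only publishes and never calls `process_events()`, which mirrors what the print statements showed for HeartbeatChecker.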
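The calling code is shown at the end of this post. It never runs a main loop, so the FIN packet from the rabbitmq server is never processed and the client cannot track the connection state.
Tracing the code behind the basic_publish interface shows what happens next.
On send, the client receives a RST and ends up in the _handle_error function at base_connection.py:452, which logs the socket error.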
import json

import pika


def connect_mq():
    mq_conf = xxxxx
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            mq_conf['host'],
            int(mq_conf['port']),
            mq_conf['path'],
            pika.PlainCredentials(mq_conf['user'], mq_conf['pwd']),
            heartbeat_interval=0))  # 0 deactivates heartbeats
    channel = connection.channel()
    channel.exchange_declare(exchange=xxxxx, type='direct', durable=True)
    return channel


# One module-level channel, created at import time and reused for every publish.
channel = connect_mq()


def notify_xxxxx():
    global channel

    def _publish(product):
        channel.basic_publish(exchange=xxxxx,
                              routing_key='xxxxx',
                              body=json.dumps({'msg': 'xxxxx'}))
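Because the channel is created once and reused, a publish can still hit a connection that the other side has already dropped. One possible mitigation, which is not what the original code does, is to reconnect and retry when a publish fails. The sketch below is library-agnostic: `ReconnectingPublisher` is a hypothetical helper, `connect` is assumed to be a factory such as connect_mq, and `retry_on` is assumed to be set to the connection errors of the client library in use.

```python
class ReconnectingPublisher(object):
    """Reuse one channel, and rebuild it when a publish fails."""

    def __init__(self, connect, retry_on=(Exception,), retries=1):
        self._connect = connect      # factory returning a fresh channel
        self._retry_on = retry_on    # exception types treated as "connection lost"
        self._retries = retries      # extra attempts after the first failure
        self._channel = None

    def publish(self, send):
        """Call send(channel), reconnecting and retrying if it raises."""
        attempt = 0
        while True:
            try:
                if self._channel is None:
                    self._channel = self._connect()
                return send(self._channel)
            except self._retry_on:
                self._channel = None  # assume the old connection is dead
                if attempt >= self._retries:
                    raise
                attempt += 1
```

A caller would pass a function that performs the basic_publish on the channel it is given. Retrying can send a message twice if the first attempt failed after the broker had already accepted it, so this only suits consumers that tolerate duplicates.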