以前只是用celery, 此次用一下pikahtml
参考rabbitMQ官网的python版,https://www.rabbitmq.com/tutorials/tutorial-one-python.htmlpython
没想到各类坑.sql
若是说rabbitMQ官网是为了让新人入门,因此刻意忽略掉细节, 那么必须吐槽pika的官方文档, 很很差.远不如celeryjson
使用pika 的BlockingConnection服务器
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: BrokenPipeError(32, 'Broken pipe')app
根据http://www.javashuo.com/article/p-oglbiqik-d.htmlasync
是要在链接时设置心跳为0,就不会超时自动下线了, 不然RabbitMQ服务器会发过来默认值580函数
#--------------rabbitMQ------------------ import pika connection = pika.BlockingConnection( pika.ConnectionParameters( host='localhost', heartbeat=0, #never exit after start )) channel = connection.channel() channel.queue_declare(queue='update_sql')
这个错误在测试消费端时没测出来,由于测试使用的发布者和官方文档里同样,发完就下线退出了. 这样固然看不出这个心跳问题.测试
可是联调时就暴露了. 真无语.fetch
默认的body是二进制的. 而后消费端要
channel.basic_publish('exchange_name', 'routing_key', 'Test Message', pika.BasicProperties(content_type='text/plain', type='example'))
这彷佛时能够发文本的吗?
而后,看见别人还能够这么写https://blog.csdn.net/fzlulee/article/details/98480724
self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message,
properties=pika.BasicProperties(delivery_mode=2,message_id=message_id,content_type="application/json"))
彷佛就是html请求头常见的写法了? 可是pika里没有对BasicProperties的详细文档,
,源码里也看不出注释https://pika.readthedocs.io/en/stable/_modules/pika/spec.html#BasicProperties
ack防止消费者出问题, durable防止rabbitMQ服务器自己出问题
因此ack在消费端定义
channel.basic_consume(queue='update_sql', auto_ack=False, on_message_callback=callback)
而durable在channel里队列声明里 在 生产端,消费端都要统一声明队列
channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False)
引用 https://blog.csdn.net/hlxx55/article/details/80964440
ack
rabbitMQ是默认开启自动应答的,这样当rabbitMQ将消息发给消费者,就会从内存中将消息删除,这样会带来一个问题,若是消费者未处理完消息而宕机,那么消息就会丢失。因此,咱们将自动应答关闭,当rabbitMQ收到消费者处理完消息的回应后才会从内存中删除消息。
durable
rabbitMQ默认将消息存储在内存中,若rabbitMQ宕机,那么全部数据就会丢失,因此在声明队列的时候能够声明将数据持久化,可是若是已经声明了一个未持久化的队列,那么不能修改,只能将这个队列删除或从新声明一个持久化数据。
只在消费者这里加上basic_qos就能够了
connection = pika.BlockingConnection( pika.ConnectionParameters( host= self.HOST_RABBITMQ, heartbeat = 0, #never exit after start )) channel = connection.channel() #durable 队列中消息持久化 #exclusive (bool) – Don’t allow other consumers on the queue #./ exchange 不支持 exclusive channel.queue_declare(queue='update_sql', durable=True, exclusive=False, auto_delete=False) #1次1条消息 channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='update_sql', auto_ack=False, #不自动确认 在callback最后确认 等于 no_ack on_message_callback=self.callback) print(' [*] wg-Executor waiting for sql cmds. To exit press CTRL+C') channel.start_consuming()
此外,在消费者的callback函数里,
最好在最外层用 异常处理包裹起来,确保不管执行结果如何,都在finally里执行ack
try:
except:
else:
finally: