发布端:函数
import pika import time credentials = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False) s_conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',credentials=credentials)) # 建立链接 # 原则上,消息,只能有交换机传到队列。就像咱们家里面的交换机道理同样。 # 有多个设备链接到交换机,那么,这个交换机把消息发给那个设备呢,就是根据 # 交换机的类型来定。类型有:direct\topic\headers\fanout # fanout:这个就是,全部的设备都能收到消息,就是广播。 # 此处定义一个名称为'logs'的'fanout'类型的exchange chan = s_conn.channel() # 在链接上建立一个频道 # chan.queue_declare(queue='hello') # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另外一方能正常运行 chan.exchange_declare(exchange='logs', exchange_type='fanout' ) while 1: time.sleep(1) # 将消息发送到名为log的exchange中 # 由于是fanout类型的exchange,因此无需指定routing_key ack = chan.basic_publish(exchange='logs', # 交换机 routing_key='', # 路由键,写明将消息发往哪一个队列 body='Ye:当前时间%s ' % time.strftime('%m-%d %H:%M:%S')) # 生产者要发送的消息 print("[生产者] send 'hello world") time.sleep(10) print(ack) s_conn.close() # 当生产者发送完消息后,可选择关闭
接收端:日志
import pika credentials = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False) s_conn = pika.BlockingConnection(pika.ConnectionParameters('39.106.205.106',credentials=credentials)) # 建立链接 chan = s_conn.channel() # 在链接上建立一个频道 chan.exchange_declare(exchange='logs', exchange_type='fanout' ) # chan.queue_declare(queue='hello') # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另外一方能正常运行 # 相似的,好比log,咱们其实最想看的,当链接上的时刻到消费者退出,这段时间的日志 # 有些消息,过时了的对咱们并无什么用 # 而且,一个终端,咱们要收到队列的全部消息,好比:这个队列收到两个消息,一个终端收到一个。 # 咱们如今要作的是:两个终端都要收到两个 # 那么,咱们就只需作个临时队列。消费端断开后就自动删除 result = chan.queue_declare(queue='temp2', exclusive=True) # 取得队列名称 queue_name = result.method.queue # 将队列和交换机绑定一块儿 chan.queue_bind(exchange='logs', queue=queue_name ) def callback(ch, method, properties, body): # 定义一个回调函数,用来接收生产者发送的消息 print(ch, method, properties, ) print(body.decode('utf8')) chan.basic_consume( # 指定取消息的队列名, queue_name, callback, # 调用回调函数,从队列里取消息 # queue=, auto_ack=True ) # 取完一条消息后,给生产者发送确认消息,默认是False的,即 默认不给rabbitmq发送一个收到消息的确认 # 若是auto_ack=True则消失接收以后就会删除也就是只能取一次 print('[消费者] waiting for msg .') chan.start_consuming() # 开始循环取消息
多个接收端须要修改临时队列的名字,以防止冲突,会报错关于锁的错误code