1,生产者python
new_task.py函数
import pika if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") message="You are awsome!" for i in range(0,100):#循环100次发送消息 channel.basic_publish(exchange="",routing_key='Kadima',body=message+" "+str(i)) print "sending ",message
2,多个消费者spa
消费者1,work.pycode
#-*- coding: UTF-8 -*- import time import pika import sys __author__ = 'Yue' var=0 def callback(ch, method, properties, body): # temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为何,就须要清楚“Python全局变量和局部变量” #global var #var+=1 #if var==20: #sys.exit() print "1 received %r" % (body,) time.sleep(body.count(".")) print "Done" if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima",no_ack=True) print ' [1] Waiting for messages' channel.start_consuming()
work2.pyit
import time import pika __author__ = 'Yue' def callback(ch, method, properties, body): print "2 received %r" % (body,) time.sleep(body.count(".")) print "Done" if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima",no_ack=True) print ' [2] Waiting for messages' channel.start_consuming()
3,执行work,work2,new_taskio
个人启动顺序是work,work2,从执行结果能够看出,RabbitMQ是将task分别依次分发给按照时间顺序注册的work上的,class
也就是,task1,task2,task3,task4,它会将task1,task3分发给work,另外两个分发给task3,task4import
接下来,有趣的事情就要发生了:变量
当把work.py中的callback函数的注释内容打开后(起做用是让work处理19个task,便退出程序),MQ并无将本该分发给work的task分发给work2,那到底去哪里了呢?我暂时假设为work退出时并无告诉MQ他不干了(他出现异常啦),MQ仍是会将task分发给workobject
4,那没有执行完的任务怎么办呢?
Message acknowledgment :ack默认是打开的
修改work代码以下
#-*- coding: UTF-8 -*- import time import pika import sys __author__ = 'Yue' var=0 def callback(ch, method, properties, body): print ch, method, properties, body # <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic. # Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50 # ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y # ou are awsome! 98 temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为何,就须要清楚“Python全局变量和局部变量” global var var+=1 if var==20: print var , body sys.exit() print "1 received %r" % (body,) # time.sleep(0.1) print "Done" #设置返回ack的标志,method.delivery_tag是MQ分发给Work时的一个标记 ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima") print ' [1] Waiting for messages' channel.start_consuming()
work2
def callback(ch, method, properties, body): print "2 received %r" % (body,) # time.sleep(0.1) print "Done" ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima") print ' [2] Waiting for messages' channel.start_consuming()
因而可知,MQ会从新将没有执行,或执行失败的任务从新分发给存活的work2,并且,他的分发顺序也颇有趣,是在本来应该分发给work2的task执行结束后再去分发未执行的任务。‘
5,思考,若是在work2中
channel.basic_consume(callback,queue="Kadima",no_ack=True)
会出现什么状况。。。