direct 分发机制会根据分发关键字(routing_key),将 task 分发到指定的 queue,worker 只需要监听相应的 queue 即可;在代码中,需要设置相应的 routing_key。
fanout 机制则相反,它会将 task 分发给所有的 queue。
fanout 模式:
emit_log.py
# -*- coding: UTF-8 -*-
"""Publish 100 numbered messages to a fanout exchange (emit_log.py).

A fanout exchange copies every message to all queues bound to it, so each
running receive_log worker gets all 100 messages.
"""
from __future__ import print_function

import pika

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    try:
        channel = connection.channel()
        # This listing sits in the fanout section and promises that both
        # receivers get every task, so the exchange must be of type "fanout"
        # (the listing originally declared "direct").
        # NOTE(review): the exchange is named "logs" here because the article
        # states that the direct example binds a *different* exchange, and an
        # existing exchange cannot be redeclared with another type. Confirm the
        # name against the receive_log scripts.
        # NOTE: newer pika releases (1.0+) spell this keyword `exchange_type`.
        channel.exchange_declare(exchange="logs", type="fanout")
        message = "You are awsome!"
        for i in range(0, 100):  # send the message 100 times
            # The routing key is still set, but a fanout exchange ignores it.
            routing_key = 'even' if i % 2 == 0 else 'old'
            channel.basic_publish(
                exchange="logs",
                routing_key=routing_key,
                body=message + " " + str(i),
            )
        print("sending ", message)
        # Both receive_log workers will receive every task.
    finally:
        # Always release the broker connection, even if publishing fails.
        connection.close()
receive_log.py
"""Consume every message delivered through a fanout exchange (receive_log.py).

NOTE(review): this script was reconstructed from extraction-damaged text in
which keywords, string literals and keyword-argument names had been stripped.
The exchange name and type, the queue options and the printed label are
inferred from the surrounding article -- confirm them against the original
post. The original ``__author__`` value was lost and is omitted. The calls
follow the pre-1.0 pika API that the damaged text appears to use.
"""
from __future__ import print_function

import pika


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # Must match the exchange name and type declared by the publisher.
    channel.exchange_declare(exchange="logs", type="fanout")
    # presumably an exclusive, server-named queue private to this worker
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    print("queue_name:", queue_name)
    # No routing key: a fanout exchange delivers to every bound queue.
    channel.queue_bind(exchange="logs", queue=queue_name)
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    channel.start_consuming()
receive_log2.py
"""Second fanout consumer, identical in behavior to receive_log.py.

Running it next to the first consumer shows that a fanout exchange delivers
every message to every bound queue.

NOTE(review): this script was reconstructed from extraction-damaged text in
which keywords, string literals and keyword-argument names had been stripped.
The exchange name and type, the queue options and the printed label are
inferred from the surrounding article -- confirm them against the original
post. The original ``__author__`` value was lost and is omitted. The calls
follow the pre-1.0 pika API that the damaged text appears to use.
"""
from __future__ import print_function

import pika


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # Must match the exchange name and type declared by the publisher.
    channel.exchange_declare(exchange="logs", type="fanout")
    # presumably an exclusive, server-named queue private to this worker
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    print("queue_name:", queue_name)
    # No routing key: a fanout exchange delivers to every bound queue.
    channel.queue_bind(exchange="logs", queue=queue_name)
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    channel.start_consuming()
可以看出,两个 worker 都接收到了所有的消息。
direct 模式:
worker 的代码只需要将上述代码中的 type 改成 type="direct",并绑定不同的 exchange 即可。
"""Publish 100 numbered messages to a direct exchange, split by parity.

Even-numbered messages are published with routing key 'even' and odd-numbered
ones with routing key 'old', so each message reaches only the queue bound
with the matching key.

NOTE(review): this script was reconstructed from extraction-damaged text in
which keywords, string literals and keyword-argument names had been stripped.
The literal values are inferred from the surrounding article -- confirm them
against the original post. The original ``__author__`` value was lost and is
omitted.
"""
from __future__ import print_function

import pika

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # NOTE: newer pika releases (1.0+) spell this keyword `exchange_type`.
    channel.exchange_declare(exchange="logs2", type="direct")
    message = "You are awsome!"
    for i in range(0, 100):  # send the message 100 times
        if i % 2 == 0:
            channel.basic_publish(
                exchange="logs2",
                routing_key='even',
                body=message + " " + str(i),
            )
        else:
            channel.basic_publish(
                exchange="logs2",
                routing_key='old',
                body=message + " " + str(i),
            )
    print("sending ", message)
receive_even_log.py
"""Consume only the messages published with routing key 'even'.

NOTE(review): this script was reconstructed from extraction-damaged text in
which keywords, string literals and keyword-argument names had been stripped.
The exchange name and type, the routing key, the queue options and the printed
label are inferred from the file name and the surrounding article -- confirm
them against the original post. The original ``__author__`` value was lost
and is omitted. The calls follow the pre-1.0 pika API that the damaged text
appears to use.
"""
from __future__ import print_function

import pika


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # Must match the exchange name and type declared by the publisher.
    channel.exchange_declare(exchange="logs2", type="direct")
    # presumably an exclusive, server-named queue private to this worker
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    print("queue_name:", queue_name)
    # A direct exchange delivers only messages whose routing key equals the
    # binding key, so this worker sees the even-numbered messages only.
    channel.queue_bind(exchange="logs2", queue=queue_name, routing_key="even")
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    channel.start_consuming()
receive_old_log.py
"""Consume only the messages published with routing key 'old'.

NOTE(review): this script was reconstructed from extraction-damaged text in
which keywords, string literals and keyword-argument names had been stripped.
The exchange name and type, the routing key, the queue options and the printed
label are inferred from the file name and the surrounding article -- confirm
them against the original post. The original ``__author__`` value was lost
and is omitted. The calls follow the pre-1.0 pika API that the damaged text
appears to use.
"""
from __future__ import print_function

import pika


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # Must match the exchange name and type declared by the publisher.
    channel.exchange_declare(exchange="logs2", type="direct")
    # presumably an exclusive, server-named queue private to this worker
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    print("queue_name:", queue_name)
    # A direct exchange delivers only messages whose routing key equals the
    # binding key, so this worker sees the odd-numbered ('old') messages only.
    channel.queue_bind(exchange="logs2", queue=queue_name, routing_key="old")
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    channel.start_consuming()
从结果中看出:task只分发给了相应的queue