send端fetch
import pika credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() #声明queue channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive端this
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
消费一个消息可能会花好几秒。你可能会问,若是若是一个消费者启动了一个长的任务,消息发了一半而后死了。按咱们如今的代码,一旦rabbitmq传递了一个消息到给消费者,而后rabbitmq就迅速的将这个消息从内存里删除。在这种状况下,若是你kill了一个消费者,咱们就会失去了这个正在传递的消息。url
可是咱们不想失去这个消息。若是一个消费者死了,我想把这个任务传给另外一个消费者。spa
为了确保一个消息从不丢失,rabbitmq支持消息确认。消费者处理完信息后会向rabbitmq发送一个消息确认,这样rabbitmq就能够从内存里删除这个消息了。code
若是一个消费者死掉了,没有发ack,rabbitmq就会知道这个消息没有传递成功,就会把这个消息从新存到queue里。blog
若是此时有其它的消费者在线,rabbitmq就会迅速的将这个消息传递给其它的的消费者。这样就确保了消息没有丢失,即便这个消费者是偶尔挂了。rabbitmq
根本就没有超时时间一说,当那个消费者挂了,rabbitmq会从新转发那条消息。即便这个消息处理了很长时间都不要紧。队列
rabbitmq消息确认默认是开启的(no_ack=Fault)。在以前的例子里咱们显然用的no_ack=True。内存
生产者ssl
import pika credentials = pika.PlainCredentials('zhou', '123456') # connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1, # credentials=credentials, ssl=ssl, port=port)) connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() # 声明queue,durable=True(队列持久化) channel.queue_declare(queue='alex3',durable=True) # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='alex3', #send msg to this queue body='Hello World!23', properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 ) ) print(" [x] Sent 'Hello World!2'") connection.close()
消费者
import pika import time credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='alex3',durable=True)#队列持久化 def callback(ch, method, properties, body): print(ch, method, properties) print(" [x] Received %r" % body) time.sleep(1) channel.basic_consume(callback, queue='alex3', no_ack=True ) channel.basic_qos(prefetch_count=1) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
生产者
import pika,sys credentials = pika.PlainCredentials('zhou', '123456') # connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1, # credentials=credentials, ssl=ssl, port=port)) connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消费者
import pika,time credentials = pika.PlainCredentials('zhou', '123456') # connection = pika.BlockingConnection(pika.ConnectionParameters(host=url_1, # credentials=credentials, ssl=ssl, port=port)) connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
当要采用广播发消息的时候就要用到exchange了。
exchange是一个很简单的东西,一端从生产者里接收消息,另外一端把消息推送到队列里。exchange必须清除的知道它从生产者里接收的消息要发送给谁。接收到的消息是应该被追加到指定的queue里?仍是应该追加到不少个queue里?或者是扔掉?这个规则都是根据exchange的类型来定义的。
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 经过routingKey和exchange决定的那个惟一的queue能够接收消息
topic:全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息
表达式符号说明:#表明一个或多个字符,*表明任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候至关于使用fanout
headers: 经过headers 来决定把消息发给哪些queue
send端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
recv端
import pika credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs',type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange='logs',queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, ) channel.start_consuming()
send端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, #error body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
recv端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: #[error info warning] channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,queue=queue_name,) channel.start_consuming()
send端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='topic_logs',type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
recv端
import pika import sys credentials = pika.PlainCredentials('zhou', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.1.14',credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='topic_logs',type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,queue=queue_name) channel.start_consuming()