RabbitMQ和Kafka同样,都是消息中间件。RabbitMQ中有多个队列,每一个队列称之为一个Topic。生产者Producer首先须要链接到RabbitMQ中的指定队列,即在链接的时候指明消息发送到哪一个Topic中。消费者Consumer也须要链接到RabbitMQ,并指定订阅哪一个Topic,即从哪一个队列拉取消息进行消费。在Consumer中有一个回调函数,用于说明消费完这个消息以后该作什么事,一般就是向RabbitMQ发送一个ACK信号,表示消息已经被成功消费,能够从队列中删除了。若是消费者在处理消息的时候忽然宕机了,那么RabbitMQ中的队列依然存在,能够继续发送给其余消费者进行处理。python
只要消费者一直不断运行着(监听),那么生产者只要一发送消息到RabbitMQ中,就会被当即分发到对应的消费者中进行消费。此外,RabbitMQ中的消息还能够持久化到磁盘中。数据结构
在Ubuntu中安装RabbitMQ: sudo apt-get install rabbitmq-server函数
python程序要链接到RabbitMQ中,须要import pika,pika是AMQP协议的开源python实现:server
sudo pip install pika中间件
在RabbitMQ中,能够经过 sudo rabbitmqctl list_queues 命令查看到当前RabbitMQ中的队列及其对应的消息个数,<"Topic",num>,数据结构相似于Map。blog
消息发送端:rabbitmq
#!/usr/bin/env python #coding=utf8 import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='libi',durable=True) message = ' '.join(sys.argv[1:]) or "bili111111111" channel.basic_publish(exchange='', routing_key='libi', body=message,properties=pika.BasicProperties(delivery_mode = 2,)) print "Sent %r" % (message,) connection.close()
消息消费端:队列
# !/usr/bin/env python # coding=utf8 import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='libi') def callback(ch, method, properties, body): print "Received %r" % (body,) time.sleep(5) print "done" channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='libi', no_ack=False) channel.start_consuming()