RabbitMQ

RabbitMQ和Kafka同样,都是消息中间件。RabbitMQ中有多个队列,每一个队列称之为一个Topic。生产者Producer首先须要链接到RabbitMQ中的指定队列,即在链接的时候指明消息发送到哪一个Topic中。消费者Consumer也须要链接到RabbitMQ,并指定订阅哪一个Topic,即从哪一个队列拉取消息进行消费。在Consumer中有一个回调函数,用于说明消费完这个消息以后该作什么事,一般就是向RabbitMQ发送一个ACK信号,表示消息已经被成功消费,能够从队列中删除了。若是消费者在处理消息的时候忽然宕机了,那么RabbitMQ中的队列依然存在,能够继续发送给其余消费者进行处理。python

只要消费者一直不断运行着(监听),那么生产者只要一发送消息到RabbitMQ中,就会被当即分发到对应的消费者中进行消费。此外,RabbitMQ中的消息还能够持久化到磁盘中。数据结构

在Ubuntu中安装RabbitMQ:   sudo apt-get install rabbitmq-server函数

python程序要链接到RabbitMQ中,须要import pika,pika是AMQP协议的开源python实现:server

  sudo pip install pika中间件

在RabbitMQ中,能够经过 sudo rabbitmqctl list_queues 命令查看到当前RabbitMQ中的队列及其对应的消息个数,<"Topic",num>,数据结构相似于Map。blog

 

消息发送端:rabbitmq

#!/usr/bin/env python
#coding=utf8
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

channel.queue_declare(queue='libi',durable=True)
message = ' '.join(sys.argv[1:]) or "bili111111111"
channel.basic_publish(exchange='', routing_key='libi', body=message,properties=pika.BasicProperties(delivery_mode = 2,))
print "Sent %r" % (message,)
connection.close()

  

消息消费端:队列

# !/usr/bin/env python
# coding=utf8
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
channel.queue_declare(queue='libi')

def callback(ch, method, properties, body):
    print "Received %r" % (body,)
    time.sleep(5)
    print "done"
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='libi', no_ack=False)
channel.start_consuming()
相关文章
相关标签/搜索