MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过 队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。python
与如下二者不一样的是,rabbitMq 能够跨进程。socket
线程queue:同一进程下不一样线程之间的交互。函数
进程queue:父进程与子进程进行交互,或者同属于同一父进程下多个子进程进行交互。不一样进程之间不能交互。spa
RabbitMQ能够同时维护不少的队列,生产者能够把消息放到不一样的队列发送给消费者。线程
send端blog
import pika #至关于声明一个socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道 channel = conn.channel() #声明queue channel.queue_declare(queue='hello') channel.basic_publish(exchange='',routing_key='hello',body='Hello World!') # routing_key 消息的key body 消息的内容 print('Sent "hello world"') conn.close()
运行结果队列
Sent "hello world"
receive端进程
import pika #至关于声明一个socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道 channel = conn.channel() #声明queue 这里能够不用声明,可是若是消费者先运行,又不但愿出错,就要消费者先运行 channel.queue_declare(queue='hello') def callback(ch,method,properties,body): print('[x] Received %r' % body ) channel.basic_consume(callback, queue='hello',no_ack=True ) #消费消息 若是收到消息就调用CALLBACK函数处理 print('[*] Waiting for message.To exit press CTRL+C') channel.start_consuming() #开始收消息
运行结果get
[*] Waiting for message.To exit press CTRL+C [x] Received b'Hello World!'
代码同上,启动三个消费者和一个生产者,第一个消费者会接收到生产者的第一个消息,第二个消费者会接收到生产者的第二个消息,这样依次轮训。消息队列
若是某一个消费者在处理消息的过程当中,断电或者当机了,那么这个消息状态须要确认。消费者处理完成后发状态给生产者,生产者把消息从队列里删除。
no_ack=True (消费者端参数)
表示不确认消息状态,生产者不关心消费者状态。 若是把这个参数去掉,那么生产者须要获得消费者的回应状态,好比我启动了3个消费者和一个生产者,生产者须要获得消费者的回应。若是第一个消费者获得消息,中断了链接,那么消息会发送到第二个消费者,依次类推,直到消费者给生产者一个状态,生产者才会把消息从队列里删除。
这个状态要手动发送给生产者:
ch.basic_ack(delivery_tag=method.delivery_tag)
生产者接收到状态就会把消息从队列里面删除。
从上面咱们知道,消费者在发生当机或者其它状况,只要没有把状态返回给生产者,那么这个消息一直都在。若是生产者当机了怎么办?消息还存不存在了呢?
若是生产者当机,那么以前存的消息都会丢失。为了不这种状况,那么就要要求数据持久化。
durable=True
在声明队列的时候 同时声明队列持久化。
channel.queue_declare(queue='hello',durable=True)
这里是队列持久化了,可是消息尚未了。
在生产者发送消息的一端加上以上参数。
properties=pika.BasicProperties( delivery_mode=2, )
查看hello2的消息。