RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序经过写消息,将消息传递于队列,由另外一应用程序读取 完成通讯。而做为中间件的 RabbitMq 无疑是目前最流行的消息队列之一。python
RabbitMq 应用场景普遍:正则表达式
生产者(producter):队列消息的产生者,负责生产消息,并将消息传入队列json
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码 # 虚拟队列须要指定参数 virtual_host,若是是默认的能够不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 声明消息队列,消息将在这个队列传递,如不存在,则建立 result = channel.queue_declare(queue = 'python-test') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向队列插入数值 routing_key是队列名 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print(message) connection.close()
消费者(consumer):队列消息的接收者,负责 接收并处理 消息队列中的消息服务器
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 申明消息队列,消息在这个队列传递,若是不存在,则建立队列 channel.queue_declare(queue = 'python-test', durable = False) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) # 告诉rabbitmq,用callback来接收消息 channel.basic_consume(callback, queue = 'python-test') # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
MQ默认创建的是临时 queue 和 exchange,若是不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会所有丢失。因此咱们通常在建立 queue 或者 exchange 的时候会声明 持久化。架构
1.queue 声明持久化并发
# 声明消息队列,消息将在这个队列传递,如不存在,则建立。durable = True 表明消息队列持久化存储,False 非持久化存储 result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 声明持久化分布式
# 声明exchange,由exchange指定消息在哪一个队列传递,如不存在,则建立.durable = True 表明exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test', durable = True)
注意:若是已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,由于当前状态不能更改 queue 或 exchange 存储属性,须要删除重建。若是 queue 和 exchange 中一个声明了持久化,另外一个没有声明持久化,则不容许绑定。函数
3.消息持久化高并发
虽然 exchange 和 queue 都申明了持久化,但若是消息只存在内存里,rabbitmq 重启后,内存里的东西仍是会丢失。因此必须声明消息也是持久化,从内存转存到硬盘。fetch
# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 消息不丢失
消费者(consumer)调用callback函数时,会存在处理消息失败的风险,若是处理失败,则消息丢失。可是也能够选择消费者处理失败时,将消息回退给 rabbitmq ,从新再被消费者消费,这个时候须要设置确认标识。
channel.basic_consume(callback,queue = 'python-test', # no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,不管调用callback成功与否,消息都被消费掉 no_ack = False)
rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:
Exchange 一共有三种工做模式:fanout, direct, topicd
这种模式下,传递到 exchange 的消息将会转发到全部与其绑定的 queue 上。
发布者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码 # 虚拟队列须要指定参数 virtual_host,若是是默认的能够不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 声明exchange,由exchange指定消息在哪一个队列传递,如不存在,则建立。durable = True 表明exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不须要配置 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
订阅者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 建立临时队列,consumer关闭后,队列自动删除 result = channel.queue_declare(exclusive=True) # 声明exchange,由exchange指定消息在哪一个队列传递,如不存在,则建立。durable = True 表明exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') # 绑定exchange和队列 exchange 使咱们可以确切地指定消息应该到哪一个队列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) channel.basic_consume(callback, queue = result.method.queue, # 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,不管调用callback成功与否,消息都被消费掉 no_ack = False) channel.start_consuming()
这种工做模式的原理是 消息发送至 exchange,exchange 根据 路由键(routing_key)转发到相对应的 queue 上。
发布者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码 # 虚拟队列须要指定参数 virtual_host,若是是默认的能够不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 声明exchange,由exchange指定消息在哪一个队列传递,如不存在,则建立。durable = True 表明exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
消费者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 建立临时队列,consumer关闭后,队列自动删除 result = channel.queue_declare(exclusive=True) # 声明exchange,由exchange指定消息在哪一个队列传递,如不存在,则建立。durable = True 表明exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') # 绑定exchange和队列 exchange 使咱们可以确切地指定消息应该到哪一个队列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId') # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #channel.basic_qos(prefetch_count=1) # 告诉rabbitmq,用callback来接受消息 channel.basic_consume(callback, queue = result.method.queue, # 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,不管调用callback成功与否,消息都被消费掉 no_ack = False) channel.start_consuming()
这种模式和第二种模式差很少,exchange 也是经过 路由键 routing_key 来转发消息到指定的 queue 。 不一样点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不一样,好比‘’#‘’是匹配所有,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将消息转发至全部 routing_key 包含 “orderid” 字符的队列中。代码和模式二 相似,就不贴出来了。