RabbitMQ Message Queues
Python's threading Queue is only for communication between threads; the multiprocessing Queue is for communication between a parent process and its children, or between sibling processes.
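A minimal sketch of the in-process case: a `queue.Queue` passing work between two threads. Neither this nor `multiprocessing.Queue` works across separate applications, which is where a broker like RabbitMQ comes in.

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()        # blocks until a message arrives
        if item is None:      # sentinel: tell the worker to stop
            break
        results.append(item * 2)

t = threading.Thread(target=worker)
t.start()
for i in range(3):
    q.put(i)
q.put(None)                   # send the stop sentinel
t.join()
print(results)                # [0, 2, 4]
```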
RabbitMQ dispatches messages round-robin, handing them to each consumer in turn.
Applications share data with one another over sockets.
Well-known brokers that connect applications include RabbitMQ.
Producer:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='队列名')  # declare the queue to publish to
channel.basic_publish(exchange='', routing_key='队列名', body='Hello World!')
connection.close()
```
Consumer:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='队列名')

# A standard handler takes four arguments: ch (the channel object), method
# (delivery information), properties, and the message body
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # acknowledge explicitly; without the ack the message is never destroyed
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='队列名', on_message_callback=callback)
channel.start_consuming()
```
Messages are dispatched round-robin, going to each consumer in turn.
The broker keeps a message until the consumer confirms it has finished processing it; only then is the message destroyed.
If a consumer disconnects halfway through processing, the message is kept and redelivered to another consumer.
A message is only destroyed after the consumer acknowledges it explicitly with ch.basic_ack(delivery_tag=method.delivery_tag); until then it is retained and will be redelivered.
When the RabbitMQ server stops, its queues are destroyed with it.
To make a queue survive a restart, set durable=True when declaring it. After the broker comes back up the queue still exists, but the messages in it are gone:

```python
channel.queue_declare(queue='队列名', durable=True)
```

To persist the messages as well, publish them with delivery_mode=2:

```python
channel.basic_publish(exchange='', routing_key='队列名', body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2))
```
On the consumer side, add channel.basic_qos(prefetch_count=1). With this in place messages are no longer dispatched strictly in turn; a new message only goes to a consumer that is currently free, so each worker takes on only as much as it can handle.
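The difference can be illustrated with a toy simulation (plain Python, not RabbitMQ): round-robin splits messages evenly regardless of worker speed, while fair dispatch hands the next message to whichever worker is free, so a faster worker ends up handling more.

```python
def round_robin(messages, n_workers):
    # every worker gets every n-th message, regardless of how busy it is
    counts = [0] * n_workers
    for i, _ in enumerate(messages):
        counts[i % n_workers] += 1
    return counts

def fair_dispatch(costs, speeds):
    # each worker becomes free at time free[i]; the next message goes to
    # whichever worker is free earliest ("whoever is idle gets the next job")
    free = [0.0] * len(speeds)
    counts = [0] * len(speeds)
    for cost in costs:
        i = min(range(len(free)), key=lambda k: free[k])
        free[i] += cost / speeds[i]
        counts[i] += 1
    return counts

print(round_robin(range(10), 2))              # [5, 5] — even split
print(fair_dispatch([1.0] * 10, [3.0, 1.0]))  # the 3x-faster worker takes more
```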
Broadcast: the producer sends one message and every consumer receives it.
Broadcasting is implemented with an exchange.
fanout: every queue bound to the exchange receives the message.
direct: only the queue selected by the routing_key and the exchange receives the message.
topic: every queue whose binding key (which may be a pattern) matches the routing_key receives the message.
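The fanout/direct distinction can be sketched with a toy in-memory "exchange" (plain Python, not RabbitMQ): fanout copies the message to every bound queue, direct only to queues bound with an exactly matching key.

```python
class Exchange:
    def __init__(self, exchange_type):
        self.exchange_type = exchange_type
        self.bindings = []  # (routing_key, queue) pairs

    def bind(self, queue, routing_key=''):
        self.bindings.append((routing_key, queue))

    def publish(self, body, routing_key=''):
        for key, queue in self.bindings:
            # fanout: every bound queue gets the message
            # direct: only queues bound with an exactly matching key
            if self.exchange_type == 'fanout' or key == routing_key:
                queue.append(body)

fan = Exchange('fanout')
q1, q2 = [], []
fan.bind(q1)
fan.bind(q2)
fan.publish('hello')                              # both queues receive it

direct = Exchange('direct')
info, err = [], []
direct.bind(info, 'info')
direct.bind(err, 'error')
direct.publish('disk full', routing_key='error')  # only err receives it
```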
When setting up the channel, declare the exchange:

```python
channel.exchange_declare(exchange='logs', exchange_type='fanout')
```

The producer declares no queue; each consumer creates its own.
Producer:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# publish to the exchange; a fanout exchange ignores the routing_key
channel.basic_publish(exchange='logs', routing_key='', body='Hello World!')
connection.close()
```
Consumer:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# exclusive=True creates a randomly named queue that is deleted when the
# connection closes; it is then bound to the exchange
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)  # receive what the exchange forwards

# A standard handler takes four arguments: ch (the channel object), method
# (delivery information), properties, and the message body
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
Producer (direct exchange):
```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,  # acts like addressing a specific queue
                      body=message)
connection.close()
```
Consumer (direct exchange):

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# exclusive=True: a randomly named queue, deleted when the connection closes
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# bind once per severity this consumer wants to receive
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
Topic exchanges allow finer-grained filtering, for example per application: a binding key of '#' receives all messages, '*.info' receives messages whose key ends in .info, and 'mysql.*' receives messages whose key starts with mysql.
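The matching rules can be illustrated in pure Python (this is not pika's API, just a sketch of the semantics): '*' matches exactly one dot-separated word, '#' matches zero or more words.

```python
def topic_match(pattern, key):
    """Return True if a topic binding pattern matches a routing key."""
    def match(p, k):
        if not p:
            return not k                 # pattern used up: key must be too
        if p[0] == '#':
            # '#' absorbs zero or more words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        # '*' matches any single word; otherwise the words must be equal
        return (p[0] == '*' or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split('.'), key.split('.'))

print(topic_match('#', 'a.b.c'))              # True — '#' receives everything
print(topic_match('*.info', 'app.info'))      # True
print(topic_match('mysql.*', 'mysql.error'))  # True
print(topic_match('mysql.*', 'redis.error'))  # False
```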
Producer (topic exchange):

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
```

Consumer (topic exchange):

```python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()
```
RPC: each side is both a producer and a consumer.
start_consuming() is a blocking call; an RPC client must not block, since it alternates between doing its own work and checking for replies. connection.process_data_events() is the non-blocking counterpart: it handles a message if one has arrived, and otherwise returns immediately so execution continues.
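By analogy with Python's own queue module (not pika's API): start_consuming() behaves like a blocking get(), while process_data_events() behaves like the non-blocking check sketched below.

```python
import queue

q = queue.Queue()

def process_data_events_like(q):
    """Handle one message if present, otherwise return immediately."""
    try:
        return q.get_nowait()   # non-blocking: raises Empty if nothing waiting
    except queue.Empty:
        return None             # nothing yet — go do other work

print(process_data_events_like(q))   # None — the queue is empty, no blocking
q.put('reply')
print(process_data_events_like(q))   # 'reply'
```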
Server:

```python
__author__ = 'Alex Li'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
```

Client:

```python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)

    def on_response(self, ch, method, props, body):
        # only accept the reply that belongs to our outstanding request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                # tell the server which queue to send the reply to
                reply_to=self.callback_queue,
                # lets the client match the reply to this request, so it can
                # have several requests in flight at once
                correlation_id=self.corr_id),
            body=str(n))
        while self.response is None:
            # non-blocking: handle the reply if it has arrived, else keep looping
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
```
Caching middlemen also communicate over sockets, for example MongoDB, Redis, and Memcached.
* Connecting
* Connection pools
* Operations
    - String operations
    - Hash operations
    - List operations
    - Set operations
    - Sorted Set operations
* Pipelines
* Publish/subscribe