〖Python〗-- RabbitMQ消息队列

【 安装使用以及应用】

RabbitMQ 消息队列安装:html

  linux版本:CentOS 7python

安装第一步:先关闭防火墙mysql

一、Centos7.x关闭防火墙linux

1
2
3
4
5
[root@rabbitmq /]# systemctl stop firewalld.service
 
[root@rabbitmq /]# systemctl disable firewalld.service
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
Removed symlink /etc/systemd/system/basic.target.wants/firewalld.service.

若是不想关闭防火墙,能够经过以下方法处理:nginx

1
2
3
4
开放5672端口:
 
firewall-cmd --zone= public  --add-port=5672/tcp --permanent
firewall-cmd --reload

第二步:安装Erlanggit

  因为RabbitMQ依赖Erlang, 因此须要先安装Erlang。github

Erlang的安装方式大概有两种:web

1)从Erlang Solution安装(推荐)sql

1
2
3
4
5
# 添加erlang solutions源
$ wget https: //packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
$ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
 
$ sudo yum install erlang

 

2)从EPEL源安装(这种方式安装的Erlang版本可能不是最新的,有时候不能知足RabbitMQ须要的最低版本)shell

1
2
3
4
# 启动EPEL源
$ sudo yum install epel-release
# 安装erlang
$ sudo yum install erlang

安装第三步:安装RabbitMQ

1
2
3
$ sudo rpm --import https: //www.rabbitmq.com/rabbitmq-release-signing-key.asc
$ wget https: //www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm # 下载RabbitMQ安装包
$ sudo yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

  注意:安装时若是遇到下面的依赖错误

1
2
Error: Package: socat-1.7.2.3-1.el6.x86_64 (epel)
Requires: libreadline.so.5()(64bit)

能够尝试先执行:$ sudo yum install socat

  至此,若是没有报错的话,RabbitMQ已经安装成功!此时咱们须要测试RabbitMQ是否能够正常使用。

测试:

一、开启服务及查看工做状态

1
2
3
4
$ rabbitmq-server #启动RabbitMQ队列$ sudo chkconfig rabbitmq-server  on   # 添加开机启动RabbitMQ服务
$ sudo /sbin/service rabbitmq-server start # 启动服务
$ sudo /sbin/service rabbitmq-server status  # 查看服务状态
$ sudo /sbin/service rabbitmq-server stop   # 中止服务$ netstat -tulnp |grep rabbitmq #查看默认启用的端口号,5672

二、关于RabbitMQ的一些基本操做

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 查看当前全部用户
$ sudo rabbitmqctl list_users<br><br>$ rabbitmqctl list_queues #查看当前的消息队列列表
<br># 查看默认guest用户的权限
$ sudo rabbitmqctl list_user_permissions guest
 
# 因为RabbitMQ默认的帐号用户名和密码都是guest。为了安全起见, 先删掉默认用户
$ sudo rabbitmqctl delete_user guest
 
# 添加新用户
$ sudo rabbitmqctl add_user username password
 
# 设置用户tag
$ sudo rabbitmqctl set_user_tags username administrator
 
# 赋予用户默认vhost的所有操做权限
$ sudo rabbitmqctl set_permissions -p / username  ".*"  ".*"  ".*"
 
# 查看用户的权限
$ sudo rabbitmqctl list_user_permissions username
更多关于rabbitmqctl的使用,能够参考<a href= "https://www.rabbitmq.com/man/rabbitmqctl.1.man.html"  target= "_blank" >帮助手册</a>。

三、开启web管理接口

  若是只从命令行操做RabbitMQ,多少有点不方便。幸亏RabbitMQ自带了web管理界面,只须要启动插件即可以使用。

1
2
3
4
5
6
$ sudo rabbitmq-plugins enable rabbitmq_management
而后经过浏览器访问
 
  http: //localhost:5672
 
输入用户名和密码访问web管理界面了。

四、配置RabbitMQ

  关于RabbitMQ的配置,能够下载RabbitMQ的配置文件模板到/etc/rabbitmq/rabbitmq.config, 而后按照需求更改便可。
  关于每一个配置项的具体做用,能够参考官方文档
  更新配置后,别忘了重启服务哦!

五、开启用户远程访问

  默认状况下,RabbitMQ的默认的guest用户只容许本机访问, 若是想让guest用户可以远程访问的话,只须要将配置文件中的loopback_users列表置为空便可,以下:

1
{loopback_users, []}

  另外关于新添加的用户,直接就能够从远程访问的,若是想让新添加的用户只能本地访问,能够将用户名添加到上面的列表, 如只容许admin用户本机访问。

1
{loopback_users, [ "admin" ]}

更新配置后,别忘了重启服务哦!

 

RabbitMQ 消息队列 

  RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。

通俗的讲,就是生产者消费者模型。

  实现生产者消费者模型的核心就是队列!经过队列去链接完成操做!

  这个模型解决了耦合性,让生产者和消费者之间没有直接的联系,而是经过队列创建桥梁。这其中最重要的就是队列。

1、队列的使用:

一、基于Queue实现生产者消费者模型,python的队列 queue。

q=queue.Queue()
q.put()
q.qsize() #队列内消息个数
q.get()
先进先出
 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
 
 
message = Queue.Queue(10)
 
 
def producer(i):
while True:
message.put(i)
 
 
def consumer(i):
while True:
msg = message.get()
 
 
for i in range(12):
t = threading.Thread(target=producer, args=(i,))
t.start()
 
for i in range(10):
t = threading.Thread(target=consumer, args=(i,))
t.start()
View Code

如上:python拥有本身的队列模式,可是有一点不得不注意,他的队列只能在同一进程内多线程间起做用,不能跨进程操做!

二、队列的做用:
  存储消息、数据
  保证消息的顺序
  保证数据的交付

三、消息队列解决了两个问题:
  解耦 自然的解耦,程序间调用再也不使用接口,而是调用消息队列的接口把执行结果放到队列中,实现解耦。【实际开发过程当中,必定要想尽办法下降程序间的耦合】
  异步: 异步操做,程序再也不等待执行结果,而是提供接收接口
    优势:解决排队问题
    缺点:不能保证任务及时执行
    应用场景:去哪儿购买飞机票,
  同步:
    优势:保证任务及时执行
    缺点:排队问题

四、有关大并发的事宜:

  web网站:
  以前部署:apache 1000 - 2000 一台机器同一时刻只能承载这么多请求!
  经常使用部署:nginx 10000 - 20000
  什么算大并发?有三个指标:
  pv = page visit 页面访问量 【一天访问量上亿才算大网站】【具备分散性,高峰时间段访问明显】
  通常,一亿的pv用10台server web cluster 集群,pv分散到实际的用户上并很少,推算到秒级别访问量  
  uv = user visit 用户访问量【相对页面访问量很小】
  qps = 每秒钟的访问流量 or 用户量

  对同一个请求的访问,多个机器每一个都负责一点,这就叫分布式运算。

五、引入rabbitmq的缘由:

  异步操做,应对大并发。
  解决Python队列不能跨进程执行的弊端。
  对于RabbitMQ来讲,生产和消费再也不针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

  它做为一个独立的组件,能够同时存在多个队列【能够为任何程序提供队列】,每一个队列对应不一样的应用程序,队列之间平行存在,相互独立,不能混用!

  与Python Queue队列相比,消息不会直接放到rabbitmq队列中,而是会先经过一次消息过滤,看他所属哪一个程序,而后放到对应的队列中。

  在不存在外力的影响下,RabbitMQ 队列中的消息,若是不消费就永远存在!只有消费以后会消失。
  RabbitMQ 是一个公共的组件,能为多语言间提供队列【例如:生产者是Java,消费者是Python】。

六、生产者消费者经过RabbitMQ队列创建通讯的步骤:

  生产者:
    端口 ip 认证信息
    建立队列
    向队列发送消息
  消费者:
    端口 ip 认证信息
    从指定的队列中取消息

七、RabbitMQ 配置 (Python链接队列)
  1.7.一、客户端若想调用RabbitMQ,须要安装对应的API。在python中安装pika,经过pika链接rabbitmq

1
pip3 install pika

注意:若想使用、远程链接rabbitmq server的话,就须要在RabbitMQ队列这个组件内配置权限信息

八、rabbitmq 建立用户和设置权限

  1)首先在rabbitmq server上建立一个用户:rabbitmqctl add_user aaa 密码
  2)配置权限,受权容许从外面访问全部队列:rabbitmqctl set_permissions -p / aaa "." "." ".*" #受权全部!

set_permissions [-p vhost] {user} {conf} {write} {read}
vhost
The name of the virtual host to which to grant the user access, defaulting to /. ,默认是 /
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names for which the user is granted configure permissions.
write
A regular expression matching resource names for which the user is granted write permissions.
read
A regular expression matching resource names for which the user is granted read permissions.
关于建立用户

九、链接队列时,须要创建通讯,配置认证信息(生产者和消费者都必须与队列创建联系)

1
2
3
credentials  =  pika.PlainCredentials( 'aaa' '密码' #配置认证信息<br>#创建连接
connection  =  pika.BlockingConnection(pika.ConnectionParameters( '10.211.55.5' , 5672 , '/' ,credentials))
channel  =  connection.channel()  #队列链接通道

 十、生成者与消费者之间收发消息通讯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
生产者:
发送消息语法:
     #先声明queue(没有就建立,有就使用)
     channel.queue_declare(queue = 'task123' ,durable = True )
     #创建通讯
     channel.basic_publish(exchange = '',   #给负责消息过滤的方法传递参数
                       routing_key = 'task123' #路由
                       body = 'Hello World2!' ,消息内容
                     )
     routing_key  =  'task123'   #路由 把消息队列先传给Exchange 而后再转到task123队列上
 
消费者:
获取消息语法:
def  callback(ch,mothod,properties,body)  #获取消息执行的函数
     参数解释:
         ch:指队列通道
         method:请求方法
         properties: 消息参数
         body:消息内容
     
channel.basic_consume(
     callback,     #取到消息以后,调用函数 callback
     queue = "xxxxx" ,   #队列名称
     no_ack = True ,
)
#开始消费
channel.start_consuming()  # 阻塞模式

  注意: 通常申明队列只须要在生产者端申明,但消费者端也能够申明。是防止若是生产者没有启动,消费者先启动后没有队列会报错的问题。此时服务端若是有相同代码,会检查若是有相同队列就不建立。消费者再次申明队列,目的是:消费者要清楚去哪里取数据!

2、RabbitMQ框架图:

 

3、示例(默认RabbitMQ队列已启动)

 一、实现简单队列创建通讯

import pika

credentials = pika.PlainCredentials('aaa', '123')  # 配置认证的用户 密码
parameters = pika.ConnectionParameters(host="192.168.152.132", credentials=credentials)
connection = pika.BlockingConnection(parameters)  # 创建一个连接对象
channel = connection.channel()  # 队列链接通道

channel.queue_declare(queue='hello')  # 声明队列queue 用rabbitmqctl list_queuse 查看
channel.basic_publish(exchange='', routing_key='hello', body='server hello world')  # routing_key 路由表明要发送的队列 body是发送的内容
print('server send "hello world"')
connection.close()  # 关闭链接 相似socket
生产者
#消费者是一种阻塞模式,会一致取数据
import pika

credentials = pika.PlainCredentials('aaa', '123')  # 配置认证的用户 密码
parameters = pika.ConnectionParameters(host="192.168.152.132", credentials=credentials)
connection = pika.BlockingConnection(parameters)  # 创建一个连接对象
channel = connection.channel()  # 队列链接通道
channel.queue_declare(queue='hello')  # 声明queue 用rabbitmqctl list_queuse 查看

def callback(ch, method, properties, body):
    print("Recived %r" % ch, method, properties, body)

channel.basic_consume(callback,  # 取到消息后,执行callback函数
                      queue='hello', # 从hello队列获取数据
                      no_ack=True
                      )
print("waiting for message")
channel.start_consuming()  # 进入阻塞模式
消费者

二、消息持久化

  如何保证队列中的消息被彻底处理完毕?咱们正常的思惟应该是:没有处理完,应该返回队列。可是在上面的代码中,若是消费者客户端挂了或者在处理的过程当中中止了,不只消息没有处理完毕,同时队列中也没有了。

  2.一、模拟客户端中断 观察服务端队列的数据会不会返回(答案:不会)

1
2
3
#- 开启一个生产者,两个消费者
#- 服务端向队列中存放一个值,一客户端从队列中取到数据,在睡10秒期间中断,表示出错,它不会报告给服务端
#- 这时队列中为零,另外一客户端也不会取到值

测试代码以下:

#生产者
import pika

credentials = pika.PlainCredentials("aaa","123") #受权的帐号 密码
connection = pika.BlockingConnection(
pika.ConnectionParameters('192.168.152.132',credentials=credentials)) #创建socket

channel = connection.channel() #建立rabbitmq协议通道
channel.queue_declare(queue='hello') #经过通道生成一个队列

channel.basic_publish(exchange='',
routing_key='hello', #队列
body='Hello World!') #内容
print(" [x] Sent 'Hello World!'")
connection.close()
生产者
消费者
#解决办法:
  #0、发送消息时,在函数中添加如下参数,保证消息持久化
    properties=pika.BasicProperties(
      delivery_mode=2, # make message persistent),# 数字表明状态:2保持消息持久化;1处理中;0处理完毕;
  #一、消费确认的问题!
       #在消费者端,从队列中获取信息函数中有一个参数:no_ack = True 的意思是消息处理后,向rabbit-server确认消息已消费完毕。
    删除这个参数,再也不确认信息已消费,rabbit-server的消息队列中会一致存在数据
  #二、解决消费后数据还存在问题!
    #   解决rabbit-server中消息被消费后数据还存在的状况,在消费者处理消息的函数中,使用ch.basic_ack(delivery_tag=method.delivery_tag)与生产者手动确认,消息处理完毕!
  #经过这两个参数,同时保证了消费者可以消费完数据不挂,同时消费完后rabbit-server收到消费完的消息把被消费的数据删除  
 
#1. 生产者端发消息时,加参数 消息持久化
  properties=pika.BasicProperties(
    delivery_mode=2, # make message persistent),
#2. 消费者端,消息处理完毕时,发送确认包
  ch.basic_ack(delivery_tag=method.delivery_tag)
 
  channel.basic_consume(callback, #取到消息后,调用callback 函数
          queue='task1',)
          #no_ack=True) #消息处理后,不向rabbit-server确认消息已消费完毕
解决办法

2.二、模拟测试 观察服务端队列的数据会不会返回(答案:会)

1
2
3
#- 开启一个服务端,两个客户端
#- 服务端向队列中存放一个值,一客户端从队列中取到数据,在睡20秒期间中断,表示出错,它会报给服务端,服务端队列还有值
#- 这时启动另外一客户端还能够取到值    
#生产者
import pika

credentials = pika.PlainCredentials('aaa', '123')

parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列链接通道

#声明queue
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
      routing_key='task1', #路由
      properties=pika.BasicProperties(
      delivery_mode=2, # make message persistent
      ), #新加入参数
      body='Hello World2!')

print(" [x] Sent 'Hello World!'")

connection.close()
生产者
消费者

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差很少。此时,先启动消息生产者,而后再分别启动3个消费者,经过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上。

三、队列【及消息】持久化

 当咱们把rabbitmq-server重启后,发现全部的消息就都丢失了?这种问题怎么办?假如咱们在某一个队列中加入了上万条消息,忽然消息队列重启了。。。那是否是咱们还得手动去添加消失的消息么?不用!如下是解决办法:
  一、生产者在声明队列的时候使用参数,保持队列持久化 durable = True。
    注意:必定是要在队列第一次声明的时候前添加,不能对已经生成的队列从新再进行一次设置,不然会报错【没法从新修改队列】。
  二、再经过参数delivery_mode = 2 把消息也变成持久化的。即使是rabbitmq服务重启后,也不会丢消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#队列持久化 【仅设置单个】
channel.queue_declare(queue = 'hello' ,durable = True )
systemctl restart rabbitmq - server    #重启服务发现hello队列还在,可是消息不在
rabbitmqctl list_queues   #查看消息队列
     #hello
  
  
#队列和消息持久化 【两个参数都存在】
channel.queue_declare(queue = 'hello' ,durable = True )
                 properties = pika.BasicProperties(
                     delivery_mode = 2 ,   # make message persistent
                 ),
systemctl restart rabbitmq - server    #重启服务发现队列和消息都还在
rabbitmqctl list_queues  #查看消息队列
     #hello 1   
import pika

credentials = pika.PlainCredentials("aaa","123")                     #受权的帐号 密码
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.152.132',credentials=credentials))  #创建socket

channel = connection.channel()            #建立rabbitmq协议通道

channel.queue_declare(queue='hello',durable=True)      #经过通道生成一个队列

channel.basic_publish(exchange='',
                      routing_key='hello',      #队列
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ),
                      body='Hello World!')      #内容
print(" [x] Sent 'Hello World!'")
connection.close()
生产者
消费者

四、多消费者间分发(消费者的公平分发) 

  若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

  

  传统模式:轮询(排队)获取队列中的数据,若是有一个消费者处理慢了,其余的消费者须要一直等待。那怎么解决并发的问题呢?别人处理快慢与本人处理的速度无关。
  解决方案:以谁先处理完,谁就先得到数据的原则【消息处理完毕才会再拿一条数据】。在消费者端加上这个条件判断:channel.basic_qos(prefetch_count=1) # 公平分发,能者多劳,每次执行一个。
  注意:生产者的代码不变,消费者代码中加入每次处理一次的参数:channel.basic_qos(prefetch_count=1) # 公平分发 

import pika

credentials = pika.PlainCredentials("aaa","123")                     #受权的帐号 密码
connection = pika.BlockingConnection(
    pika.ConnectionParameters('192.168.152.132',credentials=credentials))  #创建socket

channel = connection.channel()            #建立rabbitmq协议通道

channel.queue_declare(queue='hello',durable=True)      #经过通道生成一个队列

channel.basic_publish(exchange='',
                      routing_key='hello',      #队列
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ),
                      body='Hello World!')      #内容
print(" [x] Sent 'Hello World!'")
connection.close()
生产者
import pika
import time

credentials = pika.PlainCredentials('aaa', '123')  # 配置认证的用户 密码
parameters = pika.ConnectionParameters(host="192.168.11.144", credentials=credentials)
connection = pika.BlockingConnection(parameters)  # 创建一个连接对象
channel = connection.channel()  # 队列链接通道


def callback(ch, method, properties, body):
    print("Recived %r" % ch, method, properties, body)
    time.sleep(10)
    print('msg handle done...',body)
    ch.basic_ack(delivery_tag=method.delivery_tag) # 这个是表示消费者处理完了

channel.basic_qos(prefetch_count=1) # 公平分发
channel.basic_consume(callback,  # 取到消息后,执行callback函数
                      queue='hello',
                      # no_ack=True
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 进入阻塞模式
消费者

五、消息订阅发布 Publish\Subscribe(消息发布\订阅) 消息过滤 exchange

广播策略:每一个人都能收到;或是过滤某些人能够接收

一个生产者,对应对个消费者!
  exchange type 过滤类型
    fanout = 广播
    direct = 组播
    topic = 规则播 
    header = 略过。。。
  以前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被全部的Queue收到,相似广播的效果,这时候就要用到exchange了,Exchange在定义的时候是有类型的,以决定究竟是哪些Queue符合条件,能够接收消息

  fanout: 全部bind到此exchange的queue均可以接收消息
  direct: 经过routingKey和exchange决定的那个惟一的queue能够接收消息
  topic:全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息
    表达式符号说明:#表明一个或多个字符,*表明任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
      *.a会匹配a.a,b.a,c.a等
    注:使用RoutingKey为#,Exchange Type为topic的时候至关于使用fanout 

  headers: 经过headers 来决定把消息发给哪些queue

a、广播模式(一个生产者,多个消费者):
  一、路由指定为空!全部消息都发给exchange处理转到队列,转到哪一个队列就须要exchange指定,因此在创建链接的时候要指定名字。
  注意:exchange只负责转发不负责存放消息!若是没有队列绑定消息就会扔掉!
  二、自动生成队列名,而后使用完以后再删掉
队列参数exclusive=True惟一的,rabbit 随机生成一个名字。
  三、生产者和消费者端都要声明队列,以排除生成者未启动,消费者获取报错的问题
  四、生产者发送一条消息,说有的消费者都能接收到!高效,效率的完成发送!
  应用场景:新浪微博 订阅模式,只有当前登陆的用户才能够收到实时发送的消息 

import pika
        import sys

        credentials = pika.PlainCredentials('aaa', '123')

        parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
        connection = pika.BlockingConnection(parameters)

        channel = connection.channel() #队列链接通道

        channel.exchange_declare(exchange='logs',type='fanout') #声明队列 exchange名字和类型

        message = ' '.join(sys.argv[1:]) or "info: Hello World!" #获取外界输入的信息,不然就是hello world

        channel.basic_publish(exchange='logs', #指定exchange的名字
                              routing_key='', #注意,不须要指定队列名!
                              body=message) #信息
        print(" [x] Sent %r" % message)
        connection.close()
生产者
import pika
        credentials = pika.PlainCredentials('aaa', '123')

        parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
        connection = pika.BlockingConnection(parameters)

        channel = connection.channel() #队列链接通道
        channel.exchange_declare(exchange='logs', type='fanout')

        queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
        queue_name = queue_obj.method.queue  #获取队列名
        print('queue name',queue_name,queue_obj) #打印会列名

        channel.queue_bind(exchange='logs',queue=queue_name) #绑定队列到Exchange

        print(' [*] Waiting for logs. To exit press CTRL+C')

        def callback(ch, method, properties, body):
            print(" [x] %r" % body)
        channel.basic_consume(callback,queue=queue_name, no_ack=True)

        channel.start_consuming()
消费者

b、direct 组播模式:有选择的接收消息(exchange type=direct)

  一、有选择的接收消息(exchange type=direct),RabbitMQ还支持根据关键字发送,至关因而添加了一个过滤地带!
    即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
  二、发什么类型的,什么类型的接收,在接收端运行的时候加参数,指定接收的类型。
  三、routing_key = 'xxx' 与广播相比再也不为空,队列由执行时手动输入获取,而后路由指定发送到哪一个队列。
  四、按照类型:生产者发送指定类型的消息;消费者循环绑定队列,若是不存在不接收
    例子:就像广播电台,要想接收生产者发送的数据,必须是绑定且在线!若是断开一段时间再接收该电台消息,以前的讯息就不会再收到!
  应用场景:日志分类处理逻辑 【注:能够同时存在多个消费者】

import pika
    import sys

    credentials = pika.PlainCredentials('aaa', '123')

    parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel() #队列链接通道

    channel.exchange_declare(exchange='direct_log',type='direct')  #声明消息队列及类型

    log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info' #日志等级

    message = ' '.join(sys.argv[1:]) or "info: Hello World!"  #接收手动输入的消息内容

    channel.basic_publish(exchange='direct_log',
                          routing_key=log_level,
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
生产者
import pika,sys
        credentials = pika.PlainCredentials('aaa', '123')

        parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
        connection = pika.BlockingConnection(parameters)

        channel = connection.channel() #队列链接通道

        queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
        queue_name = queue_obj.method.queue
        print('queue name',queue_name,queue_obj)

        log_levels = sys.argv[1:] #日志等级 info warning error danger

        #判断存不存在,不存在退出
        if not log_levels:
            sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
            sys.exit(1)
        
        #循环绑定队列
        for level in log_levels:
            channel.queue_bind(exchange='direct_log',
                               queue=queue_name,
                               routing_key=level) #绑定队列到Exchange

        print(' [*] Waiting for logs. To exit press CTRL+C')

        def callback(ch, method, properties, body):
            print(" [x] %r" % body)

        channel.basic_consume(callback,queue=queue_name, no_ack=True)
        channel.start_consuming()
消费者

c、topic规则播

  话题类型,能够根据正则进行更精确的匹配,按照规则过滤。exchange type = topic,仅改下类型便可!

  在topic类型下,可让队列绑定几个模糊的关键字,以后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  # 表示能够匹配 0 个 或 多个 单词
  * 表示只能匹配 一个 单词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
To receive  all  the logs run:
 
     python receive_logs_topic.py  "#"
     To receive  all  logs  from  the facility  "kern" :
 
     python receive_logs_topic.py  "kern.*"
     Or  if  you want to hear only about  "critical"  logs:
 
     python receive_logs_topic.py  "*.critical"
     You can create multiple bindings:
 
     python receive_logs_topic.py  "kern.*"  "*.critical"
     And to emit a log with a routing key  "kern.critical"  type :
 
     python emit_log_topic.py  "kern.critical"  "A critical kernel error" 
1
2
3
4
5
6
7
8
9
10
#测试执行以下:
     #客户端一:
         -  python3 receive1.py  * .django
     #客户端二:
         -  python3 receive1.py mysql.error
     #客户端三:
         -  python3 receive1.py mysql. *
      
     #服务端:
         -  python3 receive1.py   #匹配相应的客户端
import pika
import sys

credentials = pika.PlainCredentials('aaa', '123')

parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列链接通道

channel.exchange_declare(exchange='topic_log',type='topic')

#log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'

message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"

channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
生产者
import pika,sys
credentials = pika.PlainCredentials('aaa', '123')

parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列链接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = queue_obj.method.queue

log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) #绑定队列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()
消费者

 

六、RPC remote producer call 远程执行调用

  从上边全部的例子中你有没有发现,上面的队列都是单向执行的,须要有发送端和接收端。若是远程的一台机器执行完毕再返回结果,那就实现不了了。若是让他执行完返回,这种模式叫什么呢?RPC(远程过程调用),snmp就是典型的RPC。

  那RabbitMQ能不能返回呢,怎么返回呢?可让机器既是发送端又是接收端。可是接收端返回消息怎么返回?能够发送到发过来的queue里么?答案固然是不能够,若是仍是存在原先的队列就会直接陷入死循环!因此返回时,须要让消息内部指定再创建一个队列queue,把结果发送新的queue里。

  同时,为了服务端返回的queue不写死,在客户端给服务端发指令的的时候,同时带一条消息说,你结果返回给哪一个queue

  在执行多个消息任务的时候,怎么区分判断哪一个消息是先执行呢?答案就是,在发任务时,再额外提交一个惟一标识符。
task1,task2异步执行,可是返回的顺序是不固定的,为了区分是谁执行完的,在发送的任务添加惟一标识符,这样取回的时候就能区分。
设置一个异步RPC
  声明一个队列reply_to,做为返回消息结果的队列
  发送消息队列,消息中带惟一标识uid
  监听reply_to队列,直到有结果
在类中声明监听 

__author__ = 'ShengXin'

#1. 定义fib函数
#2. 声明接收指令的队列名rpc_queue
#3. 开始监听队列,收到消息后 调用fib函数
#4. 把fib执行结果,发送回客户端指定的reply_to 队列
import subprocess
import pika
import time
credentials = pika.PlainCredentials('aaa', '123')

parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列链接通道

channel.queue_declare(queue='rpc_queue2')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result = cmd_obj.stdout.read() + cmd_obj.stderr.read()

    return result


def on_request(ch, method, props, body):
    cmd = body.decode("utf-8")

    print(" [.] run (%s)" % cmd)
    response = run_cmd(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #队列 接收客户端传过来的队列,返回
                     properties=pika.BasicProperties(correlation_id = props.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue2')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

rpc-server
rpc-server
# 1.声明一个队列,做为reply_to返回消息结果的队列
# 2.  发消息到队列,消息里带一个惟一标识符uid,reply_to
# 3.  监听reply_to 的队列,直到有结果
import queue

import pika
import uuid

class CMDRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('aaa', '123')
        parameters = pika.ConnectionParameters(host='192.168.152.132',credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue #命令的执行结果的queue

        #声明要监听callback_queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """
        收到服务器端命令结果后执行这个函数
        :param ch:
        :param method:
        :param props: 服务器端返回的消息结果!
        :param body:
        :return:
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode("gbk") #把执行结果赋值给Response

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4()) #惟一标识符号
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue, #传递要返回的消息队列   
                                         correlation_id = self.corr_id,  #惟一id
                                         ),
                                   body=str(n))
        #循环监听
        while self.response is None:
            self.connection.process_data_events()  #检测监听的队列里有没有新消息,若是有,收,若是没有,返回None
            #检测有没有要发送的新指令
        return self.response

cmd_rpc = CMDRpcClient()

print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')

print(response)
rpc-client
相关文章
相关标签/搜索