消息队列工做流程:html
消息队列通常有三个角色: 队列服务端 队列生产者 队列消费者 - 消息队列工做流程就如同一个流水线,有产品加工,一个输送带,一个打包产品 - 输送带就是 不停运转的消息队列服务端 - 加工产品的就是 队列生产者 - 在传输带结尾打包产品的 就是队列消费者
队列产品前端
RabbitMQ Erlang编写的消息队列产品,企业级消息队列软件,支持消息负载均衡,数据持久化等。 ZeroMQ saltstack软件使用此消息,速度最快。 Redis key-value的系统,也支持队列数据结构,轻量级消息队列 Kafka 由Scala编写,目标是为处理实时数据提供一个统1、高通量、低等待的平台
1)程序解耦 容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束。 2)冗余: 消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。 许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 3)峰值处理能力: (大白话,就是原本公司业务只须要5台机器,可是临时的秒杀活动,5台机器确定受不了这个压力,咱们又不可能将总体服务器架构提高到10台,那在秒杀活动后,机器不就浪费了吗?所以引入消息队列) 在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见。 若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。 使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。 4)可恢复性: 系统的一部分组件失效时,不会影响到整个系统。 消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。 5)顺序保证: 在大多使用场景下,数据处理的顺序都很重要。 大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性) 6)缓冲: 有助于控制和优化数据流通过系统的速度,解决生产消息和消费消息的处理速度不一致的状况。 7)异步通讯: 不少时候,用户不想也不须要当即处理消息。好比发红包,发短信等流程。 消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。
场景说明:用户注册后,须要发注册邮件和注册短信。传统的作法有两种 1.串行的方式;2.并行方式python
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存系统:订阅web
下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操做 假如:在下单时库存系统不数据库
能正常使用。也不影响正常下单,由于下单后,订单系统写入消息队列就再也不关心其余的后续操做了。实现订单系统安全
与库存系统的应用解耦服务器
流量削锋也是消息队列中的经常使用场景,通常在秒杀或团抢活动中使用普遍。 应用场景:秒杀活动,通常会由于流量网络
过大,致使流量暴增,应用挂掉。为解决这个问题,通常须要在应用前端加入消息队列。 a、能够控制活动的人数数据结构
b、能够缓解短期内高流量压垮应用架构
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错
误页面。 秒杀业务根据消息队列中的请求信息,再作后续处理
点对点通信
聊天时通信
rabbitMQ安装
[root@xujunk ~]#yum install erlang [root@xujunk ~]#yum install rabbitmq-server -y
启动rabbitmq-server
[root@xujunk ~]#systemctl start rabbitmq-server
配置rabbitmq建立管理用户以及后台管理页面
[root@xujunk ~]#rabbitmqctl add_user xjk 123
给新用户设置管理员角色
[root@xujunk ~]#rabbitmqctl set_user_tags xjk adminstarator
给当前用户,设置权限:能够对全部的队列,进行可读可写操做
#语法:set_permissions [-p <vhostpath>] <user> <conf> <write> <read> [root@xujunk ~]#rabbitmqctl set_permissions -p "/" xjk ".*" ".*" ".*"
添加rabbtimq管理界面
[root@xujunk ~]#rabbitmq-plugins enable rabbitmq_management
web端访问:
http://192.168.58.131:15672/
登录rabbitmq服务器输入帐号密码
在一个文件夹建立2个py文件,一个消费者,一个表明生产者
import pika # 创建与rabbitmq的链接 credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="水续传") def callbak(ch,method,properties,body): print("消费者接收到了任务:%r"%body.decode("utf8")) # 有消息来临,当即执行callbak,没有消息则夯住,等待消息 # 老百姓开始去邮箱取邮件啦,队列名字是水许传 channel.basic_consume("水续传",callbak) # 开始消费,接收消息 channel.start_consuming() #!!注意:因rabbitmq版本不一样,channel.basic_consume的参数位置会有所变化,报错“got multiple values for keyword argument 'queue'”须要根据源码调整参数位置。
#!/usr/bin/env python import pika # 建立凭证,使用rabbitmq用户密码登陆 # 去邮局取邮件,必须得验证身份 credentials = pika.PlainCredentials("xjk","123") # 新建链接,这里localhost能够更换为服务器ip # 找到这个邮局,等于链接上服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) # 建立频道 # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个链接 channel = connection.channel() # 声明一个队列,用于接收消息,队列名字叫“水许传” channel.queue_declare(queue='水续传') # 注意在rabbitmq中,消息想要发送给队列,必须通过交换(exchange),初学可使用空字符串交换 (exchange=''),它容许咱们精确的指定发送给哪一个队列(routing_key=''),参数body值发>送的数据 channel.basic_publish(exchange='', routing_key='水续传', body='大郎 起来喝药了') print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,须要关闭本次链接 connection.close()
ACK机制:
ACK机制用于保证消费者若是拿了队列的消息,客户端
处理时出错了,那么队列中仍然还存在这个消息,提供下一位消费者继续取
官网资料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
生产者pro_ack.py
#!/usr/bin/env python import pika # 建立凭证,使用rabbitmq用户密码登陆 # 去邮局取邮件,必须得验证身份 credentials = pika.PlainCredentials("xjk","123") # 新建链接,这里localhost能够更换为服务器ip # 找到这个邮局,等于链接上服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) # 建立频道 # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个链接 channel = connection.channel() # 新建一个hello队列,用于接收消息 # 这个邮箱能够收发各个班级的邮件,经过 channel.queue_declare(queue='西游记') # 注意在rabbitmq中,消息想要发送给队列,必须通过交换(exchange),初学可使用空字 符串交换(exchange=''),它容许咱们精确的指定发送给哪一个队列(routing_key=''),参数body值发送的数据 channel.basic_publish(exchange='', routing_key='西游记', body='大师兄,师傅被妖怪抓走了') print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,须要关闭本次链接 connection.close()
消费者代码cus_ack
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # 声明一个队列(建立一个队列) channel.queue_declare(queue='西游记') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body.decode("utf-8")) #int('asdfasdf') # 我告诉rabbitmq服务端,我已经取走了消息 # 回复方式在这,告诉服务端,我正确消费了消息,你能够标记 清除了 ch.basic_ack(delivery_tag=method.delivery_tag) # 关闭no_ack,表明给与服务端ack回复,确认给与回复 channel.basic_consume(on_message_callback=callback,queue='西>游记',auto_ack=False) channel.start_consuming()
向队列push:python3 pro_ack.py
从队列pull:python3 cus_ack.py
注意:reids版本不一样:
#若是用此方法报错: channel.basic_consume(callback,queue='西游记',no_ack=False) #改为: channel.basic_consume(on_message_callback=callback,queue='西>游记',auto_ack=False) #缘由版本的问题
显示效果:
当另外一终端执行:[root@xujunk cs]#python3 cus_ack.py
消息的可靠性是RabbitMQ的一大特点,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。 为了保证RabbitMQ在退出或者crash等异常状况下数据没有丢失,须要将queue,exchange和Message都持久化。
生产者:Persist_pro.py
import pika # 有密码 channel = connection.channel() # 声明一个队列(建立一个队列) # 默认此队列不支持持久化,若是服务挂掉,数据丢失 # durable=True 开启持久化,必须新开启一个队列,本来的队列已经不支持持久化了 ''' 实现rabbitmq持久化条件 delivery_mode=2 使用durable=True声明queue是持久化 ''' channel.queue_declare(queue='python',durable=True) #这 里实现队列建立的时候,就是持久化的 channel.basic_publish(exchange='', routing_key='python', # 消息队列名称 body='life is short,i use python ', # 支持数据持>久化 properties=pika.BasicProperties( delivery_mode=2,#表明消息是持久的 2 ) ) connection.close()
消费者 :Persist_cus.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # 确保队列持久化 channel.queue_declare(queue='python',durable=True) ''' 必须确保给与服务端消息回复,表明我已经消费了数据,不然数据 一>直持久化,不会消失 ''' def callback(ch, method, properties, body): print("成功取出了消息 >>: %r" % body.decode("utf-8")) # 模拟代码报错 # int('asdfasdf') # 此处报错,没有给予回复,保>证客户端挂掉,数据不丢失 # 告诉服务端,我已经取走了数据,不然数据一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 关闭no_ack,表明给与回复确认 channel.basic_consume(on_message_callback=callback,queue='python',auto_ack=False) channel.start_consuming()
显示效果以下,D
表示数据持久化,这样不管重启服务器,队列不会丢失
前面的效果都是一对一发,若是作一个广播效果可不能够,这时候就要用到exchange了 。exchange必须精确的知道
收到的消息要发给谁。exchange的类型决定了怎么处理, 类型有如下几种:
fanout:exchange将消息发送给和该exchange链接的全部queue;也就是所谓的广播模式;此模式下忽略
routing_key
driect:经过routingKey和exchange决定的那个惟一的queue能够接收消息,只有routing_key为“black”时才将
其发送到队列queue_name;
topic: 全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息
须要queue和exchange绑定,由于消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange 上,消费者只会在queue里读取消息
发送端:fanout_send.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 注意:这里是广播,不须要声明queue channel.exchange_declare(exchange='logs',exchange_type='fanout') # 声明广播管道 # message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) # 注意此处空,必须有 print(" [x] Sent %r" % message) connection.close()
接收端:fanout_recv.py
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 result = channel.queue_declare('',exclusive=True) # 获取随机的queue名字 queue_name = result.method.queue print("random queuename:", queue_name) # queue绑定到转发器上 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True) channel.start_consuming()
路由模式,经过routing_key将消息发送给对应的queue; 以下面这句便可设置exchange为direct模式,只有 routing_key为“black”时才将其发送到队列queue_name;
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')
在上图中,Q1和Q2能够绑定同一个key,如绑定routing_key=‘KeySame’,那么收到routing_key为KeySame的消息
时将会同时发送给Q1和Q2,退化为广播模式;
发送端:direct_send.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct')#重要程度级别,这里默认定义为 info severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
接收端:direct_recv.py
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') result = channel.queue_declare('',exclusive=True) queue_name = result.method.queue # 获取运行脚本全部的参数 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 循环列表去绑定 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True) channel.start_consuming()
效果演示:
以前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键
字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
消费者:key_cus1.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # exchange='m1',exchange(秘>书)的名称 # exchange_type='fanout' , 秘书工做方式将消息发送 给全部的队列 channel.exchange_declare(exchange='m2',exchange_type='direct') # 随机生成一个队列,队列退出时,删除这个队列 result = channel.queue_declare('cus1',exclusive=True) queue_name = result.method.queue#让exchange和queque进行绑定 ,只要 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bmw') channel.queue_bind(exchange='m2',queue=queue_name,routing_key='benz') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(on_message_callback=callback,queue=queue_name,auto_ack=True) channel.start_consuming()
消费者:key_cus2.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # exchange='m1',exchange(秘>书)的名称 # exchange_type='fanout' , 秘书工做方式将消息发送 给全部的队列 channel.exchange_declare(exchange='m2',exchange_type='direct') # 随机生成一个队列,队列退出时,删除这个队列 result = channel.queue_declare('cus2',exclusive=True) queue_name = result.method.queue#让exchange和queque进行绑定 ,只要 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bmw') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) channel.basic_consume(on_message_callback=callback,queue=queue_name,auto_ack=True) channel.start_consuming()
生产者:key_pub.py
import pika credentials = pika.PlainCredentials("xjk","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.58.131',credentials=credentials)) channel = connection.channel() # 路由模式的交换机会发送给绑定的 key和routing_key匹配的队列 channel.exchange_declare(exchange='m2',exchange_type='direct') # 发送消息,给有关benz的路由关键字 channel.basic_publish(exchange='m2', routing_key='benz', body='benz is good car') connection.close()
#新建用户 [root@xujunk cs]#rabbitmqctl add_user 用户名 密码 #设置权限 [root@xujunk cs]#rabbitmqctl set_user_tags {用户名} {权限} #查看用户列表 [root@xujunk cs]#rabbitmqctl list_users #为用户受权 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> #修改用户密码 rabbitmqctl change_password 用户名 密码 [root@xujunk cs]#rabbitmqctl change_password xm 1234 #删除 Users : rabbitmqctl delete_user 用户名 [root@xujunk cs]#rabbitmqctl delete_user xm #使用户user1具备vhost1这个virtual host中全部资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' [root@xujunk ~]#rabbitmqctl set_permissions -p "/" xjk ".*" ".*" ".*" #查看权限 rabbitmqctl list_user_permissions user1 [root@xujunk cs]#rabbitmqctl list_user_permissions xjk #查看队列 [root@xujunk cs]#rabbitmqctl list_queues #清空队列步骤: 1.关闭应用:rabbitmqctl stop_app 2.从新启动:rabbitmqctl reset 3.此时查看队列:rabbitmqctl list_queues 查看全部的exchange:rabbitmqctl list_exchanges 查看全部的queue: rabbitmqctl list_queues 查看全部的用户:rabbitmqctl list_users 查看全部的绑定(exchange和queue的绑定信息):rabbitmqctl list_bindings
AMQP AMQP协议是一个高级抽象层消息通讯协议,RabbitMQ是AMQP协议的实现。它主要包括如下组件: 1.Server(broker): 接受客户端链接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,二者的匹配方式由Exchange Type决定。 7.Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。 8.Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立Channel,AMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection。 9.Command:AMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。