在程序系统中,例如外卖系统,订单系统,库存系统,优先级较高 发红包,发邮件,发短信,app消息推送等任务优先级很低,很适合交给消息队列去处理,以便于程序系统更快的处理其余请求。html
消息队列工做流程:前端
1 1 消息队列通常有三个角色: 2 2 队列服务端 3 3 队列生产者 4 4 队列消费者 5 5 消息队列工做流程就如同一个流水线,有产品加工,一个输送带,一个打包产品 6 6 输送带就是 不停运转的消息队列服务端 7 7 加工产品的就是 队列生产者 8 8 在传输带结尾打包产品的 就是队列消费者
队列产品:java
1 1 RabbitMQ 2 2 Erlang编写的消息队列产品,企业级消息队列软件,支持消息负载均衡,数据持久化等。 3 3 4 4 ZeroMQ 5 5 saltstack软件使用此消息,速度最快。 6 6 7 7 Redis 8 8 key-value的系统,也支持队列数据结构,轻量级消息队列 9 9 10 10 Kafka 11 11 由Scala编写,目标是为处理实时数据提供一个统1、高通量、低等待的平台
一个app系统队列工做流程:python
1 消费者,一个后台进程,不断的去检测消息队列中是否有消息,有消息就取走,开启新线程去处理业务,
若是没有一会再来
消息队列的做用:web
1 1 (1)程序解耦 2 2 容许你独立的修改或扩展两边的处理过程,只要确保它们遵照一样的接口约束 3 3 (2)冗余 4 4 消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。 5 5 许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 6 6 (3)峰值处理能力(削峰) 7 7 举例秒杀活动或者抢票. 8 8 在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见。若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。 9 10 使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。 10 11 (4)可恢复性 11 12 系统的一部分组件失效时,不会影响到整个系统。消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。 12 13 (5)顺序保证 13 14 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。 14 15 (6)缓冲 15 16 有助于控制和优化数据流通过系统的速度,解决生产消息和消费消息的处理速度不一致的状况。 16 17 (7)异步通讯 17 18 不少时候,用户不想也不须要当即处理消息。好比发红包,发短信等流程。消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。
rabbitMQ正则表达式
1、生活中消息队列的举例:centos
1 生活里的消息队列,如同邮局的邮箱, 2 若是没邮箱的话, 3 邮件必须找到邮件那我的,递给他,才玩完成,那这个任务会处理的很麻烦,很慢,效率很低 4 5 可是若是有了邮箱, 6 邮件直接丢给邮箱,用户只须要去邮箱里面去找,有没有邮件,有就拿走,没有就下次再来,这样能够极大的提高邮件收发效率!
rabbitmq是接收,存储,转发数据的.缓存
消息是应用间传送数据的通讯方式.消息能够很是简单,能够是字符串,也能够是序列化之后的复杂数据.消息队列是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递.消息发布者只须要把消息发送到MQ中而不用管谁来取,消息使用者只管从MQ取消息而无论谁发布的.这样发布者和消费者都不须要知道对方是否存在.安全
2、什么状况下用到消息队列?服务器
1. 电商订单
点外卖的业务逻辑可能会包括: 检查库存、生成的单据、发优惠红包、短信通知等。若是这些个业务同一块执行,完成下单的效率是很是的低的,发红包、短信通知这些不是完成一个订单必须的业务,就能够异步执行。
异步执行用到MQ,能够在核心流程(扣减库存,生成订单消息)等完成后发送消息到MQ,快速结束本次流程。消费者拉取MQ消息时,发现红包、短信等消息时,再进行处理。
场景:双11是购物狂节,用户下单后,订单系统须要通知库存系统,传统的作法就是订单系统调用库存系统的接口
这种作法有一个缺点:
当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱钱。。。。)
订单系统和库存系统高耦合.
引入消息队列
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库操做。
二、秒杀活动
流量削峰通常在秒杀活动中应用普遍 场景:秒杀活动,通常会由于流量过大,致使应用挂掉,为了解决这个问题,通常在应用前端加入消息队列。 做用: 1.能够控制活动人数,超过此必定阀值的订单直接丢弃(怪不得我一次秒杀都没抢到过。。。。。wtf???)
2.能够缓解短期的高流量压垮应用(应用程序按本身的最大处理能力获取订单)
3.用户的请求,服务器接收到以后,写入消息队列,超过定义的阈值就直接丢弃请求,或者跳转错误页面。
4.业务系统取出队列中的消息,再作后续处理。
3、Rabbit MQ安装
1 rabbitmq-server服务端 2 3 1.下载centos源 4 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.cloud.tencent.com/repo/centos7_base.repo 5 2.下载epel源 6 wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo 7 3.清空yum缓存而且生成新的yum缓存 8 yum clean all 9 yum makecache 10 4.安装erlang 11 $ yum -y install erlang 12 5.安装RabbitMQ 13 $ yum -y install rabbitmq-server 14 6.启动(无用户名密码): 15 systemctl start/stop/restart/status rabbitmq-server 16 17 设置rabbitmq帐号密码,以及角色权限设置 18 19 # 设置新用户yugo 密码123 20 sudo rabbitmqctl add_user yugo 123 21 22 # 设置用户为administrator角色 23 sudo rabbitmqctl set_user_tags yugo administrator 24 25 # 设置权限,容许对全部的队列都有权限 26 对何种资源具备配置、写、读的权限经过正则表达式来匹配,具体命令以下: 27 set_permissions [-p <vhostpath>] <user> <conf> <write> <read> 28 29 sudo rabbitmqctl set_permissions -p "/" yugo ".*" ".*" ".*" 30 31 #重启服务生效设置 32 service rabbitmq-server start/stop/restart 33 rabbitmq相关命令 34 35 // 新建用户 36 rabbitmqctl add_user {用户名} {密码} 37 38 // 设置权限 39 rabbitmqctl set_user_tags {用户名} {权限} 40 41 // 查看用户列表 42 rabbitmqctl list_users 43 44 // 为用户受权 45 添加 Virtual Hosts : 46 rabbitmqctl add_vhost <vhost> 47 48 // 删除用户 49 rabbitmqctl delete_user Username 50 51 // 修改用户的密码 52 rabbitmqctl change_password Username Newpassword 53 54 // 删除 Virtual Hosts : 55 rabbitmqctl delete_vhost <vhost> 56 57 // 添加 Users : 58 rabbitmqctl add_user <username> <password> 59 rabbitmqctl set_user_tags <username> <tag> ... 60 rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> 61 62 // 删除 Users : 63 delete_user <username> 64 65 // 使用户user1具备vhost1这个virtual host中全部资源的配置、写、读权限以便管理其中的资源 66 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' 67 68 // 查看权限 69 rabbitmqctl list_user_permissions user1 70 71 rabbitmqctl list_permissions -p vhost1 72 73 // 清除权限 74 rabbitmqctl clear_permissions [-p VHostPath] User 75 76 //清空队列步骤 77 rabbitmqctl reset 78 须要提早关闭应用rabbitmqctl stop_app , 79 而后再清空队列,启动应用 80 rabbitmqctl start_app 81 此时查看队列rabbitmqctl list_queues 82 83 查看全部的exchange: rabbitmqctl list_exchanges 84 查看全部的queue: rabbitmqctl list_queues 85 查看全部的用户: rabbitmqctl list_users 86 查看全部的绑定(exchange和queue的绑定信息): rabbitmqctl list_bindings 87 查看消息确认信息: 88 rabbitmqctl list_queues name messages_ready messages_unacknowledged 89 查看RabbitMQ状态,包括版本号等信息:rabbitmqctl status 90 91 #开启web界面rabbitmq 92 rabbitmq-plugins enable rabbitmq_management 93 94 #访问web界面 95 http://server-name:15672/
Rabbit MQ组件解释
1 AMQP协议是一个高级抽象层消息通讯协议,RabbitMQ是AMQP协议的实现。它主要包括如下组件: 2 1.Server(broker): 接受客户端链接,实现AMQP消息队列和路由功能的进程。 3 4 2.Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Host 5 6 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。 7 8 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 9 10 5.Message: 由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。 11 12 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,二者的匹配方式由Exchange Type决定。 13 14 7.Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。 15 16 8.Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立Channel,AMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection。 17 18 9.Command:AMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
python客户端
1 // rabbitmq官方推荐的python客户端pika模块 2 pip3 install pika
应用场景一、单发送单接收
生产者消费者模型
P 是生产者
C 是消费者
中间hello是消息队列
能够有多个P、多个C
P发送消息给hello队列,C消费者从队列中获取消息,默认轮询方式
生产者send.py
咱们的第一个程序send.py将向队列发送一条消息。咱们须要作的第一件事是创建与RabbitMQ服务器的链接。
1 #!/usr/bin/env python 2 import pika 3 # 建立凭证,使用rabbitmq用户密码登陆 4 # 去邮局取邮件,必须得验证身份 5 credentials = pika.PlainCredentials("s14","123") 6 # 新建链接,这里localhost能够更换为服务器ip 7 # 找到这个邮局,等于链接上服务器 8 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) 9 # 建立频道 10 # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个链接 11 channel = connection.channel() 12 # 声明一个队列,用于接收消息,队列名字叫“水许传” 13 channel.queue_declare(queue='水许传') 14 # 注意在rabbitmq中,消息想要发送给队列,必须通过交换(exchange),初学可使用空字符串交换(exchange=''),它容许咱们精确的指定发送给哪一个队列(routing_key=''),参数body值发送的数据 15 channel.basic_publish(exchange='', 16 routing_key='水许传', 17 body='武松又去打老虎啦2') 18 print("已经发送了消息") 19 # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,须要关闭本次链接 20 connection.close()
能够同时存在多个接受者,等待接收队列的消息,默认是轮训方式分配消息
接受者receive.py,能够运行屡次,运行多个消费者
1 import pika 2 # 创建与rabbitmq的链接 3 credentials = pika.PlainCredentials("s14","123") 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) 5 channel = connection.channel() 6 channel.queue_declare(queue="水许传") 7 8 def callbak(ch,method,properties,body): 9 print("消费者接收到了任务:%r"%body.decode("utf8")) 10 # 有消息来临,当即执行callbak,没有消息则夯住,等待消息 11 # 老百姓开始去邮箱取邮件啦,队列名字是水许传 12 channel.basic_consume(callbak,queue="水许传",no_ack=True) 13 # 开始消费,接收消息 14 channel.start_consuming()
应用场景二、单发送多接收
使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
Rabbitmq消息确认之ack
官网资料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
默认状况下,生产者发送数据给队列,消费者取出消息后,数据将被清除。
特殊状况,若是消费者处理过程当中,出现错误,数据处理没有完成,那么这段数据将从队列丢失
no-ack机制
不确认机制
也就是说每次消费者接收到数据后,无论是否处理完毕,rabbitmq-server都会把这个消息标记完成,从队列中删除
ACK机制
ACK机制用于保证消费者若是拿了队列的消息,客户端
处理时出错了,那么队列中仍然还存在这个消息,提供下一位消费者继续取
流程
1.生产者无须变更,发送消息 2.消费者若是no_ack=True啊,数据消费后若是出错就会丢失 反之no_ack=False,数据消费若是出错,数据也不会丢失 3.ack机制在消费者代码中演示
生产者.py 只负责发送数据便可,无须变更
1 #!/usr/bin/env python 2 import pika 3 # 建立凭证,使用rabbitmq用户密码登陆 4 # 去邮局取邮件,必须得验证身份 5 credentials = pika.PlainCredentials("s14","123") 6 # 新建链接,这里localhost能够更换为服务器ip 7 # 找到这个邮局,等于链接上服务器 8 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) 9 # 建立频道 10 # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个链接 11 channel = connection.channel() 12 # 新建一个hello队列,用于接收消息 13 # 这个邮箱能够收发各个班级的邮件,经过 14 channel.queue_declare(queue='金品没') 15 # 注意在rabbitmq中,消息想要发送给队列,必须通过交换(exchange),初学可使用空字符串交换(exchange=''),它容许咱们精确的指定发送给哪一个队列(routing_key=''),参数body值发送的数据 16 channel.basic_publish(exchange='', 17 routing_key='金品没', 18 body='潘金莲又出去。。。') 19 print("已经发送了消息") 20 # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,须要关闭本次链接 21 connection.close()
消费者.py给与ack回复
拿到消息必须给rabbitmq服务端回复ack信息,不然消息不会被删除,防止客户端出错,数据丢失
1 import pika 2 3 credentials = pika.PlainCredentials("s14","123") 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) 5 channel = connection.channel() 6 7 # 声明一个队列(建立一个队列) 8 channel.queue_declare(queue='金品没') 9 10 def callback(ch, method, properties, body): 11 print("消费者接受到了任务: %r" % body.decode("utf-8")) 12 # int('asdfasdf') 13 # 我告诉rabbitmq服务端,我已经取走了消息 14 # 回复方式在这 15 ch.basic_ack(delivery_tag=method.delivery_tag) 16 # 关闭no_ack,表明给与服务端ack回复,确认给与回复 17 channel.basic_consume(callback,queue='金品没',no_ack=False) 18 19 channel.start_consuming()
消息持久化
1 演示 2 1.执行生产者,向队列写入数据,产生一个新队列queue 3 2.重启服务端,队列丢失 4 5 3.开启生产者数据持久化后,重启rabbitmq,队列不丢失 6 4.依旧能够读取数据
消息的可靠性是RabbitMQ的一大特点,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。 为了保证RabbitMQ在退出或者crash等异常状况下数据没有丢失,须要将queue,exchange和Message都持久化。
EXchange模型
rabbitmq发送消息首先是发给exchange,而后再经过exchange发送消息给队列(queue)
exchange有四种模式
fanout
exchange将消息发送给和该exchange链接的全部queue;也就是所谓的广播模式;此模式下忽略routing_key;
direct
路由模式,经过routing_key将消息发送给对应的queue; 以下面这句便可设置exchange为direct模式,只有routing_key为“black”时才将其发送到队列queue_name;channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')
在上图中,Q1和Q2能够绑定同一个key,如绑定routing_key=‘KeySame’,那么收到routing_key为KeySame的消息时将会同时发送给Q1和Q2,退化为广播模式;
top
topic模式相似于direct模式,只是其中的routing_key变成了一个有“.”分隔的字符串,“.”将字符串分割成几个单词,每一个单词表明一个条件;
headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
官方教程:http://www.rabbitmq.com/tutorials/tutorial-three-python.html
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给全部的订阅者,而消息队列中的数据被消费一次便消失。因此,RabbitMQ实现发布和订阅时,会为每个订阅者建立一个队列,而发布者发布消息时,会将消息放置在全部相关队列中。
1 # fanout全部的队列放一份/给某些队列发 2 # 传送消息的模式 3 # 与exchange有关的模式都发 4 exchange_type = fanout
能够运行屡次,运行多个消费者,等待消息
实例
1.能够运行多个消费者,至关于有多个滴滴司机,等待着Exchange同一个电台发消息 2.运行发布者,发送消息给Exchange,查看是否给全部的队列(滴滴司机)发送了消息
关键字发布Exchange
以前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。
1 路由关键字是sb,alex 2 3 # -*- coding: utf-8 -*- 4 # __author__ = "maple" 5 import pika 6 7 credentials = pika.PlainCredentials("root","123") 8 connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials)) 9 channel = connection.channel() 10 11 # exchange='m1',exchange(秘书)的名称 12 # exchange_type='fanout' , 秘书工做方式将消息发送给全部的队列 13 channel.exchange_declare(exchange='m2',exchange_type='direct') 14 15 # 随机生成一个队列,队列退出时,删除这个队列 16 result = channel.queue_declare(exclusive=True) 17 queue_name = result.method.queue 18 19 # 让exchange和queque进行绑定,只要 20 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='alex') 21 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb') 22 23 24 def callback(ch, method, properties, body): 25 print("消费者接受到了任务: %r" % body) 26 27 channel.basic_consume(callback,queue=queue_name,no_ack=True) 28 29 channel.start_consuming() 30
1 路由关键字sb 2 3 # -*- coding: utf-8 -*- 4 # __author__ = "maple" 5 import pika 6 7 credentials = pika.PlainCredentials("root","123") 8 connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials)) 9 channel = connection.channel() 10 11 # exchange='m1',exchange(秘书)的名称 12 # exchange_type='fanout' , 秘书工做方式将消息发送给全部的队列 13 channel.exchange_declare(exchange='m2',exchange_type='direct') 14 15 # 随机生成一个队列 16 result = channel.queue_declare(exclusive=True) 17 queue_name = result.method.queue 18 19 # 让exchange和queque进行绑定. 20 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb') 21 22 23 def callback(ch, method, properties, body): 24 print("消费者接受到了任务: %r" % body) 25 26 channel.basic_consume(callback,queue=queue_name,no_ack=True) 27 28 channel.start_consuming()
1 发送消息给匹配的路由,sb或者alex 2 3 # -*- coding: utf-8 -*- 4 # __author__ = "yugo" 5 6 7 import pika 8 credentials = pika.PlainCredentials("root","123") 9 connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials)) 10 channel = connection.channel() 11 12 # 路由模式的交换机会发送给绑定的key和routing_key匹配的队列 13 channel.exchange_declare(exchange='m2',exchange_type='direct') 14 # 发送消息,给有关sb的路由关键字 15 channel.basic_publish(exchange='m2', 16 routing_key='sb', 17 body='aaaalexlaolelaodi') 18 19 connection.close()
RPC之远程过程调用
将一个函数运行在远程计算机上而且等待获取那里的结果,这个称做远程过程调用(Remote Procedure Call)或者 RPC。
RPC是一个计算机通讯协议。
比喻
将计算机服务运行理解为厨师作饭,厨师想作一个小葱拌豆腐,厨师须要洗小葱、切豆腐、调汁、凉拌。他一我的完成全部的事,如同古老的集中式应用,一台计算机作全部的事。
制做小葱拌豆腐{
厨师>洗小葱>切豆腐>凉拌
}
rpc
应用场景
而现在,饭店作大了,有钱了,专职分工来干活,再也不是厨师单打独斗,备菜师傅准备小葱、豆腐,切菜师傅切小葱、豆腐,厨师只负责调味,完成食品。
制做小葱拌豆腐{
备菜师>洗菜
切菜师>切菜
厨师>调味
}
此时一件事好多人在作,厨师就得和其余人沟通,通知备菜、洗菜师傅的这个动做就是远程过程调用(RPC)。
这个过程在计算机系统中,一个电商的下单过程,涉及物流、支付、库存、红包等多个系统,多个系统又在多个服务器上,由不一样的技术团队负责,整个下单过程,须要全部团队进行远程调用。
下单{
库存>减小库存
支付>扣款
红包>减免红包
物流>生成订单
}
什么是rpc
rpc指的是在计算机A上的进程,调用另一台计算机B的进程,A上的进程被挂起,B上的被调用进程开始执行后,产生返回值给A,A继续执行。
调用方能够经过参数将信息传递给被调用方,然后经过返回结果获得信息,这个过程对于开发人员来讲是透明的。
如同厨师同样,服务员把菜单给后厨,厨师告诉洗菜人,备菜人,开始工做,完成工做后,整个过程对于服务员是透明的,他彻底不用管后厨是怎么把菜作好的。
因为服务在不一样的机器上,远程调用必经网络通讯,调用服务必须写一坨网络通讯代码,很容易出错且很复杂,所以就出现了RPC框架。
阿里巴巴的 Dubbo java
新浪的 Motan java
谷歌的 gRPC 多语言
Apache thrift 多语言
rpc封装了数据的序列化,反序列化,以及传输协议
python实现RPC
利用RabbitMQ构建一个RPC系统,包含了客户端和RPC服务器,依旧使用pika模块
Callback queue回调队列
一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了得到处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to
。
Correlation id 关联标识
一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端没法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种状况,客户端在发送每一个请求时,同时会附带一个独有correlation_id
属性,这样客户端在回调队列中根据correlation_id
字段的值就能够分辨此响应属于哪一个请求。
客户端发送请求:某个应用将请求信息交给客户端,而后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息
服务器端工做流: 等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,而后处理后,将响应发送到reply_to指定的回调队列中
客户端接受处理结果: 客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用
过程
1.启动rpc客户端,等待接收数据到来,来了以后就进行处理,再将结果丢进队列 2.启动rpc服务端,发起请求
1 import pika 2 # 创建链接,服务器地址为localhost,可指定ip地址 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host='192.168.119.10')) 5 # 创建会话 6 channel = connection.channel() 7 # 声明RPC请求队列 8 channel.queue_declare(queue='s14rpc') 9 10 # 模拟一个进程,例如切菜师傅,等着洗菜师傅传递数据 11 def sum(n): 12 n+=100 13 return n 14 # 对RPC请求队列中的请求进行处理 15 16 17 def on_request(ch, method, props, body): 18 print(body,type(body)) 19 n = int(body) 20 print(" 正在处理sum(%s)" % n) 21 # 调用数据处理方法 22 response = sum(n) 23 # 将处理结果(响应)发送到回调队列 24 ch.basic_publish(exchange='', 25 # reply_to表明回复目标 26 routing_key=props.reply_to, 27 # correlation_id(关联标识):用来将RPC的响应和请求关联起来。 28 properties=pika.BasicProperties(correlation_id= \ 29 props.correlation_id), 30 body=str(response)) 31 ch.basic_ack(delivery_tag=method.delivery_tag) 32 33 # 负载均衡,同一时刻发送给该服务器的请求不超过一个 34 channel.basic_qos(prefetch_count=1) 35 channel.basic_consume(on_request, queue='s14rpc') 36 print("等待接收rpc请求") 37 38 39 #开始消费 40 channel.start_consuming()