在程序系统中,例如外卖系统,订单系统,库存系统,优先级较高 发红包,发邮件,发短信,app消息推送等任务优先级很低,很适合交给消息队列去处理,以便于程序系统更快的处理其余请求。
# 消息队列通常有三个角色: 队列服务端 队列生产者 队列消费者
RabbitMQ Erlang编写的消息队列产品,企业级消息队列软件,支持消息负载均衡,数据持久化等。 ZeroMQ saltstack软件使用此消息,速度最快。 Redis key-value的系统,也支持队列数据结构,轻量级消息队列 Kafka 由Scala编写,目标是为处理实时数据提供一个统1、高通量、低等待的平台
1)程序解耦 容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束。 2)冗余: 消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。 许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 3)峰值处理能力: (大白话,就是原本公司业务只须要5台机器,可是临时的秒杀活动,5台机器确定受不了这个压力,咱们又不可能将总体服务器架构提高到10台,那在秒杀活动后,机器不就浪费了吗?所以引入消息队列) 在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见。 若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。 使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。 4)可恢复性: 系统的一部分组件失效时,不会影响到整个系统。 消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。 5)顺序保证: 在大多使用场景下,数据处理的顺序都很重要。 大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性) 6)缓冲: 有助于控制和优化数据流通过系统的速度,解决生产消息和消费消息的处理速度不一致的状况。 7)异步通讯: 不少时候,用户不想也不须要当即处理消息。好比发红包,发短信等流程。 消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。
# RabbitMQ 是用来 接收,存储,转发数据的 # 消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。 # 消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而无论是谁发布的。这样发布者和使用者都不用知道对方的存在。 # 官网学习:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
RabbitMQ 下载安装启动html
### rabbitmq-server服务端 1.下载centos源 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.cloud.tencent.com/repo/centos7_base.repo 2.下载epel源 wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo 3.清空yum缓存而且生成新的yum缓存 yum clean all yum makecache 4.安装erlang $ yum -y install erlang 5.安装RabbitMQ $ yum -y install rabbitmq-server 6. 开启rabbitmq服务 systemctl start rabbitmq-server 7. 开启rabbitmq管理平台 rabbitmq-plugins enable rabbitmq-manager
#### 设置rabbitmq帐号密码,以及角色权限设置 # 设置新用户yugo 密码123 sudo rabbitmqctl add_user yugo 123 # 设置用户为administrator角色 sudo rabbitmqctl set_user_tags yugo administrator ### 设置权限,容许对全部的队列都有权限 对何种资源具备配置、写、读的权限经过正则表达式来匹配,具体命令以下: set_permissions [-p <vhostpath>] <user> <conf> <write> <read> sudo rabbitmqctl set_permissions -p "/" yugo ".*" ".*" ".*" ### 重启服务生效设置 service rabbitmq-server start/stop/restart #### rabbitmq相关命令 // 新建用户 rabbitmqctl add_user {用户名} {密码} // 设置权限 rabbitmqctl set_user_tags {用户名} {权限} // 查看用户列表 rabbitmqctl list_users // 为用户受权 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> // 删除用户 rabbitmqctl delete_user Username // 修改用户的密码 rabbitmqctl change_password Username Newpassword // 删除 Virtual Hosts : rabbitmqctl delete_vhost <vhost> // 添加 Users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 删除 Users : delete_user <username> // 使用户user1具备vhost1这个virtual host中全部资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看权限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除权限 rabbitmqctl clear_permissions [-p VHostPath] User //清空队列步骤 rabbitmqctl reset 须要提早关闭应用rabbitmqctl stop_app , 而后再清空队列,启动应用 rabbitmqctl start_app 此时查看队列rabbitmqctl list_queues ### 其余操做 查看全部的exchange: rabbitmqctl list_exchanges 查看全部的queue: rabbitmqctl list_queues 查看全部的用户: rabbitmqctl list_users 查看全部的绑定(exchange和queue的绑定信息): rabbitmqctl list_bindings 查看消息确认信息: rabbitmqctl list_queues name messages_ready messages_unacknowledged 查看RabbitMQ状态,包括版本号等信息:rabbitmqctl status ### 开启web界面rabbitmq rabbitmq-plugins enable rabbitmq_management ### 访问web界面 http://server-name:15672/
# RabbitMQ组件解释 AMQP AMQP协议是一个高级抽象层消息通讯协议,RabbitMQ是AMQP协议的实现。它主要包括如下组件: 1.Server(broker): 接受客户端链接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,二者的匹配方式由Exchange Type决定。 7.Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。 8.Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立Channel,AMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection。 9.Command:AMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
// rabbitmq官方推荐的python客户端pika模块 pip3 install pika
# 安装 pika 0.10.0 #### 生产 - 消费者 模型 ## touch send.py #!/usr/bin/env python import pika # 建立凭证,使用rabbitmq用户密码登陆 # 去邮局取邮件,必须得验证身份 credentials = pika.PlainCredentials("s14","123") # 新建链接,这里localhost能够更换为服务器ip # 找到这个邮局,等于链接上服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) # 建立频道 # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个链接 channel = connection.channel() # 声明一个队列,用于接收消息,队列名字叫“水许传” channel.queue_declare(queue='水许传') # 注意在rabbitmq中,消息想要发送给队列,必须通过交换(exchange),初学可使用空字符串交换(exchange=''),它容许咱们精确的指定发送给哪一个队列(routing_key=''),参数body值发送的数据 channel.basic_publish(exchange='', routing_key='水许传', body='武松又去打老虎啦2') print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,须要关闭本次链接 connection.close() ## touch receive.py import pika # 创建与rabbitmq的链接 credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="水许传") def callbak(ch,method,properties,body): print("消费者接收到了任务:%r"%body.decode("utf8")) # 有消息来临,当即执行callbak,没有消息则夯住,等待消息 # 老百姓开始去邮箱取邮件啦,队列名字是水许传 channel.basic_consume(callbak,queue="水许传",no_ack=True) # 开始消费,接收消息 channel.start_consuming()
# ACK机制用于保证消费者若是拿了队列的消息,客户端处理时出错了,那么队列中仍然还存在这个消息,提供下一位消费者继续取 ### 流程: 1.生产者无须变更,发送消息 2.消费者若是no_ack=True啊,数据消费后若是出错就会丢失 反之no_ack=False,数据消费若是出错,数据也不会丢失 3.ack机制在消费者代码中演示 ### no-ack # 当 no-ack为false时,表示 须要获得ack的校验 # 当 no-ack为True时,表示 不须要获得校验
### send2.py #!/usr/bin/env python import pika # 建立凭证,使用rabbitmq用户密码登陆 # 去邮局取邮件,必须得验证身份 credentials = pika.PlainCredentials("s14","123") # 新建链接,这里localhost能够更换为服务器ip # 找到这个邮局,等于链接上服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) # 建立频道 # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个链接 channel = connection.channel() # 新建一个hello队列,用于接收消息 # 这个邮箱能够收发各个班级的邮件,经过 channel.queue_declare(queue='金品没') # 注意在rabbitmq中,消息想要发送给队列,必须通过交换(exchange),初学可使用空字符串交换(exchange=''),它容许咱们精确的指定发送给哪一个队列(routing_key=''),参数body值发送的数据 channel.basic_publish(exchange='', routing_key='金品没', body='潘金莲又出去。。。') print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,须要关闭本次链接 connection.close() ### receive2.py import pika credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() # 声明一个队列(建立一个队列) channel.queue_declare(queue='金品没') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body.decode("utf-8")) # int('asdfasdf') # 我告诉rabbitmq服务端,我已经取走了消息 # 回复方式在这 ch.basic_ack(delivery_tag=method.delivery_tag) # 关闭no_ack,表明给与服务端ack回复,确认给与回复 channel.basic_consume(callback,queue='金品没',no_ack=False) channel.start_consuming()
1.执行生产者,向队列写入数据,产生一个新队列queue 2.重启服务端,队列丢失 3.开启生产者数据持久化后,重启rabbitmq,队列不丢失 4.依旧能够读取数据
#### chiuhua_send.py import pika # 无密码 # connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61')) # 有密码 credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() # 声明一个队列(建立一个队列) # 默认此队列不支持持久化,若是服务挂掉,数据丢失 # durable=True 开启持久化,必须新开启一个队列,本来的队列已经不支持持久化了 ''' 实现rabbitmq持久化条件 delivery_mode=2 使用durable=True声明queue是持久化 ''' channel.queue_declare(queue='LOL',durable=True) channel.basic_publish(exchange='', routing_key='LOL', # 消息队列名称 body='德玛西亚万岁', # 支持数据持久化 properties=pika.BasicProperties( delivery_mode=2,#表明消息是持久的 2 ) ) connection.close()
### chijiuhua_receive.py import pika credentials = pika.PlainCredentials("s14","123") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials)) channel = connection.channel() # 确保队列持久化 channel.queue_declare(queue='LOL',durable=True) ''' 必须确保给与服务端消息回复,表明我已经消费了数据,不然数据一直持久化,不会消失 ''' def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body.decode("utf-8")) # 模拟代码报错 # int('asdfasdf') # 此处报错,没有给予回复,保证客户端挂掉,数据不丢失 # 告诉服务端,我已经取走了数据,不然数据一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 关闭no_ack,表明给与回复确认 channel.basic_consume(callback,queue='LOL',no_ack=False) channel.start_consuming()
#RabbitMQ参考博客: https://www.cnblogs.com/pyyu/p/10318053.html
# 什么是 RPC RPC(Remote Procedure Call)——远程过程调用,它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议。 # RPC的做用 RPC 就是为解决服务之间信息交互而发明和存在的。 # rpc 简易理解: 相似 WEB网络请求。 RPC就是一种远程调用函数接口的方式,说白了,就是一种远程调用函数接口的方式,客户端和服务端之间约定一种契约(函数接口),而后服务端一直等待客户端的调用。
# RPC的交互 # RPC是两个子系统之间进行的直接消息交互,使用操做系统提供的套接字做为消息的载体 # 1. python的socket编程就是一种RPC通讯 #1.1 rpc_server.py import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(("localhost", 8080)) sock.listen(1) # 监听客户端链接 while True: conn, addr = sock.accept() # 接收一个客户端链接 print(conn.recv(1024)) # 从接收缓冲读消息 recv buffer conn.sendall(b"world") # 将响应发送到发送缓冲 send buffer conn.close() # 关闭链接 #1.2 rpc_client.py import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(("localhost", 8080)) # 链接服务器 sock.sendall(b"hello") # 将消息输出到发送缓冲 send buffer print(sock.recv(1024)) # 从接收缓冲 recv buffer 中读响应 sock.close() # 关闭套接字...
# xmlrpc库 python 内置的 # 服务端 from xmlrpc.server import SimpleXMLRPCServer # 调用函数 def respon_string(str): return "get string:%s"%str if __name__ == '__main__': # 初始化 获得一个server对象 server = SimpleXMLRPCServer(('localhost', 8888)) # 注册 被调用的函数 server.register_function(respon_string, "get_string") print ("Listening for Client") server.serve_forever() # 保持等待调用状态 # 客户端 from xmlrpc.client import ServerProxy if __name__ == '__main__': server = ServerProxy("http://localhost:8888") # 初始化服务器 print (server.get_string("oldboy_python6666")) # 调用函数并传参
# 参考 屁眼鱼: https://www.cnblogs.com/pyyu/p/9465608.html
# 什么是 saltstack? saltstack是由thomas Hatch于2011年建立的一个开源项目,设计初衷是为了实现一个快速的远程执行系统。 # salt的做用 早期运维人员会根据本身的生产环境来写特定脚本完成大量重复性工做,这些脚本复杂且难以维护。系统管理员面临的问题主要是一、系统配置管理,二、远程执行命令,所以诞生了不少开源软件,系统维护方面有fabric、puppet、chef、ansible、saltstack等,这些软件擅长维护系统状态或方便的对大量主机进行批量的命令执行。 salt灵活性强大,能够进行大规模部署,也能进行小规模的系统部署。salt的设计架构适用于任意数量的服务器,从少许本地网络系统到跨越数个数据中心,拓扑架构都是c/s模型,配置简单。 无论是几台、几百台、几千台服务器,均可以使用salt在一个中心节点上进行管控,灵活定位任意服务器子集来运行命令。 Salt是python编写的,支持用户经过python自定义功能模块,也提供了大量的python API接口,用户能够根据须要进行简单快速的扩展。 # saltstack的运行方式 Local 本地运行,交付管理 Master/Minion <<< 经常使用方式 Salt SSH 不须要客户端
在安装salt以前,先理解salt架构中各个角色,主要区分是salt-master和salt-minion,顾名思义master是中心控制系统,minion是被管理的客户端。 salt架构中的一种就是master > minion。
# 环境准备 服务器环境 centos7(master) centos7(minion) ip地址 192.168.178.131 192.168.178.132 身份 master slave 软件包 salt-master salt-minion
# 1. 修改hosts文件 192.168.11.132 slave 192.168.11.131 master
# 2 . 关闭 防火墙 # 关闭firewalld systemctl disable firewalld systemctl stop firewalld # 关闭iptables iptables -F
# 3. 安装 saltstack wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo yum clean all #清空缓存 yum makecache #生成yum缓存 # 3.1 master 节点上 安装 yum install salt-master -y # 3.2 minion 节点上 安装 yum install salt-minion -y
# 4. 修改配置文件 # 4.1 vim /etc/master # salt运行的用户,影响到salt的执行权限 user: root #s alt的运行线程,开的线程越多通常处理的速度越快,但通常不要超过CPU的个数 worker_threads: 10 # master的管理端口 publish_port : 4505 # master跟minion的通信端口,用于文件服务,认证,接受返回结果等 ret_port : 4506 # 若是这个master运行的salt-syndic链接到了一个更高层级的master,那么这个参数须要配置成链接到的这个高层级master的监听端口 syndic_master_port : 4506 # 指定pid文件位置 pidfile: /var/run/salt-master.pid # 4.2 vim /etc/minion # minion的识别ID,能够是IP,域名,或是能够经过DNS解析的字符串 id: slave # salt运行的用户权限 user: root # master的识别ID,能够是IP,域名,或是能够经过DNS解析的字符串 master : master # master通讯端口 master_port: 4506 # 备份模式,minion是本地备份,当进行文件管理时的文件备份模式 backup_mode: minion # 执行salt-call时候的输出方式 output: nested # minion等待master接受认证的时间 acceptance_wait_time: 10 # 失败重连次数,0表示无限次,非零会不断尝试到设置值后中止尝试 acceptance_wait_time_max: 0 # 从新认证延迟时间,能够避免由于master的key改变致使minion须要从新认证的syn风暴 random_reauth_delay: 60 # 日志文件位置 log_file: /var/logs/salt_minion.log
# 5. 启动salt-master和salt-minion systemctl start salt-minion systemctl start salt-master #检查salt状态 systemctl status salt-minion systemctl status salt-master
# 6. 在master上接收minion秘钥 # 1. 查看秘钥 salt-key -L # 2. 查看 minion秘钥 master节点: salt-key -f slave minion节点: salt-call --local key.finger # 3. master添加minion秘钥 salt-key -a slave
# 7. salt 命令 salt '*' test.ping # 7.1 test.fib生成斐波那契数列 salt "minion" test.fib 50 # 8. cmd是超级模块,全部shell命令都能执行 salt 'slave' cmd.run 'ps -ef|grep python' # 9. 安装软件 salt 'slave' pkg.install "nginx" # 10. 输出结果 --out # json salt --out=json '*' cmd.run_all 'hostname' { "slave": { "pid": 2268, "retcode": 0, "stderr": "", "stdout": "slave" } } # yaml salt --out=yaml '*' cmd.run_all 'hostname' slave: pid: 2289 retcode: 0 stderr: '' stdout: slave
### 在学习saltstack过程当中,第一要点就是States编写技巧,简称SLS文件。这个文件遵循YAML语法。初学者看这玩意很容易懵逼,来,超哥拯救你学习YAML语法 # json xml yaml 数据序列化格式 # yaml容易被解析,应用于配置文件 ## salt的配置文件是yaml配置文件,不能用tab ## saltstack,k8s,ansible都用的yaml格式配置文件 # 语法规则: 大小写敏感 使用缩进表示层级关系 缩进时禁止tab键,只能空格 缩进的空格数不重要,相同层级的元素左侧对其便可 # 表示注释行 ### yaml支持的数据结构 # 1. 对象: 键值对,也称做映射 mapping 哈希hashes 字典 dict 冒号表示 key: value key冒号后必须有 # 2. 数组: 一组按次序排列的值,又称为序列sequence 列表list 短横线 - list1 # 3. 纯量: 单个不可再分的值 # 4. 对象:键值对 yaml first_key: second_key:second_value python { 'first_key':{ 'second_key':'second_value', } }
# 注意: YAML是YAML Ain't Markup Language的首字母缩写,YAML的语法简单, 结构体经过空格展现 项目使用 '-' 表明 键值对经过 ':' 分割 ### YAML语法遵循固定的缩进风格,表示数据层级结构关系,saltstack须要每一个缩进级别由2个空格组成,禁止用tabs!!!
# 1. salt 'slave' grains.items # 2. salt "*" grains.item ipv4 --out json >> salt-granis-computerIPV4-.json
# 1. 安装模块 pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple salt # 2. 进入python3交互模式 import salt.client local = salt.client.LocalClient() local.cmd('*','cmd.run',['poweroff']) #向全部minion发送关机命令