工作队列就是多个 worker 共同按顺序接收同一个 queue 里面的任务,还可以设置 basic_qos 来确保当前的任务执行完毕后才继续接收任务。
import pika

# Connect to the broker.
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Declare the queue; durable=True keeps the queue across broker restarts.
channel.queue_declare(queue="work_queue", durable=True)

messages = ["apple", "pear", "cherry", "banana", "watermelon"]
for message in messages:
    # routing_key picks the destination queue; delivery_mode=2 makes the
    # message persistent (only effective because the queue is durable too).
    channel.basic_publish(
        exchange="",
        routing_key="work_queue",
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print("send {message} ok".format(message=message))

# channel.queue_delete(queue="work_queue")  # delete the queue
conn.close()
import pika
import time

# Username/password credentials for the broker.
cred = pika.PlainCredentials("Glen", "Glen[1234]")
# conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host="192.168.120.71", port=5672, virtual_host="/", credentials=cred)
)
channel = conn.channel()


def callbak(ch, method, properties, body):
    """Handle one task, then ack it.

    Like no_ack, if the worker dies before acking, the broker re-queues
    the task and hands it to the next worker.
    """
    print("body:", body)
    time.sleep(1)
    print("done..")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# prefetch_count=1: the broker gives this worker at most one unacked task
# at a time, so the next task only arrives after the current one finishes.
channel.basic_qos(prefetch_count=1)
# channel.queue_declare(queue="work_queue")
# no_ack defaults to False: the message counts as handled only after the
# callback acks it.
channel.basic_consume(callbak, queue="work_queue", no_ack=False)
channel.start_consuming()

# Multiple workers receive the producer's tasks in turn, each taking a new
# one only after finishing the current one.
producer 先将消息发送到交换机 exchange,然后 exchange 再将消息发送给所有已绑定的 queue,即将消息广播出去。
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Exchange types:
#   fanout - every queue bound to this exchange receives the message
#   direct - only the single queue selected by routingKey + exchange receives it
#   topic  - queues whose binding pattern matches the routingKey receive it
channel.exchange_declare(exchange="message", exchange_type="fanout")

while True:
    message = input(">>")
    # Publish straight to the exchange; each consumer binds its own
    # randomly-named queue to it and receives the broadcast.
    channel.basic_publish(exchange="message", routing_key="", body=message)
    print("send {message} ok".format(message=message))
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Declare the same fanout exchange the producer publishes to.
channel.exchange_declare(exchange="message", exchange_type="fanout")

# No queue name given, so the broker assigns a random one; exclusive=True
# deletes the queue automatically when this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the private queue to the exchange so broadcasts reach it.
channel.queue_bind(exchange="message", queue=queue_name)


def callback(ch, method, properties, body):
    """Print every broadcast message."""
    print(body)


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
direct 和路由器类似,发送消息的时候需要指定目的地 routing_key,只有对应的 queue 才会接收。
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Exchange types:
#   fanout - every queue bound to this exchange receives the message
#   direct - only the single queue selected by routingKey + exchange receives it
#   topic  - queues whose binding pattern matches the routingKey receive it
channel.exchange_declare(exchange="message2", exchange_type="direct")

while True:
    # Input format example: info_message info
    message, routing = input(">>").split()
    # Every message names its routing key; only consumers bound with the
    # same key receive it.
    channel.basic_publish(exchange="message2", routing_key=routing, body=message)
    print("send {message} {routing} ok".format(message=message, routing=routing))
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Declare the same direct exchange the producer publishes to.
channel.exchange_declare(exchange="message2", exchange_type="direct")

# Broker-assigned random queue name; exclusive=True removes the queue once
# this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind with the routing key(s) this consumer cares about.
# channel.queue_bind(exchange="message2", routing_key="info", queue=queue_name)
channel.queue_bind(exchange="message2", routing_key="warning", queue=queue_name)
# channel.queue_bind(exchange="message2", routing_key="error", queue=queue_name)


def callback(ch, method, properties, body):
    """Print every message routed to this queue."""
    print(body)


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
producer 发送消息的时候可以模糊地指定接收的 queue。如有多个 queue:mysql.error、redis.error、mysql.info、redis.info,指定不同的 routing_key 可以匹配到不同的 queue:mysql.* 可以匹配到 mysql.error、mysql.info;*.error 可以匹配 redis.error、mysql.error。“#”表示匹配全部词;“*”只匹配一个词。
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Exchange types:
#   fanout - every queue bound to this exchange receives the message
#   direct - only the single queue selected by routingKey + exchange receives it
#   topic  - queues whose binding pattern matches the routingKey receive it
channel.exchange_declare(exchange="message3", exchange_type="topic")

# Example inputs:
#   a happy.work
#   b happy.life
#   c sad.work
#   d sad.life
while True:
    message, routing = input(">>").split()
    # Every message names its routing key; topic bindings pattern-match it.
    channel.basic_publish(exchange="message3", routing_key=routing, body=message)
    print("send {message} {routing} ok".format(message=message, routing=routing))
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Declare the same topic exchange the producer publishes to.
channel.exchange_declare(exchange="message3", exchange_type="topic")

# Broker-assigned random queue name; exclusive=True removes the queue once
# this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Pattern bindings: "#" matches everything, "*" matches exactly one word.
# channel.queue_bind(exchange="message3", routing_key="#", queue=queue_name)
# channel.queue_bind(exchange="message3", routing_key="happy.*", queue=queue_name)
channel.queue_bind(exchange="message3", routing_key="*.work", queue=queue_name)


def callback(ch, method, properties, body):
    """Print every message whose routing key matched the binding."""
    print(body)


channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
远程调用相当于有一个控制中心和多个计算节点,控制中心发指令调用远程计算节点的函数进行计算,然后将结果返回给控制中心,pika 模块也实现了该功能。
import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# Queue on which RPC requests arrive.
channel.queue_declare(queue="rpc_queue")


def mul(n):
    """The remote procedure: square n (the sleep simulates slow work)."""
    time.sleep(5)
    return n * n


def message_handle(ch, method, properties, body):
    """Compute the result, publish it to the caller's reply queue, then ack."""
    print("{body} * {body} = ?".format(body=body))
    response = mul(int(body))
    # properties.reply_to carries the caller's private reply queue name.
    ch.basic_publish(exchange="", routing_key=properties.reply_to, body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(message_handle, queue="rpc_queue")
channel.start_consuming()
import pika
import threading


class Center(object):
    """RPC client: publishes a request and waits for the reply on a
    private, exclusive queue whose name travels with the request."""

    def __init__(self):
        self.response = ""
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71"))
        self.channel = self.connection.channel()
        # Exclusive reply queue; its name is passed along with each request
        # so the RPC server knows where to publish the result.
        self.callback_queue = self.channel.queue_declare(exclusive=True).method.queue
        self.channel.basic_consume(self.response_hand, no_ack=True, queue=self.callback_queue)

    def response_hand(self, ch, method, properties, body):
        """Store the reply body (bytes) so request() can stop waiting."""
        self.response = body
        print(body)

    def request(self, n):
        """Send n to rpc_queue and block until the squared result arrives."""
        self.response = ""
        self.channel.basic_publish(
            body=str(n),
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(reply_to=self.callback_queue),
        )
        # BUG FIX: the original compared with `self.response is ""`, which
        # relies on CPython string interning (identity, not equality) and
        # raises SyntaxWarning on Python 3.8+.  Use == instead.
        while self.response == "":
            self.connection.process_data_events()
        return int(self.response)


while True:
    message = input(">>")
    if not message.isdigit():
        continue
    center = Center()
    # One thread per request keeps the prompt responsive while waiting.
    t = threading.Thread(target=center.request, args=(int(message),))
    t.start()
redis 一共有 string、list、set、zset、hash 这五种常用数据类型,下面对常用命令进行整理,参考文档 http://doc.redisfans.com/
import redis """ redis-py提供两个类Redis和StrictRedis用于实现Redis的命令, StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令 (好比,SET命令对应与StrictRedis.set方法)。Redis是StrictRedis的子类, 用于向后兼容旧版本的redis-py。 简单说,官方推荐使用StrictRedis方法。 """ # redis = redis.Redis(host="192.169.120.71", port=6379) # 链接池 # pool = redis.ConnectionPool(host="192.168.120.71", port=6379) # 链接redis # redis = redis.Redis(connection_pool=pool) # 使用默认方式链接到数据库 # redis = redis.StrictRedis(host='192.168.120.71', port=6379, db=0) # 使用url方式链接到数据库 # redis = redis.StrictRedis.from_url('redis://@192.168.120.71:6379/0') # 链接池 # pool = redis.ConnectionPool(host="192.168.120.71", port=6379) """ 有三种构造url的方法 redis://[:password]@host:port/db # TCP链接 rediss://[:password]@host:port/db # Redis TCP+SSL 链接 unix://[:password]@/path/to/socket.sock?db=db # Redis Unix Socket 链接 """ pool = redis.ConnectionPool.from_url("redis://@192.168.120.71:6379/0") redis = redis.StrictRedis(connection_pool=pool) name = redis.get("name") print(name)
import redis

pool = redis.ConnectionPool(host="192.168.120.71", port=6379)

# Pipeline example (transaction=True runs the queued commands atomically:
# either both succeed or both fail):
# r = redis.StrictRedis(connection_pool=pool)
# pipe = r.pipeline(transaction=True)
# pipe.set("status", 1)
# pipe.set("message", "hello")
# pipe.execute()


class RedisPubSub(object):
    """Thin wrapper around redis publish/subscribe on fixed channel names."""

    def __init__(self, channel_sub="fm110", channel_pub="fm110"):
        self.__conn = redis.StrictRedis(connection_pool=pool)
        self.channel_sub = channel_sub
        self.channel_pub = channel_pub

    def pub(self, message):
        """Publish message on the configured publish channel."""
        self.__conn.publish(message=message, channel=self.channel_pub)
        # return True

    def sub(self):
        """Subscribe, consume the subscription confirmation, return the handle."""
        sub = self.__conn.pubsub()
        sub.subscribe(self.channel_sub)
        sub.parse_response()
        return sub
from day11.pub_sub_pipe import *

# Publish a single message on the fm110 channel.
r = RedisPubSub(channel_sub="fm110", channel_pub="fm110")
r.pub("hello")
from day11.pub_sub_pipe import *

# Subscribe to fm110 and print every message as it arrives.
r = RedisPubSub(channel_pub="fm110", channel_sub="fm110")
redis_sub = r.sub()
while True:
    msg = redis_sub.parse_response()
    print(msg)
3、rpc 命令端
import ast
import pika
import threading
import uuid


class Center(object):
    """RPC command center: sends a shell command to one remote host's queue
    and collects the reply on a private callback queue, keyed by a UUID so
    replies on a shared queue cannot get mixed up."""

    def __init__(self, remote_host):
        self.remote_host = remote_host
        self.response = {}
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71"))
        self.channel = self.connection.channel()
        # self.channel.exchange_declare(exchange="work", exchange_type="fanout")
        self.channel.queue_declare(queue=remote_host)
        # Exclusive reply queue; its name travels with each request so the
        # remote executor knows where to publish the result.
        self.callback_queue = self.channel.queue_declare(exclusive=True).method.queue
        self.channel.basic_consume(self.response_hand, no_ack=True, queue=self.callback_queue)

    def response_hand(self, ch, method, properties, body):
        """Parse the reply (a dict repr) and file it under its correlation id."""
        # SECURITY FIX: the original used eval() on data received off the
        # wire; ast.literal_eval parses the same dict literals but cannot
        # execute arbitrary code.
        self.response[properties.correlation_id] = ast.literal_eval(body.decode("utf"))
        print(self.remote_host, properties.correlation_id, self.response[properties.correlation_id]["stdout"], end="")

    def request(self, n):
        """Send command n to this host's queue and block until its reply arrives."""
        # A UUID tags the request; the executor echoes it back so replies on
        # the shared callback queue stay unambiguous.
        rpcid = str(uuid.uuid4())
        print(self.remote_host, rpcid, n)
        self.response[rpcid] = ""
        self.channel.basic_publish(
            body=str(n),
            exchange="",
            routing_key=self.remote_host,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=rpcid,  # task id sent along with the request
            ),
        )
        # BUG FIX: the original compared with `is ""` (identity, interning-
        # dependent, SyntaxWarning on Python 3.8+); use == instead.
        while self.response[rpcid] == "":
            self.connection.process_data_events()
        return self.response[rpcid]


while True:
    message = input(">>").split()  # format: cmd ip1,ip2,ip3
    # BUG FIX: also skip input without a host list, which previously raised
    # IndexError on message[1].
    if len(message) < 2:
        continue
    hosts = message[1].split(",")
    for host in hosts:
        center = Center(host)
        # One thread per host so commands run without blocking the prompt.
        t = threading.Thread(target=center.request, args=(message[0],))
        t.start()
import pika
import subprocess

# Connect to the broker.
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()


def cmd_handel(cmd_str):
    """Run cmd_str in a shell and return a dict of stdout/stderr/code/host.

    SECURITY NOTE: shell=True executes whatever string arrives on the queue;
    that is the whole point of this remote-execution tool, but it must only
    be reachable from a trusted broker.
    """
    print(cmd_str)
    p = subprocess.run(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    re = {
        "stdout": p.stdout.decode("utf8"),
        "stderr": p.stderr.decode("utf8"),
        "code": p.returncode,
        "host": "1.1.1.1",
    }
    print(re["stdout"])
    return re


def message_handle(ch, method, properties, body):
    """Execute the received command, reply with the result dict, then ack."""
    print(body.decode("utf8"))
    response = cmd_handel(body.decode("utf8"))
    # Echo the correlation_id back so the caller can match the reply to
    # its request.
    ch.basic_publish(
        exchange="",
        routing_key=properties.reply_to,
        body=str(response),
        properties=pika.BasicProperties(correlation_id=properties.correlation_id),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.queue_declare(queue="2.2.2.2")
# NOTE(review): this extra exclusive queue is declared but its name is never
# used afterwards — presumably leftover; kept to preserve behavior.
queue_name = channel.queue_declare(exclusive=True).method.queue
channel.basic_consume(message_handle, queue="2.2.2.2")
channel.start_consuming()