1 import pika 2 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 #声明一个管道 5 channel = connection.channel() 6 # 声明队列 7 channel.queue_declare(queue='hello',durable=True)#durable=True使队列持久化 8 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 9 channel.basic_publish(exchange='', #定义
routing_key='hello',
body='Hello World!',
properties=pike.BasicProperties(delivery_mode=2,abc='alex'))#delivery_mode=2使消息持久化 10 print(" [x] Sent 'Hello World!'") 11 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 channel = connection.channel() 5 #若是确认这个队列声明过了,则能够不在此声明。若是不肯定消费者或者生产者先运行,那么能够在二者中都进行声明。 6 channel.queue_declare(queue='hello',durable=True)#durable=True使队列持久化 7 8 def callback(ch, method, properties, body): 9 #ch是管道内存对象地址 10 #method是包含给接收消息者等属性 11 print(ch,method,properties) 12 print(" [x] Received %r" % body)
#客户端对服务端的消息进行手动确认
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)#实现广播效果,消息公平分发 13 #消费消息 14 channel.basic_consume(callback,#若是收到消息,就调用callback函数来处理消息 15 queue='hello')#从哪个队列里接收消息17 18 print(' [*] Waiting for messages. To exit press CTRL+C') 19 #开始接收消息 20 channel.start_consuming()
生产者javascript
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='direct_logs', 8 type='direct') 9 10 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 11 message = ' '.join(sys.argv[2:]) or 'Hello World!' 12 channel.basic_publish(exchange='direct_logs', 13 routing_key=severity, 14 body=message) 15 print(" [x] Sent %r:%r" % (severity, message)) 16 connection.close()
消费者 html
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='direct_logs', 7 type='direct') 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 severities = sys.argv[1:] 13 if not severities: 14 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 15 sys.exit(1) 16 17 for severity in severities: 18 channel.queue_bind(exchange='direct_logs', 19 queue=queue_name, 20 routing_key=severity) 21 22 print(' [*] Waiting for logs. To exit press CTRL+C') 23 24 25 def callback(ch, method, properties, body): 26 print(" [x] %r:%r" % (method.routing_key, body)) 27 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming()
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 channel.exchange_declare(exchange='logs',#转发器 8 type='fanout')#设置模式 9 10 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 11 channel.basic_publish(exchange='logs', 12 routing_key='', 13 body=message) 14 print(" [x] Sent %r" % message) 15 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange='logs',#转发器 7 type='fanout') 8 #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 9 result = channel.queue_declare(exclusive=True) # exclusive排他,惟一的 10 11 queue_name = result.method.queue #获取队列名字 12 13 channel.queue_bind(exchange='logs',queue=queue_name) #将队列绑定到名为logs的exchange 14 15 print(' [*] Waiting for logs. To exit press CTRL+C') 16 17 def callback(ch, method, properties, body): 18 print(" [x] %r" % body) 19 20 channel.basic_consume(callback,queue=queue_name,no_ack=True) 21 channel.start_consuming()
生产者java
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='topic_logs', 7 type='topic') 8 9 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 10 message = ' '.join(sys.argv[2:]) or 'Hello World!' 11 channel.basic_publish(exchange='topic_logs', 12 routing_key=routing_key, 13 body=message) 14 print(" [x] Sent %r:%r" % (routing_key, message)) 15 connection.close()
消费者 python
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='topic_logs', 7 type='topic') 8 9 result = channel.queue_declare(exclusive=True) 10 queue_name = result.method.queue 11 12 binding_keys = sys.argv[1:] 13 # #号 表明收全部消息 14 # "kern.*" 表明收以kern开头的消息 15 # "*.critical" 表明收全部critical结尾的消息 16 # "kern.*" "*.critical" 同时收以上二者 17 if not binding_keys: 18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for binding_key in binding_keys: 22 channel.queue_bind(exchange='topic_logs', 23 queue=queue_name, 24 routing_key=binding_key) 25 26 print(' [*] Waiting for logs. To exit press CTRL+C') 27 28 29 def callback(ch, method, properties, body): 30 print(" [x] %r:%r" % (method.routing_key, body)) 31 32 33 channel.basic_consume(callback, 34 queue=queue_name, 35 no_ack=True) 36 37 channel.start_consuming()
1 import redis 2 #>>>>>>>>>>>>>>>>>>>>>链接方法1 3 #链接 4 r = redis.Redis(host='10.211.55.4', port=6379) 5 #存储值 6 r.set('foo', 'Bar') 7 #获取值 8 print(r.get('foo')) 9 10 #>>>>>>>>>>>>>>>>>>>>>链接方法2 11 #建立链接池 12 pool=redis.ConnectionPool(host='10.211.55.4', port=6379) 13 #链接 14 r=redis.Redis(connection_pool=pool) 15 #存储值 16 r.set('foo', 'Bar') 17 #获取值 18 print(r.get('foo'))
在Redis中设置值,不存在则建立,存在则修改
set(name, value, ex=None, px=None, nx=False, xx=False)
ex,过时时间(秒)
px,过时时间(毫秒)
nx,若是设置为True,则只有name不存在时,当前set操做才执行
xx,若是设置为True,则只有name存在时,当前set操做才执行
设置值,只有name不存在时,执行设置操做(添加)
time,过时时间(数字秒 或 timedelta对象)
time_ms,过时时间(数字毫秒 或 timedelta对象)
批量设置值
mset(k1=
'v1'
, k2=
'v2'
)或者mget({
'k1'
:
'v1'
,
'k2'
:
'v2'
})
获取值
批量获取 mget(
'ylr'
,
'wupeiqi'
)或者r.mget([
'ylr'
,
'wupeiqi'
])
设置新值并获取返回原来的值(这个须要以前存在)
获取子序列(根据字节获取,非字符),能够对字符串进行切片
start,起始位置(字节);end,结束位置(字节)
修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
offset,字符串的索引,字节(一个汉字三个字节)
value,要设置的值
对name对应值的二进制表示的位进行操做
offset,位的索引(将值变换成二进制后再进行索引)
value,值只能是 1 或 0
获取name对应的值的二进制表示中的某位的值 (0或1)
获取name对应的值的二进制表示中 1 的个数
start,位起始位置; end,位结束位置
返回name对应值的字节长度(一个汉字3个字节)
自增 name对应的值,当name不存在时,则建立name=amount,不然,则自增
amount,自增数(必须是整数)
自增 name对应的值,当name不存在时,则建立name=amount,不然,则自增
amount,自增数(浮点型)
自减 name对应的值,当name不存在时,则建立name=amount,不然,则自减
amount,自减数(整数)
在redis name对应的值后面追加内容
value, 要追加的字符串
name对应的hash中设置一个键值对(不存在,则建立;不然,修改)
key,name对应的hash中的key;value,name对应的hash中的value
hsetnx(name, key, value),当name对应的hash中不存在当前key时则建立(至关于添加)
在name对应的hash中批量设置键值对
mapping,字典,如:{'k1':'v1', 'k2': 'v2'},例如hmset('xx', {'k1':'v1', 'k2': 'v2'})
在name对应的hash中获取根据key获取value
在name对应的hash中获取多个key的值
keys,要获取key集合,如:['k1', 'k2', 'k3'];*args,要获取的key,如:k1,k2,k3,例如r.hmget('xx', ['k1', 'k2'])或者r.hmget('xx', 'k1', 'k2')
获取name对应
hash
的全部键值
获取name对应的hash中键值对的个数
获取name对应的hash中全部的key的值
获取name对应的hash中全部的value的值
检查name对应的hash是否存在当前传入的key
将name对应的hash中指定key的键值对删除
自增name对应的hash中的指定key的值,不存在则建立key=amount
自增name对应的hash中的指定key的值,不存在则建立key=amount
增量式迭代获取,对于数据大的数据很是有用,hscan能够实现分片的获取数据,并不是一次性将数据所有获取完,从而防止内存溢出
cursor,游标(基于游标分批取获取数据);match,匹配指定key,默认None 表示全部的key;count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
利用yield封装hscan建立生成器,实现分批去redis中获取数据
match,匹配指定key,默认None 表示全部的key;count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
在name对应的list中添加元素,每一个新的元素都添加到列表的最左边
r.lpush('oo', 11,22,33),保存顺序为: 33,22,11
rpush(name, values) 表示从右向左操做
在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边
rpushx(name, value) 表示从右向左操做
name对应的list元素的个数
在name对应的列表的某一个值前或后插入一个新值
where,BEFORE或AFTER;refvalue,标杆值,即:在它先后插入数据;value,要插入的数据
对name对应的list中的某一个索引位置从新赋值
index,list的索引位置;value,要设置的值
在name对应的list中删除指定的值
value,要删除的值;num=0,删除列表中全部的指定值;num=2,从前到后,删除2个;
num=-2,从后向前,删除2个
在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
rpop(name) 表示从右向左操做
在name对应的列表中根据索引获取列表元素
在name对应的列表分片获取数据
在name对应的列表中移除没有在start-end索引之间的值
从一个列表取出最右边的元素,同时将其添加至另外一个列表的最左边
将多个列表排列,按照从左到右去pop对应列表的元素
timeout,超时时间,当元素全部列表的元素获取完以后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞
brpop(keys, timeout),从右向左获取数据
从一个列表的右侧移除一个元素并将其添加到另外一个列表的左侧
name对应的集合中添加元素
获取name对应的集合中元素个数
在第一个name对应的集合中且不在其余name对应的集合的元素集合
获取第一个name对应的集合中且不在其余name对应的集合,再将其新加入到dest对应的集合中
获取多一个name对应集合的并集
获取多一个name对应集合的并集,再讲其加入到dest对应的集合中
检查value是不是name对应的集合的成员
获取name对应的集合的全部成员
将某个成员从一个集合中移动到另一个集合
从集合的右侧(尾部)移除一个成员,并将其返回
从name对应的集合中随机获取 numbers 个元素
在name对应的集合中删除某些值
获取多一个name对应的集合的并集
获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
同字符串的操做,用于增量迭代分批获取元素,避免内存消耗太大
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
有序集合,在集合的基础上,为每元素排序;元素的排序须要根据另一个值来进行比较,因此,对于有序集合,每个元素有两个值,即:值和分数,分数专门用来作排序。
在name对应的有序集合中添加元素,例如:zadd('zz', 'n1', 1, 'n2', 2)或zadd('zz', n1=11, n2=22)
获取name对应的有序集合元素的数量
获取name对应的有序集合中分数 在 [min,max] 之间的个数
自增name对应的有序集合的 name 对应的分数
按照索引范围获取name对应的有序集合的元素
start,有序集合索引发始位置(非分数);end,有序集合索引结束位置(非分数);desc,排序规则,默认按照分数从小到大排序;withscores,是否获取元素的分数,默认只获取元素的值;score_cast_func,对分数进行数据转换的函数
从大到小排序:zrevrange(name, start, end, withscores=False, score_cast_func=float)
按照分数范围获取name对应的有序集合的元素:zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
从大到小排序:zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
获取某个值在 name对应的有序集合中的排行(从 0 开始)
zrevrank(name, value),从大到小排序
删除name对应的有序集合中值是values的成员
根据排行范围删除
根据分数范围删除
获取name对应有序集合中 value 对应的分数
获取两个有序集合的交集,若是遇到相同值不一样分数,则按照aggregate进行操做
aggregate的值为: SUM MIN MAX
获取两个有序集合的并集,若是遇到相同值不一样分数,则按照aggregate进行操做
同字符串类似,相较于字符串新增score_cast_func,用来对分数进行操做
根据删除redis中的任意数据类型
检测redis的name是否存在
根据模型获取redis的name
KEYS * 匹配数据库中全部 key;KEYS h?llo 匹配 hello , hallo 和 hxllo 等;KEYS h*llo 匹配 hllo 和 heeeeello 等;KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
为某个redis的某个name设置超时时间
对redis的name重命名为
将redis的某个值移动到指定的db下
随机获取一个redis的name(不删除)
获取name对应值的类型
同字符串操做,用于增量迭代获取key
1 import redis,time 2 #链接redis 3 pool = redis.ConnectionPool(host='10.211.55.4', port=6379) 4 r = redis.Redis(connection_pool=pool) 5 6 #启动管道 7 pipe = r.pipeline() #或者pipe=r.pipeline() 8 9 #执行命令,两条命令一块儿执行 10 pipe.set('name', 'alex')
time.sleep(30) 11 pipe.set('role', 'sb') 12 13 pipe.execute()transaction=True
1 #redishelper.py 2 import redis 3 4 class RedisHelper: 5 def __init__(self): 6 self.__conn = redis.Redis(host='10.211.55.4') 7 self.chan_sub = 'fm104.5' 8 self.chan_pub = 'fm104.5' 9 10 def public(self, msg):#发布者 11 self.__conn.publish(self.chan_pub, msg) 12 return True 13 14 def subscribe(self):#订阅者 15 pub = self.__conn.pubsub()#打开收音机 16 pub.subscribe(self.chan_sub)#调频道 17 pub.parse_response()#准备接收 18 return pub
1 #订阅者.py 2 from redishelper import RedisHelper 3 4 obj = RedisHelper() 5 redis_sub = obj.subscribe() 6 7 while True: 8 msg = redis_sub.parse_response() 9 print(msg)
1 #发布者.py 2 from redishelper import RedisHelper 3 obj = RedisHelper() 4 obj.public('hello')
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 channel.queue_declare(queue='rpc_queue') 6 7 def fib(n): #求斐波那契数列 8 if n == 0: 9 return 0 10 elif n == 1: 11 return 1 12 else: 13 return fib(n - 1) + fib(n - 2) 14 15 16 def on_request(ch, method, props, body): 17 n = int(body) 18 print(" [.] fib(%s)" % n) 19 response = fib(n) 20 ch.basic_publish(exchange='', 21 routing_key=props.reply_to, 22 properties=pika.BasicProperties(correlation_id=props.correlation_id), 23 body=str(response)) 24 ch.basic_ack(delivery_tag=method.delivery_tag) 25 26 channel.basic_qos(prefetch_count=1) 27 channel.basic_consume(on_request, queue='rpc_queue') 28 print(" [x] Awaiting RPC requests") 29 channel.start_consuming()
1 import pika,uuid 2 3 class FibonacciRpcClient(object): 4 def __init__(self): 5 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 6 self.channel = self.connection.channel() 7 result = self.channel.queue_declare(exclusive=True) 8 self.callback_queue = result.method.queue 9 self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue) 10 11 def on_response(self, ch, method, props, body): 12 if self.corr_id == props.correlation_id: 13 self.response = body 14 15 def call(self, n): 16 self.response = None 17 self.corr_id = str(uuid.uuid4()) 18 self.channel.basic_publish(exchange='', 19 routing_key='rpc_queue', 20 properties=pika.BasicProperties( 21 reply_to=self.callback_queue, 22 correlation_id=self.corr_id, 23 ), 24 body=str(n)) 25 while self.response is None: 26 self.connection.process_data_events() 27 return int(self.response) 28 29 30 fibonacci_rpc = FibonacciRpcClient() 31 print(" [x] Requesting fib(30)") 32 response = fibonacci_rpc.call(30) 33 print(" [.] Got %r" % response)