day10-python-协程\异步IO\RabbitMQ队列\redis缓存

1、协程javascript

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程html

协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:java

协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。python

协程的好处:git

  • 无需线程上下文切换的开销
  • 无需原子操做锁定及同步的开销
    •   "原子操做(atomic operation)是不须要synchronized",所谓原子操做是指不会被线程调度机制打断的操做;这种操做一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另外一个线程)。原子操做能够是一个步骤,也能够是多个操做步骤,可是其顺序是不能够被打乱,或者切割掉只执行部分。视做总体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。

缺点:程序员

  • 没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序

使用yield实现协程操做例子github

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import queue

def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)

def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)

if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

看楼上的例子,我问你这算不算作是协程呢?你说,我他妈哪知道呀,你前面说了一堆废话,可是并没告诉我协程的标准形态呀,我腚眼一想,以为你说也对,那好,咱们先给协程一个标准定义,即符合什么条件就能称之为协程:正则表达式

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 一个协程遇到IO操做自动切换到其它协程

基于上面这4点定义,咱们刚才用yield实现的程并不能算是合格的线程,由于它有一点功能没实现,哪一点呢?redis

 

Greenletsql

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可使你在任意函数之间随意切换,而不需把这个函数先声明为generator

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)   #启动一个协程
gr2 = greenlet(test2)
gr1.switch()

感受确实用着比generator还简单了呢,但好像尚未解决一个问题,就是遇到IO操做,自动切换,对不对?

 

Gevent

Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import gevent


def foo():
    print('Running in foo')
    gevent.sleep(2)
    print('Explicit context switch to foo again')


def bar():
    print('Explicit精确的 context内容 to bar')
    gevent.sleep(1)
    print('Implicit context switch back to bar')

def func3():
    print("running func3")
    gevent.sleep(0)
    print("running func3 again")

gevent.joinall([
    gevent.spawn(foo),  #生成
    gevent.spawn(bar),
    gevent.spawn(func3)
])

 

同步和异步的性能区别

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。

 

遇到IO阻塞时会自动切换任务

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from urllib import request
import gevent,time
from gevent import monkey

monkey.patch_all()  #把当前程序的全部io操做给我单独的作上标记

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

urls = ['https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/']
time_start = time.time()
for url in urls:
    f(url)
print("同步cost",time.time() - time_start)

async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])
print("异步cost",time.time() - async_time_start)

经过gevent实现单线程下的多socket并发

server side 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)

client side 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

HOST = 'localhost'  # The remote host
PORT = 8001  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)
    #
    print('Received', repr(data))
s.close()

并发100个sock链接

import socket
import threading

def sock_conn():

    client = socket.socket()

    client.connect(("localhost",8001))
    count = 0
    while True:
        #msg = input(">>:").strip()
        #if len(msg) == 0:continue
        client.send( ("hello %s" %count).encode("utf-8"))

        data = client.recv(1024)

        print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
        count +=1
    client.close()


for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()

2、事件驱动和异步IO

一般,咱们写服务器处理模型的程序时,有如下几种模型:
(1)每收到一个请求,建立一个新的进程,来处理该请求;
(2)每收到一个请求,建立一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程经过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)中方法,因为建立新的进程的开销比较大,因此,会致使服务器性能比较差,但实现比较简单。
第(2)种方式,因为要涉及到线程的同步,有可能会面临 死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,通常广泛认为第(3)种方式是大多数 网络服务器采用的方式
 

看图说话讲事件驱动模型

在UI编程中,经常要对鼠标点击进行相应,首先如何得到鼠标点击呢?
方式一:建立一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有如下几个缺点
1. CPU资源浪费,可能鼠标点击的频率很是小,可是扫描线程仍是会一直循环检测,这会形成不少的CPU资源浪费;若是扫描鼠标点击的接口是阻塞的呢?
2. 若是是堵塞的,又会出现下面这样的问题,若是咱们不但要扫描鼠标点击,还要扫描键盘是否按下,因为扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
3. 若是一个循环须要扫描的设备很是多,这又会引来响应时间的问题;
因此,该方式是很是很差的。

方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件,这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增长一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不一样的事件,调用不一样的函数,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的处理函数指针,这样,每一个消息都有独立的处理函数;

 

 

 

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

让咱们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展现了随着时间的推移,这三种模式下程序所作的工做。这个程序有3个任务须要完成,每一个任务都在等待I/O操做时阻塞自身。阻塞在I/O操做上所花费的时间已经用灰色框标示出来了。

 

在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。

当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:

  1. 程序中有许多任务,并且…
  2. 任务之间高度独立(所以它们不须要互相通讯,或者等待彼此)并且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序须要在任务间共享可变的数据时,这也是一个不错的选择,由于这里不须要采用同步处理。

网络应用程序一般都有上述这些特色,这使得它们可以很好的契合事件驱动编程模型。

此处要提出一个问题,就是,上面的事件驱动模型中,只要一遇到IO就注册一个事件,而后主程序就能够继续干其它的事情了,只到io处理完毕后,继续恢复以前中断的任务,这本质上是怎么实现的呢?哈哈,下面咱们就来一块儿揭开这神秘的面纱。。。。

3、select、poll、epoll三者的区别

 http://www.javashuo.com/article/p-qajblppq-do.html

4、IO多路复用

https://www.cnblogs.com/alex3714/articles/5876749.html

5、select/selectors模块

select 多并发socket 例子

select socket server

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #本身也要监测呀,由于server自己也是个fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是没有任何fd就绪,那程序就会一直阻塞在这里

    for s in readable: #每一个s就是一个socket

        if s is server: #别忘记,上面咱们server本身也当作一个fd放在了inputs列表里,传给了select,若是这个s是server,表明server这个fd就绪了,
            #就是有活动了, 什么状况下它才有活动? 固然 是有新链接进来的时候 呀
            #新链接进来了,接受这个链接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #为了避免阻塞整个程序,咱们不会马上在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新链接
            #就会被交给select去监听,若是这个链接的客户端发来了数据 ,那这个链接的fd在server端就会变成就续的,select就会把这个链接返回,返回到
            #readable 列表里,而后你就能够loop readable列表,取出这个链接,开始接收数据了, 下面就是这么干 的

            message_queues[conn] = queue.Queue() #接收到客户端的数据后,不马上返回 ,暂存在队列里,之后发送

        else: #s不是server的话,那就只能是一个 与客户端创建的链接的fd了
            #客户端的数据过来了,在这接收
            data = s.recv(1024)
            if data:
                print("收到来自[%s]的数据:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
                if s not  in outputs:
                    outputs.append(s) #为了避免影响处理与其它客户端的链接 , 这里不马上返回数据给客户端


            else:#若是收不到data表明什么呢? 表明客户端断开了呀
                print("客户端断开了",s)

                if s in outputs:
                    outputs.remove(s) #清理已断开的链接

                inputs.remove(s) #清理已断开的链接

                del message_queues[s] ##清理已断开的链接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]

select socket client

import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )

 

selectors模块

这个模块容许高级和高效的I/O多路复用,创建在选择模块原语的基础上。咱们鼓励用户使用这个模块,除非他们但愿对所使用的os级原语进行精确控制。

import selectors
import socket
 
sel = selectors.DefaultSelector()
 
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
 
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
 
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
 
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

6、RabbitMQ队列

 

实现最简单的队列通讯

 

 


send端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  #声明一个管道

#声明queue
channel.queue_declare(queue='hello2')

# n RabbitMQ a message can never be sent directly to the queue,it always
channel.basic_publish(exchange='',
                      routing_key='hello2',#queue名字
                      body='Hello World!'
                      )
                      )
print(" [x] Sent 'Hello World!'")
connection.close()

receive端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello2')

def callback(ch, method, properties, body): #回调函数
    print("-->",ch,method,properties)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(#消费信息
                      'hello2',
                      callback,) #若是收到消息,就调用CALLBACK 函数来处理消息


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

远程链接rabbitmq server的话,须要配置权限 噢 

首先在rabbitmq server上建立一个用户

sudo rabbitmqctl  add_user hhh hhh1234

同时还要配置权限,容许从外面访问

sudo rabbitmqctl set_permissions -p / hhh ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost

要授予用户访问权的虚拟主机的名称,默认为/。

user

授予对指定虚拟主机的访问权的用户的名称。

conf

一个正则表达式,它与授予用户配置权限的资源名称相匹配。

write

为用户授予写权限的与资源名称匹配的正则表达式。

 read

匹配资源名称的正则表达式,为用户授予读权限。

 

客户端链接的时候须要配置认证参数

credentials = pika.PlainCredentials('hhh', 'hhh1234')
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.40',5672,'/',credentials))
channel = connection.channel()

 

Work queues

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差很少

消息提供者代码

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  #声明一个管道

#声明queue
channel.queue_declare(queue='hello2',durable=True)

# n RabbitMQ a message can never be sent directly to the queue,it always
channel.basic_publish(exchange='',
                      routing_key='hello2',#queue名字
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  #make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'")
connection.close()

消费者代码

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello2',durable=True)

def callback(ch, method, properties, body): #回调函数
    print("-->",ch,method,properties)
    # time.sleep(30)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(#消费信息
                      'hello2',
                      callback,) #若是收到消息,就调用CALLBACK 函数来处理消息
                      #True  #不确认

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

此时,先启动消息生产者,而后再分别启动3个消费者,经过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上


完成一项任务须要几秒钟的时间。您可能想知道,若是其中一个消费者启动了一个很长的任务,但只完成了一部分就挂掉了,会发生什么状况。对于咱们当前的代码,一旦RabbitMQ将消息传递给客户,它就会当即将其从内存中删除。在这种状况下,若是您杀死了一个工做人员,咱们将丢失它正在处理的消息。咱们还将丢失全部发送给这个特定worker但还没有处理的消息。

但咱们不想失去任何任务。若是一个工人死了,咱们但愿把这个任务交给另外一个工人。

为了确保消息永不丢失,RabbitMQ支持消息确认。从使用者发回一个ack(nowledgement),告诉RabbitMQ已经接收并处理了特定的消息,RabbitMQ能够自由地删除它。

若是使用者在不发送ack的状况下死亡(其通道关闭、链接关闭或TCP链接丢失),RabbitMQ将理解消息未被彻底处理,并将从新对其排队。若是同时有其余的消费者在线,它会很快将其从新发送给另外一个消费者。这样你能够确保没有信息丢失,即便工人偶尔死亡。

没有任何消息超时;当使用者死亡时,RabbitMQ将从新传递消息。即便处理一条消息须要很长很长的时间,也不要紧。

消息确认在默认状况下是打开的。在前面的示例中,咱们经过no_ack=True标志显式地关闭了它们。一旦咱们完成了一个任务,就应该移除这个标志并从worker那里发送一个适当的确认。

def callback(ch, method, properties, body): #回调函数
    print("-->",ch,method,properties)
    # time.sleep(30)
    print(" [x] Receiverd %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(#消费信息
                      'hello2',
                      callback,) #若是收到消息,就调用CALLBACK 函数来处理消息

使用这段代码,咱们能够确保即便您在处理消息时使用CTRL+C杀死了一个工做人员,也不会丢失任何东西。在工做人员死后不久,全部未确认的消息将被从新发送

 

消息持久化

咱们已经学习了如何确保即便使用者死亡,任务也不会丢失(默认状况下,若是想禁用use no_ack=True)。可是,若是RabbitMQ服务器中止,咱们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样作。确保消息不丢失须要作两件事:咱们须要将队列和消息都标记为持久的。

首先,咱们须要确保RabbitMQ永远不会丢失咱们的队列。为了作到这一点,咱们须要声明它是耐用的:

channel.queue_declare(queue='hello2',durable=True)

虽然这个命令自己是正确的,可是它在咱们的设置中不能工做。那是由于咱们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不容许您使用不一样的参数从新定义现有队列,并将向任何试图这样作的程序返回一个错误。可是有一个快速的解决方法——让咱们声明一个不一样名称的队列,例如:

channel.queue_declare(queue='task_queue', durable=True)

这个queue_declare更改须要同时应用到生产者代码和消费者代码。

此时,咱们确信即便RabbitMQ从新启动,task_queue队列也不会丢失。如今,咱们须要将消息标记为persistent—经过提供一个值为2的delivery_mode属性。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

 

消息公平分发

若是Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,极可能出现,一个机器配置不高的消费者那里堆积了不少消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,能够在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

channel.basic_qos(prefetch_count=1)

带消息持久化+公平分发的完整代码

生产者端

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

消费者端

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

 

Publish\Subscribe(消息发布\订阅)

以前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被全部的Queue收到,相似广播的效果,这时候就要用到exchange了,

交换是一件很是简单的事情。一边接收来自生产者的消息,另外一边将消息推送到队列。交换器必须确切地知道如何处理接收到的消息。它应该被附加到一个特定的队列吗?它应该被添加到许多队列中吗?或者它应该被丢弃。这些规则由exchange类型定义。

Exchange在定义的时候是有类型的,以决定究竟是哪些Queue符合条件,能够接收消息


fanout: 全部bind到此exchange的queue均可以接收消息
direct: 经过routingKey和exchange决定的那个惟一的queue能够接收消息
topic:全部符合routingKey(此时能够是一个表达式)的routingKey所bind的queue能够接收消息

   表达式符号说明:#表明一个或多个字符,*表明任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
     注:使用RoutingKey为#,Exchange Type为topic的时候至关于使用fanout 

headers: 经过headers 来决定把消息发给哪些queue

消息publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare('',exclusive=True)  #exclusive排他,惟一的,不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
print("random queuename",queue_name)

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue_name,
                      callback,
                      True)

channel.start_consuming()

 

有选择的接收消息(exchange type=direct)

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 断定应该将数据发送至指定队列。

publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

consumer

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue

serverities = sys.argv[1:]
if not serverities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
print(serverities)
for serverity in serverities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=serverity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" %(method.routing_key, body))

channel.basic_consume(queue_name,
                      callback,
                      True)
channel.start_consuming()

 

更细致的消息过滤

虽然使用直接交换改进了咱们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

在咱们的日志系统中,咱们可能不只但愿根据严重性订阅日志,还但愿根据发出日志的源订阅日志。您可能从syslog unix工具中了解了这个概念,该工具根据严重程度(info/warn/crit…)和设施(auth/cron/kern…)来路由日志。

这将给咱们很大的灵活性——咱们可能但愿只监听来自“cron”的关键错误,但也要监听来自“kern”的全部日志。

 

 

 publisher

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

consumer

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue_name,
                      callback,
                      True)

channel.start_consuming()

接收全部运行的日志:

python receive_logs_topic.py "#"

接收设施“kern”的日志:

python receive_logs_topic.py "kern.*"

或者你只想接收“critical”的日志:

python receive_logs_topic.py "*.critical"

你能够建立多个绑定:

python receive_logs_topic.py "kern.*" "*.critical"

并发出一个带有路由键“kern”的日志“critical”类型:

python emit_log_topic.py "kern.critical" "A critical kernel error"

 

Remote producer call(RPC)

为了说明如何使用RPC服务,咱们将建立一个简单的客户机类。它将公开一个名为call的方法,发送一个RPC请求并阻塞,直到收到答案:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

 

 

 RPC server

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=\
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume('rpc_queue',on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()

RPC client

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters
                                                  (host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare('',exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.callback_queue,
                                   self.on_response,    #只要一收到消息就调用on_response
                                   True)
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming()
            print("no msg")
            time.sleep(0.5)
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)

 

7、Memcached

http://www.cnblogs.com/wupeiqi/articles/5132791.html 

 

8、redis

介绍

redis是业界主流的key-value nosql 数据库之一。和Memcached相似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操做,并且这些操做都是原子性的。在此基础上,redis支持各类不一样方式的排序。与memcached同样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操做写入追加的记录文件,而且在此基础上实现了master-slave(主从)同步。

Redis优势

  • 异常快速 : Redis是很是快的,每秒能够执行大约110000设置操做,81000个/每秒的读取操做。

  • 支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型。

    这使得在应用中很容易解决的各类问题,由于咱们知道哪些问题处理使用哪一种数据类型更好解决。
  • 操做都是原子的 : 全部 Redis 的操做都是原子,从而确保当两个客户同时访问 Redis 服务器获得的是更新后的值(最新值)。

  • MultiUtility工具:Redis是一个多功能实用工具,能够在不少如:缓存,消息传递队列中使用(Redis原生支持发布/订阅),在应用程序中,如:Web应用程序会话,网站页面点击数等任何短暂的数据;

安装Redis环境

要在 Ubuntu 上安装 Redis,打开终端,而后输入如下命令:
$sudo apt-get update
$sudo apt-get install redis-server
这将在您的计算机上安装Redis

启动 Redis

$redis-server

查看 redis 是否还在运行

$redis-cli
这将打开一个 Redis 提示符,以下图所示:
redis 127.0.0.1:6379>
在上面的提示信息中:127.0.0.1 是本机的IP地址,6379是 Redis 服务器运行的端口。如今输入 PING 命令,以下图所示:
redis 127.0.0.1:6379> ping
PONG
这说明如今你已经成功地在计算机上安装了 Redis。
 
链接方式

一、操做模式

redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis

r = redis.Redis(host='192.168.1.40',port=6379)
r.set('foo','Bar')

print(r.get('foo'))

二、链接池

redis-py使用connection pool来管理对一个redis server的全部链接,避免每次创建、释放链接的开销。默认,每一个Redis实例都会维护一个本身的链接池。能够直接创建一个链接池,而后做为参数Redis,这样就能够实现多个Redis实例共享一个链接池。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis

pool = redis.ConnectionPool(host='192.168.1.40',port=6379)

r = redis.Redis(connection_pool=pool)
r.set('foo','Bar')

print(r.get('foo'))

 

操做

1. String操做

redis中的String在在内存中按照一个name对应一个value来存储。如图:

set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中设置值,默认,不存在则建立,存在则修改
参数:
      ex,过时时间(秒)
      px,过时时间(毫秒)
      nx,若是设置为True,则只有name不存在时,当前set操做才执行
      xx,若是设置为True,则只有name存在时,岗前set操做才执行

setnx(name, value)

设置值,只有name不存在时,执行设置操做(添加)

setex(name, value, time)

# 设置值
# 参数:
     # time,过时时间(数字秒 或 timedelta对象)

psetex(name, time_ms, value)

# 设置值
# 参数:
     # time_ms,过时时间(数字毫秒 或 timedelta对象)

mset(*args, **kwargs)

批量设置值
如:
     mset(k1= 'v1' , k2= 'v2' )
    
     mget({ 'k1' 'v1' 'k2' 'v2' })

get(name)

获取值

mget(keys, *args)

批量获取
如:
     mget( 'ylr' 'wupeiqi' )
    
     r.mget([ 'ylr' 'wupeiqi' ])

getset(name, value)

设置新值并获取原来的值

getrange(key, start, end)

# 获取子序列(根据字节获取,非字符)
# 参数:
     # name,Redis 的 name
     # start,起始位置(字节)
     # end,结束位置(字节)
# 如: "武沛齐" ,0-3表示 "武"

setrange(name, offset, value)

# 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)
# 参数:
     # offset,字符串的索引,字节(一个汉字三个字节)
     # value,要设置的值

setbit(name, offset, value)

# 对name对应值的二进制表示的位进行操做
 
# 参数:
     # name,redis的name
     # offset,位的索引(将值变换成二进制后再进行索引)
     # value,值只能是 1 或 0
 
# 注:若是在Redis中有一个对应: n1 = "foo",
         那么字符串foo的二进制表示为: 01100110  01101111  01101111
     因此,若是执行 setbit( 'n1' 7 1 ),则就会将第 7 位设置为 1
         那么最终二进制则变成  01100111  01101111  01101111 ,即: "goo"
 
# 扩展,转换二进制表示:
 
     # source = "武沛齐"
     source  =  "foo"
 
     for  in  source:
         num  =  ord (i)
         print  bin (num).replace( 'b' ,'')
 
     特别的,若是source是汉字  "武沛齐" 怎么办?
     答:对于utf - 8 ,每个汉字占  3  个字节,那么  "武沛齐"  则有  9 个字节
        对于汉字, for 循环时候会按照 字节 迭代,那么在迭代时,将每个字节转换 十进制数,而后再将十进制数转换成二进制
         11100110  10101101  10100110  11100110  10110010  10011011  11101001  10111101  10010000
         - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                     武                         沛                           齐

*用途举例,用最省空间的方式,存储在线用户数及分别是哪些用户在线

getbit(name, offset)

# 获取name对应的值的二进制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

# 获取name对应的值的二进制表示中 1 的个数
# 参数:
     # key,Redis的name
     # start,位起始位置
     # end,位结束位置

strlen(name)

# 返回name对应值的字节长度(一个汉字3个字节)

incr(self, name, amount=1)

# 自增 name对应的值,当name不存在时,则建立name=amount,不然,则自增。
 
# 参数:
     # name,Redis的name
     # amount,自增数(必须是整数)
 
# 注:同incrby

incrbyfloat(self, name, amount=1.0)

# 自增 name对应的值,当name不存在时,则建立name=amount,不然,则自增。
 
# 参数:
     # name,Redis的name
     # amount,自增数(浮点型)

decr(self, name, amount=1)

# 自减 name对应的值,当name不存在时,则建立name=amount,不然,则自减。
 
# 参数:
     # name,Redis的name
     # amount,自减数(整数)

append(key, value)

# 在redis name对应的值后面追加内容
 
# 参数:
     key, redis的name
     value, 要追加的字符串

  

2. Hash操做

hash表现形式上有些像pyhton中的dict,能够存储一组关联性较强的数据 , redis中Hash在内存中的存储格式以下图:  

 

hset(name, key, value)

# name对应的hash中设置一个键值对(不存在,则建立;不然,修改)
 
# 参数:
     # name,redis的name
     # key,name对应的hash中的key
     # value,name对应的hash中的value
 
# 注:
     # hsetnx(name, key, value),当name对应的hash中不存在当前key时则建立(至关于添加)

hmset(name, mapping)

# 在name对应的hash中批量设置键值对
 
# 参数:
     # name,redis的name
     # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
# 如:
     # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

# 在name对应的hash中获取根据key获取value

hmget(name, keys, *args)

# 在name对应的hash中获取多个key的值
 
# 参数:
     # name,reids对应的name
     # keys,要获取key集合,如:['k1', 'k2', 'k3']
     # *args,要获取的key,如:k1,k2,k3
 
# 如:
     # r.mget('xx', ['k1', 'k2'])
     # 或
     # print r.hmget('xx', 'k1', 'k2')

hgetall(name)

获取name对应 hash 的全部键值

hlen(name)

# 获取name对应的hash中键值对的个数

hkeys(name)

# 获取name对应的hash中全部的key的值

hvals(name)

# 获取name对应的hash中全部的value的值

hexists(name, key)

# 检查name对应的hash是否存在当前传入的key

hdel(name,*keys)

# 将name对应的hash中指定key的键值对删除

hincrby(name, key, amount=1)

# 自增name对应的hash中的指定key的值,不存在则建立key=amount
# 参数:
     # name,redis中的name
     # key, hash对应的key
     # amount,自增数(整数)

hincrbyfloat(name, key, amount=1.0)

# 自增name对应的hash中的指定key的值,不存在则建立key=amount
 
# 参数:
     # name,redis中的name
     # key, hash对应的key
     # amount,自增数(浮点数)
 
# 自增name对应的hash中的指定key的值,不存在则建立key=amount

hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:

HSCAN myhash 0

Start a hash scan with fields matching a pattern with:

HSCAN myhash 0 MATCH order_*

Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:

HSCAN myhash 0 MATCH order_* COUNT 1000

 

# 增量式迭代获取,对于数据大的数据很是有用,hscan能够实现分片的获取数据,并不是一次性将数据所有获取完,从而放置内存被撑爆
 
# 参数:
     # name,redis的name
     # cursor,游标(基于游标分批取获取数据)
     # match,匹配指定key,默认None 表示全部的key
     # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
 
# 如:
     # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
     # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
     # ...
     # 直到返回值cursor的值为0时,表示数据已经经过分片获取完毕

 

hscan_iter(name, match=None, count=None)

# 利用yield封装hscan建立生成器,实现分批去redis中获取数据
  
# 参数:
     # match,匹配指定key,默认None 表示全部的key
     # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
  
# 如:
     # for item in r.hscan_iter('xx'):
     #     print item

 

3. list

List操做,redis中的List在在内存中按照一个name对应一个List来存储。如图:  

lpush(name,values)

# 在name对应的list中添加元素,每一个新的元素都添加到列表的最左边
 
# 如:
     # r.lpush('oo', 11,22,33)
     # 保存顺序为: 33,22,11
 
# 扩展:
     # rpush(name, values) 表示从右向左操做

lpushx(name,value)

# 在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边
 
# 更多:
     # rpushx(name, value) 表示从右向左操做

llen(name)

# name对应的list元素的个数

linsert(name, where, refvalue, value))

# 在name对应的列表的某一个值前或后插入一个新值
 
# 参数:
     # name,redis的name
     # where,BEFORE或AFTER
     # refvalue,标杆值,即:在它先后插入数据
     # value,要插入的数据

r.lset(name, index, value)

# 对name对应的list中的某一个索引位置从新赋值
 
# 参数:
     # name,redis的name
     # index,list的索引位置
     # value,要设置的值

r.lrem(name, value, num)

# 在name对应的list中删除指定的值
 
# 参数:
     # name,redis的name
     # value,要删除的值
     # num,  num=0,删除列表中全部的指定值;
            # num=2,从前到后,删除2个;
            # num=-2,从后向前,删除2个

lpop(name)

# 在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素
 
# 更多:
     # rpop(name) 表示从右向左操做

lindex(name, index)

在name对应的列表中根据索引获取列表元素

lrange(name, start, end)

# 在name对应的列表分片获取数据
# 参数:
     # name,redis的name
     # start,索引的起始位置
     # end,索引结束位置

ltrim(name, start, end)

# 在name对应的列表中移除没有在start-end索引之间的值
# 参数:
     # name,redis的name
     # start,索引的起始位置
     # end,索引结束位置

rpoplpush(src, dst)

# 从一个列表取出最右边的元素,同时将其添加至另外一个列表的最左边
# 参数:
     # src,要取数据的列表的name
     # dst,要添加数据的列表的name

blpop(keys, timeout)

# 将多个列表排列,按照从左到右去pop对应列表的元素
 
# 参数:
     # keys,redis的name的集合
     # timeout,超时时间,当元素全部列表的元素获取完以后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞
 
# 更多:
     # r.brpop(keys, timeout),从右向左获取数据

brpoplpush(src, dst, timeout=0)

# 从一个列表的右侧移除一个元素并将其添加到另外一个列表的左侧
 
# 参数:
     # src,取出并要移除元素的列表对应的name
     # dst,要插入元素的列表对应的name
     # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞

 

4.set集合操做

Set操做,Set集合就是不容许重复的列表

sadd(name,values)

# name对应的集合中添加元素

scard(name)

获取name对应的集合中元素个数

sdiff(keys, *args)

在第一个name对应的集合中且不在其余name对应的集合的元素集合

sdiffstore(dest, keys, *args)

# 获取第一个name对应的集合中且不在其余name对应的集合,再将其新加入到dest对应的集合中

sinter(keys, *args)

# 获取多一个name对应集合的并集

sinterstore(dest, keys, *args)

# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中

sismember(name, value)

# 检查value是不是name对应的集合的成员

smembers(name)

# 获取name对应的集合的全部成员

smove(src, dst, value)

# 将某个成员从一个集合中移动到另一个集合

spop(name)

# 从集合的右侧(尾部)移除一个成员,并将其返回

srandmember(name, numbers)

# 从name对应的集合中随机获取 numbers 个元素

srem(name, values)

# 在name对应的集合中删除某些值

sunion(keys, *args)

# 获取多一个name对应的集合的并集

sunionstore(dest,keys, *args)

# 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

# 同字符串的操做,用于增量迭代分批获取元素,避免内存消耗太大

 

有序集合,在集合的基础上,为每元素排序;元素的排序须要根据另一个值来进行比较,因此,对于有序集合,每个元素有两个值,即:值和分数,分数专门用来作排序。

zadd(name, *args, **kwargs)

# 在name对应的有序集合中添加元素
# 如:
      # zadd('zz', 'n1', 1, 'n2', 2)
      # 或
      # zadd('zz', n1=11, n2=22)

zcard(name)

# 获取name对应的有序集合元素的数量

zcount(name, min, max)

# 获取name对应的有序集合中分数 在 [min,max] 之间的个数

zincrby(name, value, amount)

# 自增name对应的有序集合的 name 对应的分数

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

# 按照索引范围获取name对应的有序集合的元素
 
# 参数:
     # name,redis的name
     # start,有序集合索引发始位置(非分数)
     # end,有序集合索引结束位置(非分数)
     # desc,排序规则,默认按照分数从小到大排序
     # withscores,是否获取元素的分数,默认只获取元素的值
     # score_cast_func,对分数进行数据转换的函数
 
# 更多:
     # 从大到小排序
     # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
     # 按照分数范围获取name对应的有序集合的元素
     # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
     # 从大到小排序
     # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

# 获取某个值在 name对应的有序集合中的排行(从 0 开始)
 
# 更多:
     # zrevrank(name, value),从大到小排序

 

zrem(name, values)

# 删除name对应的有序集合中值是values的成员
 
# 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

# 根据排行范围删除

zremrangebyscore(name, min, max)

# 根据分数范围删除

zscore(name, value)

# 获取name对应有序集合中 value 对应的分数

zinterstore(dest, keys, aggregate=None)

# 获取两个有序集合的交集,若是遇到相同值不一样分数,则按照aggregate进行操做
# aggregate的值为:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

# 获取两个有序集合的并集,若是遇到相同值不一样分数,则按照aggregate进行操做
# aggregate的值为:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

# 同字符串类似,相较于字符串新增score_cast_func,用来对分数进行操做

 

其余经常使用操做

delete(*names)

# 根据删除redis中的任意数据类型

exists(name)

# 检测redis的name是否存在

keys(pattern='*')

# 根据模型获取redis的name
 
# 更多:
     # KEYS * 匹配数据库中全部 key 。
     # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
     # KEYS h*llo 匹配 hllo 和 heeeeello 等。
     # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

# 为某个redis的某个name设置超时时间

rename(src, dst)

# 对redis的name重命名为

move(name, db))

# 将redis的某个值移动到指定的db下

randomkey()

# 随机获取一个redis的name(不删除)

type(name)

# 获取name对应值的类型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

# 同字符串操做,用于增量迭代获取key

 

管道

redis-py默认在执行每次请求都会建立(链接池申请链接)和断开(归还链接池)一次链接操做,若是想要在一次请求中指定多个命令,则可使用pipline实现一次请求指定多个命令,而且默认状况下一次pipline 是原子性操做。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis
import time

pool = redis.ConnectionPool(host='192.168.1.40',port=6379)

r = redis.Redis(connection_pool=pool)

# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)

pipe.set('name', 'alex')
# time.sleep(50)
pipe.set('role', 'sb')

pipe.execute()

 

发布\订阅

 

 

 

发布者:服务器

订阅者:Dashboad和数据处理

Demo以下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='192.168.1.40')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()  #打开收音机
        pub.subscribe(self.chan_sub)    #调频道
        pub.parse_response()    #准备接收
        return pub

订阅者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from redis_helper import RedisHelper

obj = RedisHelper() redis_sub = obj.subscribe() while True: msg = redis_sub.parse_response() print(msg)

发布者

#!/usr/bin/env python
# -*- coding:utf-8 -*-



from redis_helper import RedisHelper

obj = RedisHelper()
obj.public('hello')

 

 

做业一:

题目:IO多路复用版FTP

需求:

  1. 实现文件上传及下载功能
  2. 支持多链接并发传文件
  3. 使用select or selectors

 

做业二:

题目:rpc命令端

需求:

  1. 能够异步的执行多个命令
  2. 对多台机器

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4task id: 45334>>: check_task 45334>>:

相关文章
相关标签/搜索