pip install --upgrade redis
/usr/lib/python2.7/site-packages/redis
客户端向服务端发送请求,服务端收到请求,双方创建链接,客户端给服务端发送消息,服务端回应客户端,而后一次读写就完成了,这时双方任何一个均可以发送关闭请求,不过通常是客户端,短连接管理起来比较方便,存在的链接都是有用的,不须要额外的控制手段python
客户端向服务端发送请求,服务器接收客户端请求,双方创建链接,客户端和服务端之间完成一次读写以后,它们之间的的链接不会主动关闭,后续的操做会继续使用这个链接,可是一旦开启此链接将不会主动断开,除非客户端挂掉/主动断开,可是服务端依然保持半开式链接等待客户端数据,长用TCP保活功能keepalive和超时时间timeout来关闭这些半开式链接redis
一个数据库链接对象均为一个物理数据库链接,每次操做都打开一个物理链接,使用完后都关闭链接,这样会形成系统的性能低下,数据库链接池的解决方案是在应用程序启动时创建足够多的数据库链接,并将这些链接放在一个链接池中,由应用程序动态对池中的链接进行申请,使用和释放,对于多余链接池中链接数的并发请求,会放在请求中排列等待,而且应用程序能够根据链接池中的链接使用率,动态的增长或是减小池中的链接数,链接池技术尽量多地重用了消耗内存的资源(避免了重复性的链接关闭的消耗),大大节省了内存,提升了服务效率,支持更多的客户端,同时能够经过其自身的管理机制来监视数据库的链接数和使用状况数据库
1.最小链接数是链接池一直保持的数据库链接,因此全部应用程序对数据库链接的使用量不大,将会形成大量的数据库链接资源被浪费服务器
2.最大链接数是链接池能申请的最大链接数,若是数据库链接请求超过此数,后面的请求将被加入到等待队列,这会影响以后的数据库操做并发
/usr/lib/python2.7/site-packages/redis/client.pyapp
# Redis是StrictRedis的子类 class Redis(StrictRedis): """ Provides backwards compatibility with older versions of redis-py that changed arguments to some commands to be more Pythonic, sane, or by accident. """
说明:从如上代码能够看出Redis是StrictRedis的子类,两个类下的方法相似,后者用于实现大部分官方的命令,而前者为了向后兼容旧版模块儿,因此官方推荐使用StrictRedis,Redis子类下的LREM/ZADD/SETEX绝对是个坑~要么本身尝试,要么乖乖用StrictRedis~python2.7
# StrictRedis基类 class StrictRedis(object): """ Implementation of the Redis protocol. This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol. Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server """ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): # 若是connection_pool没有定义的话就尝试获取其它设置后并从新建立链接池 if not connection_pool: if charset is not None: warnings.warn(DeprecationWarning( '"charset" is deprecated. Use "encoding" instead')) encoding = charset if errors is not None: warnings.warn(DeprecationWarning( '"errors" is deprecated. Use "encoding_errors" instead')) encoding_errors = errors kwargs = { 'db': db, 'password': password, 'socket_timeout': socket_timeout, 'encoding': encoding, 'encoding_errors': encoding_errors, 'decode_responses': decode_responses, 'retry_on_timeout': retry_on_timeout } # based on input, setup appropriate connection args if unix_socket_path is not None: kwargs.update({ 'path': unix_socket_path, 'connection_class': UnixDomainSocketConnection }) else: # TCP specific options kwargs.update({ 'host': host, 'port': port, 'socket_connect_timeout': socket_connect_timeout, 'socket_keepalive': socket_keepalive, 'socket_keepalive_options': socket_keepalive_options, }) if ssl: kwargs.update({ 'connection_class': SSLConnection, 'ssl_keyfile': ssl_keyfile, 'ssl_certfile': ssl_certfile, 'ssl_cert_reqs': ssl_cert_reqs, 'ssl_ca_certs': ssl_ca_certs, }) connection_pool = ConnectionPool(**kwargs) # 若是没有已经建立链接池则使用已经建立的链接池若是没有链接池则默认也会建立链接池 self.connection_pool = connection_pool self._use_lua_lock = None self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] # 从链接池中获取链接执行command_name connection = pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection)
/usr/lib/python2.7/site-packages/redis/connection.py:socket
# 建立链接池类 class ConnectionPool(object): "Generic connection pool" @classmethod def from_url(cls, url, db=None, **kwargs): """ Return a connection pool configured from the given URL. For example:: redis://[:password]@localhost:6379/0 rediss://[:password]@localhost:6379/0 unix://[:password]@/path/to/socket.sock?db=0 Three URL schemes are supported: redis:// creates a normal TCP socket connection rediss:// creates a SSL wrapped TCP socket connection unix:// creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function will return the first specified option: 1. A ``db`` querystring option, e.g. redis://localhost?db=0 2. If using the redis:// scheme, the path argument of the url, e.g. redis://localhost/0 3. The ``db`` argument to this function. If none of these options are specified, db=0 is used. Any additional querystring arguments and keyword arguments will be passed along to the ConnectionPool class's initializer. In the case of conflicting arguments, querystring arguments always win. """ def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): """ Create a connection pool. If max_connections is set, then this object raises redis.ConnectionError when the pool's limit is reached. By default, TCP connections are created connection_class is specified. Use redis.UnixDomainSocketConnection for unix sockets. Any additional keyword arguments are passed to the constructor of connection_class. """ # 最大链接数默认为62 max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections # 初始化线程池 self.reset() def __repr__(self): return "%s<%s>" % ( type(self).__name__, self.connection_class.description_format % self.connection_kwargs, ) # 初始化线程池 def reset(self): # 获取当前的进程号,后面判断进程是否挂掉 self.pid = os.getpid() # 存放已经建立的链接(计数) self._created_connections = 0 # 存放可用的链接对象(列表) self._available_connections = [] # 存放正在使用的链接对象(集合) self._in_use_connections = set() # 建立线程锁 self._check_lock = threading.Lock() def _checkpid(self): if self.pid != os.getpid(): with self._check_lock: if self.pid == os.getpid(): # another thread already did the work while we waited # on the lock. return self.disconnect() self.reset() # 从链接池中获取链接 def get_connection(self, command_name, *keys, **options): "Get a connection from the pool" self._checkpid() try: # 从链接池中pop出一个链接对象 connection = self._available_connections.pop() except IndexError: # 若是链接池中已经没有链接的话从新建立链接 connection = self.make_connection() # 当前正在使用的链接集合中添加此链接 self._in_use_connections.add(connection) # 并返回此链接对象 return connection # 建立链接 def make_connection(self): "Create a new connection" # 若是大于最大链接数则抛出异常 if self._created_connections >= self.max_connections: raise ConnectionError("Too many connections") # 不然建立的链接数++ self._created_connections += 1 # 利用建立链接类实例化一个链接 return self.connection_class(**self.connection_kwargs) # 释放链接 def release(self, connection): "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return # 并无关闭链接而是从在使用的链接列表中删除此链接,回收链接对象 self._in_use_connections.remove(connection) # 可用链接就回收了一个链接对象 self._available_connections.append(connection) # 关闭链接 def disconnect(self): "Disconnects all connections in the pool" # 关联获取全部可迭代对象获取全部链接对象 all_conns = chain(self._available_connections, self._in_use_connections) # 便历关闭全部链接 for connection in all_conns: connection.disconnect()
# 建立链接类 class Connection(object): "Manages TCP communication to and from a Redis server" description_format = "Connection<host=%(host)s,port=%(port)s,db=%(db)s>" def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=False, socket_keepalive_options=None, retry_on_timeout=False, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser, socket_read_size=65536): self.pid = os.getpid() self.host = host self.port = int(port) self.db = db self.password = password self.socket_timeout = socket_timeout self.socket_connect_timeout = socket_connect_timeout or socket_timeout self.socket_keepalive = socket_keepalive self.socket_keepalive_options = socket_keepalive_options or {} self.retry_on_timeout = retry_on_timeout self.encoding = encoding self.encoding_errors = encoding_errors self.decode_responses = decode_responses self._sock = None self._parser = parser_class(socket_read_size=socket_read_size) self._description_args = { 'host': self.host, 'port': self.port, 'db': self.db, } self._connect_callbacks = [] def __repr__(self): return self.description_format % self._description_args # 对象删除时调用disconnect方法关闭对象 def __del__(self): try: self.disconnect() except Exception: pass def register_connect_callback(self, callback): self._connect_callbacks.append(callback) def clear_connect_callbacks(self): self._connect_callbacks = [] # 核心的链接方法仍是经过socket模块儿实现 def connect(self): "Connects to the Redis server if not already connected" if self._sock: return try: # 私有方法建立链接 sock = self._connect() except socket.error: e = sys.exc_info()[1] raise ConnectionError(self._error_message(e)) self._sock = sock try: self.on_connect() except RedisError: # clean up after any error in on_connect self.disconnect() raise # run any user callbacks. right now the only internal callback # is for pubsub channel/pattern resubscription for callback in self._connect_callbacks: callback(self) # 建立tcp socket def _connect(self): "Create a TCP socket connection" # we want to mimic what socket.create_connection does to support # ipv4/ipv6, but we want to set options prior to calling # socket.connect() err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE # 默认使用的是短链接,设置socket_keepalive=True保持长链接 if self.socket_keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # 设置超市时间,默认不设置为阻塞模式 # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err raise socket.error("socket.getaddrinfo returned an empty list") def _error_message(self, exception): # args for socket.error can either be (errno, "message") # or just "message" if len(exception.args) == 1: return "Error connecting to %s:%s. %s." % \ (self.host, self.port, exception.args[0]) else: return "Error %s connecting to %s:%s. %s." % \ (exception.args[0], self.host, self.port, exception.args[1]) def on_connect(self): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) # if a password is specified, authenticate if self.password: self.send_command('AUTH', self.password) if nativestr(self.read_response()) != 'OK': raise AuthenticationError('Invalid Password') # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) if nativestr(self.read_response()) != 'OK': raise ConnectionError('Invalid Database') # 关闭链接 def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: # 先shutdown而后再close self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: pass self._sock = None def send_packed_command(self, command): "Send an already packed command to the Redis server" if not self._sock: self.connect() try: if isinstance(command, str): command = [command] for item in command: self._sock.sendall(item) except socket.timeout: self.disconnect() raise TimeoutError("Timeout writing to socket") except socket.error: e = sys.exc_info()[1] self.disconnect() if len(e.args) == 1: _errno, errmsg = 'UNKNOWN', e.args[0] else: _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) except: self.disconnect() raise def send_command(self, *args): "Pack and send a command to the Redis server" self.send_packed_command(self.pack_command(*args)) def can_read(self): "Poll the socket to see if there's data that can be read." sock = self._sock if not sock: self.connect() sock = self._sock return bool(select([sock], [], [], 0)[0]) or self._parser.can_read() def read_response(self): "Read the response from a previously sent command" try: response = self._parser.read_response() except: self.disconnect() raise if isinstance(response, ResponseError): raise response return response def encode(self, value): "Return a bytestring representation of the value" if isinstance(value, Token): return b(value.value) elif isinstance(value, bytes): return value elif isinstance(value, (int, long)): value = b(str(value)) elif isinstance(value, float): value = b(repr(value)) elif not isinstance(value, basestring): value = str(value) if isinstance(value, unicode): value = value.encode(self.encoding, self.encoding_errors) return value def pack_command(self, *args): "Pack a series of arguments into the Redis protocol" output = [] # the client might have included 1 or more literal arguments in # the command name, e.g., 'CONFIG GET'. The Redis server expects these # arguments to be sent separately, so split the first argument # manually. All of these arguements get wrapped in the Token class # to prevent them from being encoded. command = args[0] if ' ' in command: args = tuple([Token(s) for s in command.split(' ')]) + args[1:] else: args = (Token(command),) + args[1:] buff = SYM_EMPTY.join( (SYM_STAR, b(str(len(args))), SYM_CRLF)) for arg in imap(self.encode, args): # to avoid large string mallocs, chunk the command into the # output list if we're sending large values if len(buff) > 6000 or len(arg) > 6000: buff = SYM_EMPTY.join( (buff, SYM_DOLLAR, b(str(len(arg))), SYM_CRLF)) output.append(buff) output.append(arg) buff = SYM_CRLF else: buff = SYM_EMPTY.join((buff, SYM_DOLLAR, b(str(len(arg))), SYM_CRLF, arg, SYM_CRLF)) output.append(buff) return output def pack_commands(self, commands): "Pack multiple commands into the Redis protocol" output = [] pieces = [] buffer_length = 0 for cmd in commands: for chunk in self.pack_command(*cmd): pieces.append(chunk) buffer_length += len(chunk) if buffer_length > 6000: output.append(SYM_EMPTY.join(pieces)) buffer_length = 0 pieces = [] if pieces: output.append(SYM_EMPTY.join(pieces)) return output
1.默认状况下每建立一个Redis实例都会构造出一个ConnectionPool实例,每一次redis都会从这个链接池中拿到一个链接,操做完成后把该链接放回链接池(链接并无释放)tcp
2.能够构造一个ConnectionPool,在建立实例时,能够将ConnectionPool传入,后续的操做将会从给定的ConnectionPool得到链接,不会再重复建立ConnectionPoolide
3.若是不考虑底层TCP的状况,链接池中的链接会在ConnectionPool.disconnect中统一销毁