今天在写zabbix storm job监控脚本的时候用到了python的redis模块,以前也有用过,可是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool以前,若是须要链接redis,我都是用StrictRedis这个类,在源码中能够看到这个类的具体解释:
python
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
redis
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx()
有了ConnectionPool这个类以后,可使用以下方法
api
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx) r = redis.Redis(connection_pool=pool)
这里Redis是StrictRedis的子类
简单分析以下:
在StrictRedis类的__init__方法中,能够初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:数组
class StrictRedis(object): ........ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): if not connection_pool: .......... connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里能够看到具体实现是从链接池中获取一个具体的链接,而后执行命令,完成后释放链接:
bash
# COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) #调用ConnectionPool.get_connection方法获取一个链接 try: connection.send_command(*args) #命令执行,这里为Connection.send_command return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection) #调用ConnectionPool.release释放链接
在来看看ConnectionPool类:
app
class ConnectionPool(object): ........... def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): #类初始化时调用构造函数 max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: #判断输入的max_connections是否合法 raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class #设置对应的参数 self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() #初始化ConnectionPool 时的reset操做 def reset(self): self.pid = os.getpid() self._created_connections = 0 #已经建立的链接的计数器 self._available_connections = [] #声明一个空的数组,用来存放可用的链接 self._in_use_connections = set() #声明一个空的集合,用来存放已经在用的链接 self._check_lock = threading.Lock() ....... def get_connection(self, command_name, *keys, **options): #在链接池中获取链接的方法 "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() #获取并删除表明链接的元素,在第一次获取connectiong时,由于_available_connections是一个空的数组, 会直接调用make_connection方法 except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) #向表明正在使用的链接的集合中添加元素 return connection def make_connection(self): #在_available_connections数组为空时获取链接调用的方法 "Create a new connection" if self._created_connections >= self.max_connections: #判断建立的链接是否已经达到最大限制,max_connections能够经过参数初始化 raise ConnectionError("Too many connections") self._created_connections += 1 #把表明已经建立的链接的数值+1 return self.connection_class(**self.connection_kwargs) #返回有效的链接,默认为Connection(**self.connection_kwargs) def release(self, connection): #释放链接,连接并无断开,只是存在连接池中 "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) #从集合中删除元素 self._available_connections.append(connection) #并添加到_available_connections 的数组中 def disconnect(self): #断开全部链接池中的连接 "Disconnects all connections in the pool" all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: connection.disconnect()
execute_command最终调用的是Connection.send_command方法,关闭连接为 Connection.disconnect方法,而Connection类的实现:
socket
class Connection(object): "Manages TCP communication to and from a Redis server" def __del__(self): #对象删除时的操做,调用disconnect释放链接 try: self.disconnect() except Exception: pass
核心的连接创建方法是经过socket模块实现:
tcp
def _connect(self): err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.socket_keepalive: #构造函数中默认 socket_keepalive=False,所以这里默认为短链接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) #构造函数中默认socket_connect_timeout=None,即链接为blocking的模式 # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) #构造函数中默认socket_timeout=None return sock except socket.error as _: err = _ if sock is not None: sock.close() .....
关闭连接的方法:
ide
def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close self._sock.close() except socket.error: pass self._sock = None
能够小结以下
1)默认状况下每建立一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个链接池获得一个链接,操做完成后会把该链接放回链接池(链接并无释放),能够构造一个统一的ConnectionPool,在建立Redis实例时,能够将该ConnectionPool传入,那么后续的操做会从给定的ConnectionPool得到链接,不会再重复建立ConnectionPool。
2)默认状况下没有设置keepalive和timeout,创建的链接是blocking模式的短链接。
3)不考虑底层tcp的状况下,链接池中的链接会在ConnectionPool.disconnect中统一销毁。
函数