一 概述
一般python 链接redis数据库用的redis-py 这个三方库,最近研究了一下它的源码python
它链接服务器的函数是这个 _connect 函数redis
def _connect(self****): """建立一个tcp socket 链接""" # we want to mimic wha****t socket.create_connection does to support # ipv4/ipv6, but we want to set options prior to calling # socket.connect() err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP 长链接 if self.socket_keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # 咱们链接以前设置socket_connect_timeout超时时间 sock.settimeout(self.socket_connect_timeout) # 链接 sock.connect(socket_address) # 链接成功后设置 socket_timeout 时间 sock.settimeout(self.socket_timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err raise socket.error("socket.getaddrinfo returned an empty list")
on_connect 函数,实列化链接,有密码则发送密码,选择数据表数据库
def on_connect(self): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) # if a password is specified, authenticate if self.password: # 若是redis服务作了进一步的安全加固,也是在这里进行鉴权 self.send_command('AUTH', self.password) if nativestr(self.read_response()) != 'OK': raise AuthenticationError('Invalid Password') # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) if nativestr(self.read_response()) != 'OK': raise ConnectionError('Invalid Database')
执行命令函数 execute_command,从链接池获取链接并发送命令安全
def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection)