带你进入数据库链接池

[原文连接](https://mp.weixin.qq.com/s/7wT_mw4uC0GuhhsJJIV0Pg)复制代码

概述

链接池的做用就是为了提升性能,将已经建立好的链接保存在池中,当有请求来时,直接使用已经建立好的链接对Server端进行访问。这样省略了建立链接和销毁链接的过程(TCP链接创建时的三次握手和销毁时的四次握手),从而在性能上获得了提升。
链接池设计的基本原理是这样的:
(1)创建链接池对象(服务启动)。
(2)按照事先指定的参数建立初始数量的链接(即:空闲链接数)。
(3)对于一个访问请求,直接从链接池中获得一个链接。若是链接池对象中没有空闲的链接,且链接数没有达到最大(即:最大活跃链接数),建立一个新的链接;若是达到最大,则设定必定的超时时间,来获取链接。
(4)运用链接访问服务。
(5)访问服务完成,释放链接(此时的释放链接,并不是真正关闭,而是将其放入空闲队列中。如实际空闲链接数大于初始空闲链接数则释放链接)。
(6)释放链接池对象(服务中止、维护期间,释放链接池对象,并释放全部链接)。python

说的通俗点,能够把链接池理解为一个一个的管道,在管道空闲时,即可以取出使用;同时,也能够铺设新的管道(固然不能超过最大链接数的限制)。使用完以后,管道就变为空闲了。数据库

一般比较经常使用的链接池是数据库链接池,HTTP Client链接池,我也本身编写过链接池,如Thrift链接池及插入Rabbitmq队列的链接池。数组

下面分析三个典型的链接池的设计。缓存

数据库链接池

首先剖析一下数据库链接池的设计与实现的原理。DBUtils 属于数据库链接池实现模块,用于链接DB-API 2模块,对数据库链接线程化,使能够安全和高效的访问数据库的模块。本文主要分析一下PooledDB的流程。安全

DBUtils.PooledDB使用DB-API 2模块实现了一个强硬的、线程安全的、有缓存的、可复用的数据库链接。bash

以下图展现了使用PooledDB时的工做流程:服务器

本文主要考虑dedicated connections,即专用数据库链接,在初始化时链接池时,就须要指定mincached、maxcached以及maxconnections等参数,分别表示链接池的最小链接数、链接池的最大链接数以及系统可用的最大链接数,同时,blocking参数表征了当获取不到链接的时候是阻塞等待获取链接仍是返回异常:架构

if not blocking:
    def wait():
        raise TooManyConnections
    self._condition.wait = wait复制代码

在链接池初始化时,就会创建mincached个链接,代码以下:并发

# Establish an initial number of idle database connections:
idle = [self.dedicated_connection() for i in range(mincached)]
while idle:
    idle.pop().close()复制代码

里面有close方法,看一下链接close方法的实现:app

def close(self):
    """Close the pooled dedicated connection."""
    # Instead of actually closing the connection,
    # return it to the pool for future reuse.
    if self._con:
        self._pool.cache(self._con)
        self._con = None复制代码

主要是实现了cache方法,看一下具体代码:

def cache(self, con):
    """Put a dedicated connection back into the idle cache."""
    self._condition.acquire()
    try:
        if not self._maxcached or len(self._idle_cache) < self._maxcached:
            con._reset(force=self._reset) # rollback possible transaction
            # the idle cache is not full, so put it there
            self._idle_cache.append(con) # append it to the idle cache
        else: # if the idle cache is already full,
            con.close() # then close the connection
        self._connections -= 1
        self._condition.notify()
    finally:
        self._condition.release()复制代码

由上述代码可见,close并非把链接关闭,而是在链接池的数目小于maxcached的时候,将链接放回链接池,而大于此值时,关闭该链接。同时能够注意到,在放回链接池以前,须要将事务进行回滚,避免在使用链接池的时候有存活的事务没有提交。这能够保证进入链接池的链接都是可用的。

而获取链接的过程正如以前讨论的,先从链接池中获取链接,若是获取链接失败,则新创建链接:

# try to get a dedicated connection
    self._condition.acquire()
    try:
        while (self._maxconnections
                and self._connections >= self._maxconnections):
            self._condition.wait()
        # connection limit not reached, get a dedicated connection
        try: # first try to get it from the idle cache
            con = self._idle_cache.pop(0)
        except IndexError: # else get a fresh connection
            con = self.steady_connection()
        else:
            con._ping_check() # check connection
        con = PooledDedicatedDBConnection(self, con)
        self._connections += 1
    finally:
        self._condition.release()复制代码

关闭链接正如刚刚建立mincached个链接后关闭链接的流程,在链接池的数目小于maxcached的时候,将链接放回链接池,而大于此值时,关闭该链接。

RabbitMQ队列插入消息链接池

异步消息传递是高并发系统经常使用的一种技术手段。而这其中就少不了消息队列。频繁的向消息队列里面插入消息,创建链接释放链接会是比较大的开销。因此,可使用链接池来提升系统性能。

链接池的设计实现以下:

在获取链接的时候,先从队列里面获取链接,若是获取不到,则新创建一个链接,若是不能新创建链接,则根据超时时间,阻塞等待从队列里面获取连接。若是没成功,则作最后的尝试,从新创建链接。代码实现以下:

def get_connection_pipe(self):
        """ 获取链接 :return: """
        try:
            connection_pipe = self._queue.get(False)
        except Queue.Empty:
            try:
                connection_pipe = self.get_new_connection_pipe()
            except GetConnectionException:
                timeout = self.timeout
                try:
                    connection_pipe = self._queue.get(timeout=timeout)
                except Queue.Empty:
                    try:
                        connection_pipe = self.get_new_connection_pipe()
                    except GetConnectionException:
                        logging.error("Too much connections, Get Connection Timeout!")
        if (time.time() - connection_pipe.use_time) > self.disable_time:
            self.close(connection_pipe)
            return self.get_connection_pipe()
        return connection_pipe复制代码

一个RabbitMQ插入消息队列的完整链接池设计以下:

# coding:utf-8
import logging
import threading
import Queue
from kombu import Connection
import time

class InsertQueue():
    def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None, logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20):
        """ :param str host: Hostname or IP Address to connect to :param int port: TCP port to connect to :param str virtual_host: RabbitMQ virtual host to use :param int heartbeat_interval: How often to send heartbeats :param str name: auth credentials name :param str password: auth credentials password """
        self.logger = logging if logger is None else logger
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.heartbeat_interval = heartbeat_interval
        self.name = name
        self.password = password
        self.mutex = threading.RLock()
        self.maxIdle = maxIdle
        self.maxActive = maxActive
        self.available = self.maxActive
        self.timeout = timeout
        self._queue = Queue.Queue(maxsize=self.maxIdle)
        self.disable_time = disable_time

    def get_new_connection_pipe(self):
        """ 产生新的队列链接 :return: """

        with self.mutex:
            if self.available <= 0:
                raise GetConnectionException
            self.available -= 1
        try:

            conn = Connection(hostname=self.host,
                              port=self.port,
                              virtual_host=self.virtual_host,
                              heartbeat=self.heartbeat_interval,
                              userid=self.name,
                              password=self.password)
            producer = conn.Producer()

            return ConnectionPipe(conn, producer)
        except:
            with self.mutex:
                self.available += 1
            raise GetConnectionException

    def get_connection_pipe(self):
        """ 获取链接 :return: """
        try:
            connection_pipe = self._queue.get(False)
        except Queue.Empty:
            try:
                connection_pipe = self.get_new_connection_pipe()
            except GetConnectionException:
                timeout = self.timeout
                try:
                    connection_pipe = self._queue.get(timeout=timeout)
                except Queue.Empty:
                    try:
                        connection_pipe = self.get_new_connection_pipe()
                    except GetConnectionException:
                        logging.error("Too much connections, Get Connection Timeout!")
        if (time.time() - connection_pipe.use_time) > self.disable_time:
            self.close(connection_pipe)
            return self.get_connection_pipe()
        return connection_pipe

    def close(self, connection_pipe):
        """ close the connection and the correlative channel :param connection_pipe: :return: """
        with self.mutex:
            self.available += 1
            connection_pipe.close()
        return

    def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True):
        """ insert message to queue :param str exchange: exchange name :param str body: message :param str routing_key: routing key :param bool mandatory: is confirm: True means confirm, False means not confirm :return: """

        put_into_queue_flag = True
        insert_result = False
        connection_pipe = None
        try:

            connection_pipe = self.get_connection_pipe()
            producer = connection_pipe.channel
            use_time = time.time()
            producer.publish(exchange=exchange,
                                             body=body,
                                             delivery_mode=2,
                                             routing_key=routing_key,
                                             mandatory=mandatory
                                             )
            insert_result = True

        except Exception:
            insert_result = False
            put_into_queue_flag = False
        finally:

            if put_into_queue_flag is True:
                try:
                    connection_pipe.use_time = use_time
                    self._queue.put_nowait(connection_pipe)
                except Queue.Full:
                    self.close(connection_pipe)
            else:
                if connection_pipe is not None:
                    self.close(connection_pipe)

        return insert_result

class ConnectionPipe(object):
    """ connection和channel对象的封装 """

    def __init__(self, connection, channel):
        self.connection = connection
        self.channel = channel
        self.use_time = time.time()

    def close(self):
        try:
            self.connection.close()
        except Exception as ex:
            pass

class GetConnectionException():
    """ 获取链接异常 """
    pass复制代码

Thrift链接池

Thrift是什么呢?简而言之,Thrift定义一个简单的文件,包含数据类型和服务接口,以做为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通讯的方式。实际上就是一种远程调用的方式,由于协议栈为TCP层,因此相对于HTTP层效率会更高。

Thrift链接池的设计同数据库链接池相似,流程图以下:

思路依旧是,在获取链接时,先从链接池中获取链接,若池中无链接,则判断是否能够新建链接,若不能新建链接,则阻塞等待链接。

在从池中获取不到队列的时候的处理方式,本设计处理方式为:当获取不到链接时,将这部分请求放入一个等待队列,等待获取链接;而当关闭链接放回链接池时,优先判断这个队列是否有等待获取链接的请求,如有,则优先分配给这些请求。

获取不到链接时处理代码以下,将请求放入一个队列进行阻塞等待获取链接:

async_result = AsyncResult()
self.no_client_queue.appendleft(async_result)
client = async_result.get()  # blocking复制代码

而当有链接释放须要放回链接池时,须要优先考虑这部分请求,代码以下:

def put_back_connections(self, client):
    """ 线程安全 将链接放回链接池,逻辑以下: 一、若是有请求还没有获取到链接,请求优先 二、若是链接池中的链接的数目小于maxIdle,则将该链接放回链接池 三、关闭链接 :param client: :return: """
    with self.lock:
        if self.no_client_queue.__len__() > 0:
            task = self.no_client_queue.pop()
            task.set(client)
        elif self.connections.__len__() < self.maxIdle:
            self.connections.add(client)
        else:
            client.close()
            self.pool_size -= 1复制代码

最后,基于thrift链接池,介绍一个简单的服务化框架的实现。

服务化框架分为两部分:RPC、注册中心。
一、RPC:远程调用,远程调用的传输协议有不少种,能够走http、Webservice、TCP等。Thrift也是世界上主流的RPC框架。其重点在于安全、快速、最好能跨语言。
二、注册中心:用于存放,服务的IP地址和端口信息等。比较好的存放服务信息的方案有:Zookeeper、Redis等。其重点在于避免单点问题,而且好维护。

一般的架构图为:

经过Thrift链接池做为客户端,而Zookeeper做为注册中心,设计服务框架。具体就是服务端在启动服务的时候到Zookeeper进行注册,而客户端在启动的时候经过Zookeeper发现服务端的IP和端口,经过Thrift链接池轮询创建链接访问服务端的服务。

具体设计的代码以下,代码有点长,细细研读必定有所收获的:

# coding: utf-8

import threading
from collections import deque
import logging
import socket
import time
from kazoo.client import KazooClient
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import (
    TBufferedTransportFactory,
    TSocket,
)
from gevent.event import AsyncResult
from gevent import Timeout

from error import CTECThriftClientError
from thriftpy.thrift import TClient
from thriftpy.transport import TTransportException

class ClientPool:
    def __init__(self, service, server_hosts=None, zk_path=None, zk_hosts=None, logger=None, max_renew_times=3, maxActive=20, maxIdle=10, get_connection_timeout=30, socket_timeout=30, disable_time=3):
        """ :param service: Thrift的Service名称 :param server_hosts: 服务提供者地址,数组类型,['ip:port','ip:port'] :param zk_path: 服务提供者在zookeeper中的路径 :param zk_hosts: zookeeper的host地址,多个请用逗号隔开 :param max_renew_times: 最大重连次数 :param maxActive: 最大链接数 :param maxIdle: 最大空闲链接数 :param get_connection_timeout:获取链接的超时时间 :param socket_timeout: 读取数据的超时时间 :param disable_time: 链接失效时间 """
        # 负载均衡队列
        self.load_balance_queue = deque()
        self.service = service
        self.lock = threading.RLock()
        self.max_renew_times = max_renew_times
        self.maxActive = maxActive
        self.maxIdle = maxIdle
        self.connections = set()
        self.pool_size = 0
        self.get_connection_timeout = get_connection_timeout
        self.no_client_queue = deque()
        self.socket_timeout = socket_timeout
        self.disable_time = disable_time
        self.logger = logging if logger is None else logger

        if zk_hosts:
            self.kazoo_client = KazooClient(hosts=zk_hosts)
            self.kazoo_client.start()
            self.zk_path = zk_path
            self.zk_hosts = zk_hosts
            # 定义Watcher
            self.kazoo_client.ChildrenWatch(path=self.zk_path,
                                            func=self.watcher)
            # 刷新链接池中的链接对象
            self.__refresh_thrift_connections(self.kazoo_client.get_children(self.zk_path))
        elif server_hosts:
            self.server_hosts = server_hosts
            # 复制新的IP地址到负载均衡队列中
            self.load_balance_queue.extendleft(self.server_hosts)
        else:
            raise CTECThriftClientError('没有指定服务器获取方式!')

    def get_new_client(self):
        """ 轮询在每一个ip:port的链接池中获取链接(线程安全) 从当前队列右侧取出ip:port信息,获取client 将链接池对象放回到当前队列的左侧 请求或链接超时时间,默认30秒 :return: """
        with self.lock:
            if self.pool_size < self.maxActive:
                try:
                    ip = self.load_balance_queue.pop()
                except IndexError:
                    raise CTECThriftClientError('没有可用的服务提供者列表!')
                if ip:
                    self.load_balance_queue.appendleft(ip)
                    # 建立新的thrift client
                    t_socket = TSocket(ip.split(':')[0], int(ip.split(':')[1]),
                                       socket_timeout=1000 * self.socket_timeout)
                    proto_factory = TBinaryProtocolFactory()
                    trans_factory = TBufferedTransportFactory()
                    transport = trans_factory.get_transport(t_socket)
                    protocol = proto_factory.get_protocol(transport)
                    transport.open()
                    client = TClient(self.service, protocol)
                    self.pool_size += 1
                return client
            else:
                return None

    def close(self):
        """ 关闭全部链接池和zk客户端 :return: """
        if getattr(self, 'kazoo_client', None):
            self.kazoo_client.stop()

    def watcher(self, children):
        """ zk的watcher方法,负责检测zk的变化,刷新当前双端队列中的链接池 :param children: 子节点,即服务提供方的列表 :return: """
        self.__refresh_thrift_connections(children)

    def __refresh_thrift_connections(self, children):
        """ 刷新服务提供者在当前队列中的链接池信息(线程安全),主要用于zk刷新 :param children: :return: """
        with self.lock:
            # 清空负载均衡队列
            self.load_balance_queue.clear()
            # 清空链接池
            self.connections.clear()
            # 复制新的IP地址到负载均衡队列中
            self.load_balance_queue.extendleft(children)

    def __getattr__(self, name):
        """ 函数调用,最大重试次数为max_renew_times :param name: :return: """

        def method(*args, **kwds):

            # 从链接池获取链接
            client = self.get_client_from_pool()

            # 链接池中无链接
            if client is None:
                # 设置获取链接的超时时间
                time_out = Timeout(self.get_connection_timeout)
                time_out.start()
                try:
                    async_result = AsyncResult()
                    self.no_client_queue.appendleft(async_result)
                    client = async_result.get()  # blocking
                except:
                    with self.lock:
                        if client is None:
                            self.no_client_queue.remove(async_result)
                            self.logger.error("Get Connection Timeout!")
                finally:
                    time_out.cancel()

            if client is not None:

                for i in xrange(self.max_renew_times):

                    try:
                        put_back_flag = True
                        client.last_use_time = time.time()
                        fun = getattr(client, name, None)
                        return fun(*args, **kwds)
                    except socket.timeout:
                        self.logger.error("Socket Timeout!")
                        # 关闭链接,不关闭会致使乱序
                        put_back_flag = False
                        self.close_one_client(client)
                        break

                    except TTransportException, e:
                        put_back_flag = False

                        if e.type == TTransportException.END_OF_FILE:
                            self.logger.warning("Socket Connection Reset Error,%s", e)
                            with self.lock:
                                client.close()
                                self.pool_size -= 1
                                client = self.get_new_client()
                        else:
                            self.logger.error("Socket Error,%s", e)
                            self.close_one_client(client)
                            break

                    except socket.error, e:
                        put_back_flag = False
                        if e.errno == socket.errno.ECONNABORTED:
                            self.logger.warning("Socket Connection aborted Error,%s", e)
                            with self.lock:
                                client.close()
                                self.pool_size -= 1
                                client = self.get_new_client()
                        else:
                            self.logger.error("Socket Error, %s", e)
                            self.close_one_client(client)
                            break

                    except Exception as e:
                        put_back_flag = False

                        self.logger.error("Thrift Error, %s", e)
                        self.close_one_client(client)
                        break

                    finally:
                        # 将链接放回链接池
                        if put_back_flag is True:
                            self.put_back_connections(client)
            return None

        return method

    def close_one_client(self, client):
        """ 线程安全 关闭链接 :param client: :return: """
        with self.lock:
            client.close()
            self.pool_size -= 1

    def put_back_connections(self, client):
        """ 线程安全 将链接放回链接池,逻辑以下: 一、若是有请求还没有获取到链接,请求优先 二、若是链接池中的链接的数目小于maxIdle,则将该链接放回链接池 三、关闭链接 :param client: :return: """
        with self.lock:
            if self.no_client_queue.__len__() > 0:
                task = self.no_client_queue.pop()
                task.set(client)
            elif self.connections.__len__() < self.maxIdle:
                self.connections.add(client)
            else:
                client.close()
                self.pool_size -= 1

    def get_client_from_pool(self):
        """ 线程安全 从链接池中获取链接,若链接池中有链接,直接取出,不然, 新建一个链接,若一直没法获取链接,则返回None :return: """
        client = self.get_one_client_from_pool()

        if client is not None and (time.time() - client.last_use_time) < self.disable_time:
            return client
        else:
            if client is not None:
                self.close_one_client(client)

        client = self.get_new_client()
        if client is not None:
            return client

        return None

    def get_one_client_from_pool(self):
        """ 线程安全 从链接池中获取一个链接,若取不到链接,则返回None :return: """
        with self.lock:
            if self.connections:
                try:
                    return self.connections.pop()
                except KeyError:
                    return None
            return None复制代码
相关文章
相关标签/搜索