深刻研究sqlalchemy链接池

简介:

相对于最新的MySQL5.6,MariaDB在性能、功能、管理、NoSQL扩展方面包含了更丰富的特性。好比微秒的支持、线程池、子查询优化、组提交、进度报告等。html

本文就主要探索MariaDB当中链接池的一些特性,配置。来配合咱们的sqlalchemy。python

一:原由

原本是不会写这个东西的,可是,写好了python--flask程序,使用sqlalchemy+mariadb,部署之后老是出问题,500错误之类的。mysql

使用默认链接参数linux

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)sql

错误提示是:docker

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)数据库

http://sqlalche.me/e/e3q8:flask

OperationalError:

Exception raised for errors that are related to the database’s operation andnot necessarily under the control of the programmer, e.g. an unexpecteddisconnect occurs, the data source name is not found, a transaction could notbe processed, a memory allocation error occurred during processing, etc.windows

This error is aDBAPI Errorand originates fromthe database driver (DBAPI), not SQLAlchemy itself.api

TheOperationalErroris the most common (but not the only) error class usedby drivers in the context of the database connection being dropped, or notbeing able to connect to the database. For tips on how to deal with this, seethe sectionDealing with Disconnects.

意思是没有正确断开和数据库的链接。

二:处理断开

http://docs.sqlalchemy.org/en/latest/core/pooling.html#pool-disconnects

官方给了三种方案来解决这个问题:

1.悲观处理

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

pool_pre_ping=True

表示每次链接从池中检查,若是有错误,监测为断开的状态,链接将被当即回收。

2.自定义悲观的ping

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)

@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result

说实话,没怎么看明白。

像是try一个select 语句,若是没问题就关闭。

 

3.乐观处理

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")

这个看懂了,try一个select语句,若是无效,就返回Connection was invalidated!,而后开一个新的链接,再去执行select。这个应该写个装饰器,放在每一个查询前面。

4.使用链接池回收

from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

这种方式就比较简单了,在链接参数中写上链接超时时间便可。

5.这是本身看文档找到的方法

from sqlalchemy.pool import QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool

在sqlalchemy.pool下有已经配置好的链接池,直接使用这些链接池也应该能够。

三:测试

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

为避免因数据库交叉链接,首先开启5个MARIADB

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_recycle=3600)

用这5种链接参数进行链接测试。

若是你愿意,也能够继续开,QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool,把这几种都测试一下。

 

8801 8805 均会不一样程度的出现500错误,8801频率还高点。

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)
sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)


 

Internal Server Error

The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.

等会儿看看8802  8803 8804如何。

四:深刻研究sqlalchemy源码

VENV\Flask_Base\Lib\site-packages\sqlalchemy\engine\__init__.py

看起来,没有默认值。因此engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)报错频率比较高。

五:研究pool源码

VENV\Flask_Base\Lib\site-packages\sqlalchemy\pool.py

看来poolclass的类型都定义在这里了。

1.SingletonThreadPool

A Pool that maintains one connection per thread

每一个线程维护一个链接的池。

2.QueuePool

A :class:`.Pool` that imposes a limit on the number of open connections.

这种方式限制了链接数量,QueuePool是默认的链接池方式,除非使用了方言,也就是第三方连接库。

难怪我使用MySQL-connector-python时老出错呢,没打开链接池啊。

3.NullPool

A Pool which does not pool connections...

不使用链接池

4.StaticPool

A Pool of exactly one connection, used for all requests.

一个完整的链接池,用于全部的链接。

5.AssertionPool

A :class:`.Pool` that allows at most one checked out connection at any given time.

任什么时候间只给一个签出链接?为了debug模式?不懂了。

看的官方说明也没这么详细。

这么看来,若是我使用默认连接库,能够不加参数试试。

mysql-python是sqlalchemy默认的mysql连接库,我在windows下装不上。放弃测试默认连接库,手动指定链接池为QueuePool。

或者指定链接池类型为:QueuePool   StaticPool   SingletonThreadPool(多线程的时候)

六:链接池类型测试

修改测试docker

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_06 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True))
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=SingletonThreadPool)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=StaticPool)
Flask_Plan_06   8806       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=AssertionPool)

七:编写测试脚本

 

import requests
import time
i = 1
while True:
    try:
        r=requests.get('http://192.168.0.104:8801',timeout=5)
        if  r.status_code==200:
            print(time.strftime('%Y-%m-%d %H:%M:%S')+'---'+str(i)+'---'+str(r.status_code)+'---ok')
        else:
            print(time.strftime('%Y-%m-%d %H:%M:%S') + '---' + str(i) + '---' + str(r.status_code) + '-----------badr')
            break
        time.sleep(1)
        i+=1
    except:
        print('except')
        print(time.strftime('%Y-%m-%d %H:%M:%S') +'---'+str(i)+'-----------bad')
        break

修改地址,把几个测试服务都开始跑。

出错就会停了。

代码很烂,凑活测试而已。

从晚上22:30睡觉到早上6:10起床,pool_pre_ping=True,SingletonThreadPool,QueuePool,NullPool,StaticPool,AssertionPool,都很稳定,访问代码都是200

八:继续研究相关代码

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#using-connection-pools-with-multiprocessing

使用链接池进行多重处理

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#api-documentation-available-pool-implementations

api文档--链接池的实现

classsqlalchemy.pool.Pool(creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)

 

Parameters:    
creator–可调用的函数返回对象。
recycle– 超时回收时间。若是链接超过这个时间,链接就被关闭,换一个新的链接
logging_name - 日志标识名称
echo– 是否打印sql语句
use_threadlocal–是否使用线程,在同一应用程序的线程使用相同的链接对象
reset_on_return–在返回前的操做
    rollback,大概是自动回滚
    True 同为回滚
    commit 大概是自动提交的意思
    None 无操做
    none 无操做
    False 无操做
events– 列表元组,每一个表单会传递给listen………………没搞懂
listeners - 弃用,被listen取代
dialect–连接库,使用create_engine时不使用,由引擎建立时处理
pre_ping–是否测试链接

基本上这些参数都在engine-creation-api中

http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#engine-creation-api

Pool                  (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
StaticPool         (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
NullPool            (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
QueuePool          (creator,pool_size=5,max_overflow=10,timeout=30,**kw)
SingletonThreadPool(creator,pool_size=5,**kw)
AssertionPool      (*args,**kw)

这下清楚了,Pool,StaicPool,NullPool,都同样,直接回收,效率必定低了。

咱们就指定默认的QueuePool好了。之后观察着服务器的负载,负载大了之后,调整就行了。

自定义方法以下:

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',
                       pool_size=5,
                       max_overflow=10,
                       pool_timeout=30,
                       pool_pre_ping=True)

九:总结

曲折的道路,终于找到了解决方案。

sqlalchemy的教程当中,不多有讲如何部署的。不少又是linux开发。可能在linux下很容易装默认连接库,部署的时候就自动使用了QueuePool链接池。因此这种问题不多出现。

我在windows下开发,部署在linux,开发和部署都使用了非默认连接库,致使没有使用默认链接池。

那么随着深刻研究,找到了链接池的配置,并掌握这一知识,为之后的开发部署工做,扫除了障碍。

虽然源码里面还有不少看不懂,可是读书百遍其义自见,仍是要多读(我是懒蛋,遇到问题,再去解决,下一个问题是什么呢?)。

相关文章
相关标签/搜索