个人第一个python web开发框架(25)——定制ORM(一)

  在开始编写ORM模块以前,咱们须要先对db_helper进行重构,由于ORM最终生成的sql是须要转给db_helper来执行的,因此拥有一个功能完善、健壮的数据库操做类是很是必要的。前端

  这是项目原db_helper.py代码python

#!/usr/bin/env python
# coding=utf-8

import psycopg2
from common import log_helper
from config import const

# 初始化数据库参数
db_name = const.DB_NAME
db_host = const.DB_HOST
db_port = const.DB_PORT
db_user = const.DB_USER
db_pass = const.DB_PASS


def read(sql):
    """
    链接pg数据库并进行数据查询
    若是链接失败,会把错误写入日志中,并返回false,若是sql执行失败,也会把错误写入日志中,并返回false
    若是全部执行正常,则返回查询到的数据,这个数据是通过转换的,转成字典格式,方便模板调用,其中字典的key是数据表里的字段名
    """
    try:
        # 链接数据库
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 获取游标
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('链接数据库失败:' + str(e.args))
        return False
    try:
        # 执行查询操做
        cursor.execute(sql)
        # 将返回的结果转换成字典格式
        data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()]
    except Exception as e:
        print(e.args)
        log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))
        return False
    finally:
        # 关闭游标和数据库连接
        cursor.close()
        conn.close()
    # 返回结果(字典格式)
    return data


def write(sql, vars):
    """
    链接pg数据库并进行写的操做
    若是链接失败,会把错误写入日志中,并返回false,若是sql执行失败,也会把错误写入日志中,并返回false,若是全部执行正常,则返回true
    """
    try:
        # 链接数据库
        conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port)
        # 获取游标
        cursor = conn.cursor()
    except Exception as e:
        print(e.args)
        log_helper.error('链接数据库失败:' + str(e.args))
        return False
    try:
        # 执行sql语句
        cursor.execute(sql, vars)
        # 提交事务
        conn.commit()
    except Exception as e:
        print(e.args)
        # 若是出错,则事务回滚
        conn.rollback()
        log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql))
        return False
    else:
        # 获取数据
        try:
            data = [dict((cursor.description[i][0], value) for i, value in enumerate(row))
                         for row in cursor.fetchall()]
        except Exception as e:
            # 没有设置returning或执行修改或删除语句时,记录不存在
            data = None
    finally:
        # 关闭游标和数据库连接
        cursor.close()
        conn.close()

    # 若是写入数据后,将数据库返回的数据返回给调用者
    return data
View Code

  经过对代码的简单分析,能够看到整个模块在初化时,载入数据库连接配置,对数据库的操做也只有简单读与写操做。这样的功能对于通常的数据库增删改查操做已经足够了,但若是业务复杂,有多个库、须要用到事务或者须要访问不一样类型数据库时,它就不够用了。因此首先要作的就是对它进行重构,功能进行完善。sql

  首先咱们须要将配置独立出来,当有须要连接多个数据库时,能够读取不一样的配置文件,让程序更加方便灵活。数据库

  在config目录下建立db_config.py文件(有多个库时,能够配置多个不一样的参数来引用)ide

#!/usr/bin/env python
# coding=utf-8


### 数据库连接参数 ###
DB = {
    'db_host': '127.0.0.1',
    'db_port': 5432,
    'db_name': 'simple_db',
    'db_user': 'postgres',
    'db_pass': '123456'
}
# 是否将全部要执行的Sql语句输出到日志里
IS_OUTPUT_SQL = False

  在配置中,咱们一样定义了数据库链接地址、端口、数据库名称、用户名与密码。函数

  另外,为了方便咱们进行排错,检查sql的生成状况,添加了IS_OUTPUT_SQL是否输出执行的sql语句到日志中这一个开关项,设置为True时,全部被执行的sql语句都会被写到日志中,方便下载日志下来进行分析。post

 

  对于数据库操做模块,咱们须要封装成一个类,在有须要调用时,就能够经过with语句进行初始化操做,设置对应的数据库连接配置,灵活的链接不一样的数据库。测试

  在设计操做类时,咱们须要思考几个问题:fetch

  1.它能够支持多数据库操做,即读取不一样的配置能链接操做不一样的数据库(能够经过类初始化时进行注入配置信息)优化

  2.它须要支持with语句,当咱们忘记关闭数据库游标和链接时,自动帮咱们关闭(须要实现__enter__()与__exit__()方法)

  3.它须要支持数据库事务,当执行失败时,能够回滚数据,当全部sql执行都成功时,能够统一提交事务(须要建立rollback()与commit()方法)

  4.它须要支持查询、添加、修改、删除等操做,方便咱们操做关系型数据库记录(须要建立sql执行方法)

  5.它须要支持sql执行优化,将超出指定执行时间的sql语句记录到日志中,方便开发人员进行分析(须要记录sql执行起始时间与结束时间,并进行计算,当这个时间大于指定值时执行日志写入程序)

  根据这些要求,咱们初步设计出数据库操做类的基本模型

class PgHelper(object):
    """postgresql数据库操做类"""

    def __init__(self, db, is_output_sql):
        """初始化数据库操做类配置信息"""

    def open_conn(self):
        """链接数据库,并创建游标"""

    def close_conn(self):
        """关闭postgresql数据库连接"""

    def __enter__(self):
        """初始化数据库连接"""

    def __exit__(self, type, value, trace):
        """关闭postgresql数据库连接"""

    def rollback(self):
        """回滚操做"""

    def commit(self):
        """提交事务"""

    def execute(self, query, vars=None):
        """执行sql语句查询,返回结果集或影响行数"""

    def write_log(self, start_time, end_time, sql):
        """记录Sql执行超时日志"""

 

 

  接下来,咱们来一一实现上面的各个方法

  首先是完成初始化方法,咱们能够经过注入的方法,将db_config配置信息里的参数注入进来初始化。链接不一样的数据库时,能够注入不一样的配置信息。

class PgHelper(object):
    """postgresql数据库操做类"""

    def __init__(self, db, is_output_sql):
        self.connect = None
        self.cursor = None
        # 初始化数据库参数
        self.db_name = db.get('db_name')
        self.db_user = db.get('db_user')
        self.db_pass = db.get('db_pass')
        self.db_host = db.get('db_host')
        self.db_port = db.get('db_port')
        # 是否将全部要执行的Sql语句输出到日志里
        self.is_output_sql = is_output_sql

 

  而后咱们来建立数据库打开链接方法与关闭链接的方法,当数据库链接失败时会抛出异常,程序会自动调用log_helper.error()方法,将异常写入日志当中,并第一时间发送邮件通知开发人员,方便开发人员即时排错。

    def open_conn(self):
        """链接数据库,并创建游标"""
        try:
            if not self.connect:
                self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port)
            return self.connect
        except Exception as e:
            log_helper.error('链接数据库失败:' + str(e.args))
            return False

    def close_conn(self):
        """关闭postgresql数据库连接"""
        # 关闭游标
        try:
            if self.cursor:
                self.cursor.close()
        except Exception:
            pass
        # 关闭数据库连接
        try:
            if self.connect:
                self.connect.close()
        except Exception:
            pass

 

  经过重写内置__enter__()与__exit__()方法,来实现with语句调用本类时,会自动对类进行初始化操做,自动建立数据库链接。当代码执行完毕后(程序退出with语句时),程序会自动调用对应的方法,将游标与数据库链接的关闭,避免手动操做时,忘记关闭链接出现异常。

    def __enter__(self):
        """初始化数据库连接"""
        self.open_conn()
        return self

    def __exit__(self, type, value, trace):
        """关闭postgresql数据库连接"""
        self.close_conn()

 

  为了方便事务处理,增长回滚方法。用于事务中执行操做失败时,调用回滚方法执行回滚操做。

    def rollback(self):
        """回滚操做"""
        try:
            # 异常时,进行回滚操做
            if self.connect:
                self.connect.rollback()
                self.close_conn()
        except Exception as e:
            log_helper.error('回滚操做失败:' + str(e.args))

 

  还须要增长事务提交方法,方便sql执行增删改为功之后,提交事务更新数据。在开发中不少朋友常常会忘记执行提交事务操做,一直觉得代码有问题没有执行成功。

    def commit(self):
        """提交事务"""
        try:
            if self.connect:
                self.connect.commit()
                self.close_conn()
        except Exception as e:
            log_helper.error('提交事务失败:' + str(e.args))

 

  为了方便查看sql语句转换效果,咱们还能够增长获取sql语句生成方法,固然这个方法并无太大的用途。

    def get_sql(self, query, vars=None):
        """获取编译后的sql语句"""
        # 记录程序执行开始时间
        start_time = time.clock()
        try:
            # 判断是否记录sql执行语句
            if self.is_output_sql:
                log_helper.info('sql:' + str(query))
            # 创建游标
            self.cursor = self.connect.cursor()
            # 执行SQL
            self.data = self.cursor.mogrify(query, vars)
        except Exception as e:
            # 将异常写入到日志中
            log_helper.error('sql生成失败:' + str(e.args) + ' query:' + str(query))
            self.data = '获取编译sql失败'
        finally:
            # 关闭游标
            self.cursor.close()
        # 记录程序执行结束时间
        end_time = time.clock()
        # 写入日志
        self.write_log(start_time, end_time, query)

        return self.data

  由于,当你直接使用完整的sql语句执行时,并不须要这个方法。可是,你使用的是下面方式,执行后就会生成组合好的sql语句,帮助咱们分析sql语句生成状况

# 使用with方法,初始化数据库链接
with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
    # 设置sql执行语句
    sql = """insert into product (name, code) values (%s, %s) returning id"""
    # 设置提交参数
    vars = ('zhangsan', '201807251234568')
    # 生成sql语句,并打印到控制台
    print(db.get_sql(sql, vars))

  输出结果:

b"insert into product (name, code) values ('zhangsan', '201807251234568') returning id"

 

  数据库最多见的操做就是增删改查操做,因为postgresql有个很是好用的特殊参数:returning,它能够在sql执行增改删结束后,返回咱们想要的字段值,方便咱们进行相应的判断与操做,因此增改删操做咱们不须要将它与查询操做分离成两个方法,统一使用这个方法来获取数据库中返回的记录值。

  在实现这个方法以前,咱们设计时要思考这几个问题:

  1.须要记录程序执行的起始与结束时间,计算sql语句执行时长,用来判断是否记录到日志中,方便开发人员进行分析优化sql语句

  2.须要根据参数判断,是否须要将全部执行的sql语句记录到日志中,方便开发人员有须要时,查看执行了哪些sql语句,进行数据与功能分析

  3.因为类在加载时就已经自动链接数据库了,因此在方法中不须要进行打开数据库连接操做

  5.在执行sql语句时,须要建立游标,而后执行sql语句

  6.为了让用户更好的体验,减小异常的直接抛出,须要进行异常捕捉,并将捕捉到的异常进行处理,记录到日志中方便开发人员分析错误,同时同步发送推送给相关人员,即时提醒错误

  7.sql执行成功之后,须要对返回的数据进行处理,组合成字典类型,方便前端使用

  8.完成数据处理后,须要及时关闭游标

  9.对返回的数据须要进行处理后,返回给上一级程序

 1     def execute(self, query, vars=None):
 2         """执行sql语句查询,返回结果集或影响行数"""
 3         if not query:
 4             return None
 5         # 记录程序执行开始时间
 6         start_time = time.clock()
 7         try:
 8             # 判断是否记录sql执行语句
 9             if self.is_output_sql:
10                 log_helper.info('sql:' + str(query))
11             # 创建游标
12             self.cursor = self.connect.cursor()
13             # 执行SQL
14             result = self.cursor.execute(query, vars)
15             print(str(result))
16         except Exception as e:
17             # 将异常写入到日志中
18             log_helper.error('sql执行失败:' + str(e.args) + ' query:' + str(query))
19             self.data = None
20         else:
21             # 获取数据
22             try:
23                 if self.cursor.description:
24                     # 在执行insert/update/delete等更新操做时,若是添加了returning,则读取返回数据组合成字典返回
25                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
26                 else:
27                     # 若是执行insert/update/delete等更新操做时没有添加returning,则返回影响行数,值为0时表时没有数据被更新
28                     self.data = self.cursor.rowcount
29             except Exception as e:
30                 # 将异常写入到日志中
31                 log_helper.error('数据获取失败:' + str(e.args) + ' query:' + str(query))
32                 self.data = None
33         finally:
34             # 关闭游标
35             self.cursor.close()
36         # 记录程序执行结束时间
37         end_time = time.clock()
38         # 写入日志
39         self.write_log(start_time, end_time, query)
40 
41         # 若是有返回数据,则把该数据返回给调用者
42         return self.data

 

   最后一个是记录超时sql语句到日志方法,这里我将大于0.1秒的sql语句都记录下来

    def write_log(self, start_time, end_time, sql):
        """记录Sql执行超时日志"""
        t = end_time - start_time
        if (t) > 0.1:
            content = ' '.join(('run time:', str(t), 's sql:', sql))
            log_helper.info(content)

 

 

  完成的db_helper.py代码

  1 #!/usr/bin/env python
  2 # coding=utf-8
  3 
  4 import psycopg2
  5 import time
  6 from io import StringIO
  7 from common import log_helper, file_helper
  8 
  9 
 10 class PgHelper(object):
 11     """postgresql数据库操做类"""
 12 
 13     def __init__(self, db, is_output_sql):
 14         self.connect = None
 15         self.cursor = None
 16         # 初始化数据库参数
 17         self.db_name = db.get('db_name', '')
 18         self.db_user = db.get('db_user', '')
 19         self.db_pass = db.get('db_pass', '')
 20         self.db_host = db.get('db_host', '')
 21         self.db_port = db.get('db_port', '')
 22         # 是否将全部要执行的Sql语句输出到日志里
 23         self.is_output_sql = is_output_sql
 24 
 25     def open_conn(self):
 26         """链接数据库,并创建游标"""
 27         try:
 28             if not self.connect:
 29                 self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass,
 30                                                 host=self.db_host, port=self.db_port)
 31             return self.connect
 32         except Exception as e:
 33             log_helper.error('链接数据库失败:' + str(e.args))
 34             return False
 35 
 36     def close_conn(self):
 37         """关闭postgresql数据库连接"""
 38         # 关闭游标
 39         try:
 40             if self.cursor:
 41                 self.cursor.close()
 42         except Exception:
 43             pass
 44         # 关闭数据库连接
 45         try:
 46             if self.connect:
 47                 self.connect.close()
 48         except Exception:
 49             pass
 50 
 51     def __enter__(self):
 52         """初始化数据库连接"""
 53         self.open_conn()
 54         return self
 55 
 56     def __exit__(self, type, value, trace):
 57         """关闭postgresql数据库连接"""
 58         self.close_conn()
 59 
 60     def rollback(self):
 61         """回滚操做"""
 62         try:
 63             # 异常时,进行回滚操做
 64             if self.connect:
 65                 self.connect.rollback()
 66         except Exception as e:
 67             log_helper.error('回滚操做失败:' + str(e.args))
 68 
 69     def commit(self):
 70         """提交事务"""
 71         try:
 72             if self.connect:
 73                 self.connect.commit()
 74                 self.close_conn()
 75         except Exception as e:
 76             log_helper.error('提交事务失败:' + str(e.args))
 77 
 78     def get_sql(self, query, vars=None):
 79         """获取编译后的sql语句"""
 80         # 记录程序执行开始时间
 81         start_time = time.clock()
 82         try:
 83             # 判断是否记录sql执行语句
 84             if self.is_output_sql:
 85                 log_helper.info('sql:' + str(query))
 86             # 创建游标
 87             self.cursor = self.connect.cursor()
 88             # 执行SQL
 89             self.data = self.cursor.mogrify(query, vars)
 90         except Exception as e:
 91             # 将异常写入到日志中
 92             log_helper.error('sql生成失败:' + str(e.args) + ' query:' + str(query))
 93             self.data = '获取编译sql失败'
 94         finally:
 95             # 关闭游标
 96             self.cursor.close()
 97         # 记录程序执行结束时间
 98         end_time = time.clock()
 99         # 写入日志
100         self.write_log(start_time, end_time, query)
101 
102         return self.data
103 
104     def copy(self, values, table_name, columns):
105         """
106         百万级数据更新函数
107         :param values: 更新内容,字段之间用\t分隔,记录之间用\n分隔 "1\taaa\tabc\n2\bbb\abc\n"
108         :param table_name: 要更新的表名称
109         :param columns: 须要更新的字段名称:例:('id','userame','passwd')
110         :return:
111         """
112         try:
113             # 创建游标
114             self.cursor = self.connect.cursor()
115             self.cursor.copy_from(StringIO(values), table_name, columns=columns)
116             self.connect.commit()
117             return True
118         except Exception as e:
119             # 将异常写入到日志中
120             log_helper.error('批量更新失败:' + str(e.args) + ' table:' + table_name)
121         finally:
122             # 关闭游标
123             self.cursor.close()
124 
125     def execute(self, query, vars=None):
126         """执行sql语句查询,返回结果集或影响行数"""
127         if not query:
128             return None
129         # 记录程序执行开始时间
130         start_time = time.clock()
131         try:
132             # 判断是否记录sql执行语句
133             if self.is_output_sql:
134                 log_helper.info('sql:' + str(query))
135             # 创建游标
136             self.cursor = self.connect.cursor()
137             # 执行SQL
138             result = self.cursor.execute(query, vars)
139             print(str(result))
140         except Exception as e:
141             # 将异常写入到日志中
142             log_helper.error('sql执行失败:' + str(e.args) + ' query:' + str(query))
143             self.data = None
144         else:
145             # 获取数据
146             try:
147                 if self.cursor.description:
148                     # 在执行insert/update/delete等更新操做时,若是添加了returning,则读取返回数据组合成字典返回
149                     self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()]
150                 else:
151                     # 若是执行insert/update/delete等更新操做时没有添加returning,则返回影响行数,值为0时表时没有数据被更新
152                     self.data = self.cursor.rowcount
153             except Exception as e:
154                 # 将异常写入到日志中
155                 log_helper.error('数据获取失败:' + str(e.args) + ' query:' + str(query))
156                 self.data = None
157         finally:
158             # 关闭游标
159             self.cursor.close()
160         # 记录程序执行结束时间
161         end_time = time.clock()
162         # 写入日志
163         self.write_log(start_time, end_time, query)
164 
165         # 若是有返回数据,则把该数据返回给调用者
166         return self.data
167 
168 
169     def write_log(self, start_time, end_time, sql):
170         """记录Sql执行超时日志"""
171         t = end_time - start_time
172         if (t) > 0.1:
173             content = ' '.join(('run time:', str(t), 's sql:', sql))
174             log_helper.info(content)
View Code

  测试代码

 1 #!/usr/bin/evn python
 2 # coding=utf-8
 3 
 4 import unittest
 5 from common import db_helper
 6 from config import db_config
 7 
 8 class DbHelperTest(unittest.TestCase):
 9     """数据库操做包测试类"""
10 
11     def setUp(self):
12         """初始化测试环境"""
13         print('------ini------')
14 
15     def tearDown(self):
16         """清理测试环境"""
17         print('------clear------')
18 
19     def test(self):
20         # 使用with方法,初始化数据库链接
21         with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db:
22             # 设置sql执行语句
23             sql = """insert into product (name, code) values (%s, %s) returning id"""
24             # 设置提交参数
25             vars = ('张三', '201807251234568')
26             # 生成sql语句,并打印到控制台
27             print(db.get_sql(sql, vars))
28 
29             db.execute('select * from product where id=1000')
30             db.execute('insert into product (name, code) values (%s, %s) returning id', ('张三', '201807251234568'))
31             db.commit()
32 
33 if __name__ == '__main__':
34     unittest.main()
View Code

 

 

 

版权声明:本文原创发表于 博客园,做者为 AllEmpty 本文欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然视为侵权。

python开发QQ群:669058475(本群已满)、733466321(能够加2群)    做者博客:http://www.cnblogs.com/EmptyFS/

相关文章
相关标签/搜索