在开始编写ORM模块以前,咱们须要先对db_helper进行重构,由于ORM最终生成的sql是须要转给db_helper来执行的,因此拥有一个功能完善、健壮的数据库操做类是很是必要的。前端
这是项目原db_helper.py代码python
#!/usr/bin/env python # coding=utf-8 import psycopg2 from common import log_helper from config import const # 初始化数据库参数 db_name = const.DB_NAME db_host = const.DB_HOST db_port = const.DB_PORT db_user = const.DB_USER db_pass = const.DB_PASS def read(sql): """ 链接pg数据库并进行数据查询 若是链接失败,会把错误写入日志中,并返回false,若是sql执行失败,也会把错误写入日志中,并返回false 若是全部执行正常,则返回查询到的数据,这个数据是通过转换的,转成字典格式,方便模板调用,其中字典的key是数据表里的字段名 """ try: # 链接数据库 conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port) # 获取游标 cursor = conn.cursor() except Exception as e: print(e.args) log_helper.error('链接数据库失败:' + str(e.args)) return False try: # 执行查询操做 cursor.execute(sql) # 将返回的结果转换成字典格式 data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()] except Exception as e: print(e.args) log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql)) return False finally: # 关闭游标和数据库连接 cursor.close() conn.close() # 返回结果(字典格式) return data def write(sql, vars): """ 链接pg数据库并进行写的操做 若是链接失败,会把错误写入日志中,并返回false,若是sql执行失败,也会把错误写入日志中,并返回false,若是全部执行正常,则返回true """ try: # 链接数据库 conn = psycopg2.connect(database=db_name, user=db_user, password=db_pass, host=db_host, port=db_port) # 获取游标 cursor = conn.cursor() except Exception as e: print(e.args) log_helper.error('链接数据库失败:' + str(e.args)) return False try: # 执行sql语句 cursor.execute(sql, vars) # 提交事务 conn.commit() except Exception as e: print(e.args) # 若是出错,则事务回滚 conn.rollback() log_helper.error('sql执行失败:' + str(e.args) + ' sql:' + str(sql)) return False else: # 获取数据 try: data = [dict((cursor.description[i][0], value) for i, value in enumerate(row)) for row in cursor.fetchall()] except Exception as e: # 没有设置returning或执行修改或删除语句时,记录不存在 data = None finally: # 关闭游标和数据库连接 cursor.close() conn.close() # 若是写入数据后,将数据库返回的数据返回给调用者 return data
经过对代码的简单分析,能够看到整个模块在初化时,载入数据库连接配置,对数据库的操做也只有简单读与写操做。这样的功能对于通常的数据库增删改查操做已经足够了,但若是业务复杂,有多个库、须要用到事务或者须要访问不一样类型数据库时,它就不够用了。因此首先要作的就是对它进行重构,功能进行完善。sql
首先咱们须要将配置独立出来,当有须要连接多个数据库时,能够读取不一样的配置文件,让程序更加方便灵活。数据库
在config目录下建立db_config.py文件(有多个库时,能够配置多个不一样的参数来引用)ide
#!/usr/bin/env python # coding=utf-8 ### 数据库连接参数 ### DB = { 'db_host': '127.0.0.1', 'db_port': 5432, 'db_name': 'simple_db', 'db_user': 'postgres', 'db_pass': '123456' } # 是否将全部要执行的Sql语句输出到日志里 IS_OUTPUT_SQL = False
在配置中,咱们一样定义了数据库链接地址、端口、数据库名称、用户名与密码。函数
另外,为了方便咱们进行排错,检查sql的生成状况,添加了IS_OUTPUT_SQL是否输出执行的sql语句到日志中这一个开关项,设置为True时,全部被执行的sql语句都会被写到日志中,方便下载日志下来进行分析。post
对于数据库操做模块,咱们须要封装成一个类,在有须要调用时,就能够经过with语句进行初始化操做,设置对应的数据库连接配置,灵活的链接不一样的数据库。测试
在设计操做类时,咱们须要思考几个问题:fetch
1.它能够支持多数据库操做,即读取不一样的配置能链接操做不一样的数据库(能够经过类初始化时进行注入配置信息)优化
2.它须要支持with语句,当咱们忘记关闭数据库游标和链接时,自动帮咱们关闭(须要实现__enter__()与__exit__()方法)
3.它须要支持数据库事务,当执行失败时,能够回滚数据,当全部sql执行都成功时,能够统一提交事务(须要建立rollback()与commit()方法)
4.它须要支持查询、添加、修改、删除等操做,方便咱们操做关系型数据库记录(须要建立sql执行方法)
5.它须要支持sql执行优化,将超出指定执行时间的sql语句记录到日志中,方便开发人员进行分析(须要记录sql执行起始时间与结束时间,并进行计算,当这个时间大于指定值时执行日志写入程序)
根据这些要求,咱们初步设计出数据库操做类的基本模型
class PgHelper(object): """postgresql数据库操做类""" def __init__(self, db, is_output_sql): """初始化数据库操做类配置信息""" def open_conn(self): """链接数据库,并创建游标""" def close_conn(self): """关闭postgresql数据库连接""" def __enter__(self): """初始化数据库连接""" def __exit__(self, type, value, trace): """关闭postgresql数据库连接""" def rollback(self): """回滚操做""" def commit(self): """提交事务""" def execute(self, query, vars=None): """执行sql语句查询,返回结果集或影响行数""" def write_log(self, start_time, end_time, sql): """记录Sql执行超时日志"""
接下来,咱们来一一实现上面的各个方法
首先是完成初始化方法,咱们能够经过注入的方法,将db_config配置信息里的参数注入进来初始化。链接不一样的数据库时,能够注入不一样的配置信息。
class PgHelper(object): """postgresql数据库操做类""" def __init__(self, db, is_output_sql): self.connect = None self.cursor = None # 初始化数据库参数 self.db_name = db.get('db_name') self.db_user = db.get('db_user') self.db_pass = db.get('db_pass') self.db_host = db.get('db_host') self.db_port = db.get('db_port') # 是否将全部要执行的Sql语句输出到日志里 self.is_output_sql = is_output_sql
而后咱们来建立数据库打开链接方法与关闭链接的方法,当数据库链接失败时会抛出异常,程序会自动调用log_helper.error()方法,将异常写入日志当中,并第一时间发送邮件通知开发人员,方便开发人员即时排错。
def open_conn(self): """链接数据库,并创建游标""" try: if not self.connect: self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, host=self.db_host, port=self.db_port) return self.connect except Exception as e: log_helper.error('链接数据库失败:' + str(e.args)) return False def close_conn(self): """关闭postgresql数据库连接""" # 关闭游标 try: if self.cursor: self.cursor.close() except Exception: pass # 关闭数据库连接 try: if self.connect: self.connect.close() except Exception: pass
经过重写内置__enter__()与__exit__()方法,来实现with语句调用本类时,会自动对类进行初始化操做,自动建立数据库链接。当代码执行完毕后(程序退出with语句时),程序会自动调用对应的方法,将游标与数据库链接的关闭,避免手动操做时,忘记关闭链接出现异常。
def __enter__(self): """初始化数据库连接""" self.open_conn() return self def __exit__(self, type, value, trace): """关闭postgresql数据库连接""" self.close_conn()
为了方便事务处理,增长回滚方法。用于事务中执行操做失败时,调用回滚方法执行回滚操做。
def rollback(self): """回滚操做""" try: # 异常时,进行回滚操做 if self.connect: self.connect.rollback() self.close_conn() except Exception as e: log_helper.error('回滚操做失败:' + str(e.args))
还须要增长事务提交方法,方便sql执行增删改为功之后,提交事务更新数据。在开发中不少朋友常常会忘记执行提交事务操做,一直觉得代码有问题没有执行成功。
def commit(self): """提交事务""" try: if self.connect: self.connect.commit() self.close_conn() except Exception as e: log_helper.error('提交事务失败:' + str(e.args))
为了方便查看sql语句转换效果,咱们还能够增长获取sql语句生成方法,固然这个方法并无太大的用途。
def get_sql(self, query, vars=None): """获取编译后的sql语句""" # 记录程序执行开始时间 start_time = time.clock() try: # 判断是否记录sql执行语句 if self.is_output_sql: log_helper.info('sql:' + str(query)) # 创建游标 self.cursor = self.connect.cursor() # 执行SQL self.data = self.cursor.mogrify(query, vars) except Exception as e: # 将异常写入到日志中 log_helper.error('sql生成失败:' + str(e.args) + ' query:' + str(query)) self.data = '获取编译sql失败' finally: # 关闭游标 self.cursor.close() # 记录程序执行结束时间 end_time = time.clock() # 写入日志 self.write_log(start_time, end_time, query) return self.data
由于,当你直接使用完整的sql语句执行时,并不须要这个方法。可是,你使用的是下面方式,执行后就会生成组合好的sql语句,帮助咱们分析sql语句生成状况
# 使用with方法,初始化数据库链接 with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db: # 设置sql执行语句 sql = """insert into product (name, code) values (%s, %s) returning id""" # 设置提交参数 vars = ('zhangsan', '201807251234568') # 生成sql语句,并打印到控制台 print(db.get_sql(sql, vars))
输出结果:
b"insert into product (name, code) values ('zhangsan', '201807251234568') returning id"
数据库最多见的操做就是增删改查操做,因为postgresql有个很是好用的特殊参数:returning,它能够在sql执行增改删结束后,返回咱们想要的字段值,方便咱们进行相应的判断与操做,因此增改删操做咱们不须要将它与查询操做分离成两个方法,统一使用这个方法来获取数据库中返回的记录值。
在实现这个方法以前,咱们设计时要思考这几个问题:
1.须要记录程序执行的起始与结束时间,计算sql语句执行时长,用来判断是否记录到日志中,方便开发人员进行分析优化sql语句
2.须要根据参数判断,是否须要将全部执行的sql语句记录到日志中,方便开发人员有须要时,查看执行了哪些sql语句,进行数据与功能分析
3.因为类在加载时就已经自动链接数据库了,因此在方法中不须要进行打开数据库连接操做
5.在执行sql语句时,须要建立游标,而后执行sql语句
6.为了让用户更好的体验,减小异常的直接抛出,须要进行异常捕捉,并将捕捉到的异常进行处理,记录到日志中方便开发人员分析错误,同时同步发送推送给相关人员,即时提醒错误
7.sql执行成功之后,须要对返回的数据进行处理,组合成字典类型,方便前端使用
8.完成数据处理后,须要及时关闭游标
9.对返回的数据须要进行处理后,返回给上一级程序
1 def execute(self, query, vars=None): 2 """执行sql语句查询,返回结果集或影响行数""" 3 if not query: 4 return None 5 # 记录程序执行开始时间 6 start_time = time.clock() 7 try: 8 # 判断是否记录sql执行语句 9 if self.is_output_sql: 10 log_helper.info('sql:' + str(query)) 11 # 创建游标 12 self.cursor = self.connect.cursor() 13 # 执行SQL 14 result = self.cursor.execute(query, vars) 15 print(str(result)) 16 except Exception as e: 17 # 将异常写入到日志中 18 log_helper.error('sql执行失败:' + str(e.args) + ' query:' + str(query)) 19 self.data = None 20 else: 21 # 获取数据 22 try: 23 if self.cursor.description: 24 # 在执行insert/update/delete等更新操做时,若是添加了returning,则读取返回数据组合成字典返回 25 self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()] 26 else: 27 # 若是执行insert/update/delete等更新操做时没有添加returning,则返回影响行数,值为0时表时没有数据被更新 28 self.data = self.cursor.rowcount 29 except Exception as e: 30 # 将异常写入到日志中 31 log_helper.error('数据获取失败:' + str(e.args) + ' query:' + str(query)) 32 self.data = None 33 finally: 34 # 关闭游标 35 self.cursor.close() 36 # 记录程序执行结束时间 37 end_time = time.clock() 38 # 写入日志 39 self.write_log(start_time, end_time, query) 40 41 # 若是有返回数据,则把该数据返回给调用者 42 return self.data
最后一个是记录超时sql语句到日志方法,这里我将大于0.1秒的sql语句都记录下来
def write_log(self, start_time, end_time, sql): """记录Sql执行超时日志""" t = end_time - start_time if (t) > 0.1: content = ' '.join(('run time:', str(t), 's sql:', sql)) log_helper.info(content)
完成的db_helper.py代码
1 #!/usr/bin/env python 2 # coding=utf-8 3 4 import psycopg2 5 import time 6 from io import StringIO 7 from common import log_helper, file_helper 8 9 10 class PgHelper(object): 11 """postgresql数据库操做类""" 12 13 def __init__(self, db, is_output_sql): 14 self.connect = None 15 self.cursor = None 16 # 初始化数据库参数 17 self.db_name = db.get('db_name', '') 18 self.db_user = db.get('db_user', '') 19 self.db_pass = db.get('db_pass', '') 20 self.db_host = db.get('db_host', '') 21 self.db_port = db.get('db_port', '') 22 # 是否将全部要执行的Sql语句输出到日志里 23 self.is_output_sql = is_output_sql 24 25 def open_conn(self): 26 """链接数据库,并创建游标""" 27 try: 28 if not self.connect: 29 self.connect = psycopg2.connect(database=self.db_name, user=self.db_user, password=self.db_pass, 30 host=self.db_host, port=self.db_port) 31 return self.connect 32 except Exception as e: 33 log_helper.error('链接数据库失败:' + str(e.args)) 34 return False 35 36 def close_conn(self): 37 """关闭postgresql数据库连接""" 38 # 关闭游标 39 try: 40 if self.cursor: 41 self.cursor.close() 42 except Exception: 43 pass 44 # 关闭数据库连接 45 try: 46 if self.connect: 47 self.connect.close() 48 except Exception: 49 pass 50 51 def __enter__(self): 52 """初始化数据库连接""" 53 self.open_conn() 54 return self 55 56 def __exit__(self, type, value, trace): 57 """关闭postgresql数据库连接""" 58 self.close_conn() 59 60 def rollback(self): 61 """回滚操做""" 62 try: 63 # 异常时,进行回滚操做 64 if self.connect: 65 self.connect.rollback() 66 except Exception as e: 67 log_helper.error('回滚操做失败:' + str(e.args)) 68 69 def commit(self): 70 """提交事务""" 71 try: 72 if self.connect: 73 self.connect.commit() 74 self.close_conn() 75 except Exception as e: 76 log_helper.error('提交事务失败:' + str(e.args)) 77 78 def get_sql(self, query, vars=None): 79 """获取编译后的sql语句""" 80 # 记录程序执行开始时间 81 start_time = time.clock() 82 try: 83 # 判断是否记录sql执行语句 84 if self.is_output_sql: 85 log_helper.info('sql:' + str(query)) 86 # 创建游标 87 self.cursor = self.connect.cursor() 88 # 执行SQL 89 self.data = self.cursor.mogrify(query, vars) 90 except Exception as e: 91 # 将异常写入到日志中 92 log_helper.error('sql生成失败:' + str(e.args) + ' query:' + str(query)) 93 self.data = '获取编译sql失败' 94 finally: 95 # 关闭游标 96 self.cursor.close() 97 # 记录程序执行结束时间 98 end_time = time.clock() 99 # 写入日志 100 self.write_log(start_time, end_time, query) 101 102 return self.data 103 104 def copy(self, values, table_name, columns): 105 """ 106 百万级数据更新函数 107 :param values: 更新内容,字段之间用\t分隔,记录之间用\n分隔 "1\taaa\tabc\n2\bbb\abc\n" 108 :param table_name: 要更新的表名称 109 :param columns: 须要更新的字段名称:例:('id','userame','passwd') 110 :return: 111 """ 112 try: 113 # 创建游标 114 self.cursor = self.connect.cursor() 115 self.cursor.copy_from(StringIO(values), table_name, columns=columns) 116 self.connect.commit() 117 return True 118 except Exception as e: 119 # 将异常写入到日志中 120 log_helper.error('批量更新失败:' + str(e.args) + ' table:' + table_name) 121 finally: 122 # 关闭游标 123 self.cursor.close() 124 125 def execute(self, query, vars=None): 126 """执行sql语句查询,返回结果集或影响行数""" 127 if not query: 128 return None 129 # 记录程序执行开始时间 130 start_time = time.clock() 131 try: 132 # 判断是否记录sql执行语句 133 if self.is_output_sql: 134 log_helper.info('sql:' + str(query)) 135 # 创建游标 136 self.cursor = self.connect.cursor() 137 # 执行SQL 138 result = self.cursor.execute(query, vars) 139 print(str(result)) 140 except Exception as e: 141 # 将异常写入到日志中 142 log_helper.error('sql执行失败:' + str(e.args) + ' query:' + str(query)) 143 self.data = None 144 else: 145 # 获取数据 146 try: 147 if self.cursor.description: 148 # 在执行insert/update/delete等更新操做时,若是添加了returning,则读取返回数据组合成字典返回 149 self.data = [dict((self.cursor.description[i][0], value) for i, value in enumerate(row)) for row in self.cursor.fetchall()] 150 else: 151 # 若是执行insert/update/delete等更新操做时没有添加returning,则返回影响行数,值为0时表时没有数据被更新 152 self.data = self.cursor.rowcount 153 except Exception as e: 154 # 将异常写入到日志中 155 log_helper.error('数据获取失败:' + str(e.args) + ' query:' + str(query)) 156 self.data = None 157 finally: 158 # 关闭游标 159 self.cursor.close() 160 # 记录程序执行结束时间 161 end_time = time.clock() 162 # 写入日志 163 self.write_log(start_time, end_time, query) 164 165 # 若是有返回数据,则把该数据返回给调用者 166 return self.data 167 168 169 def write_log(self, start_time, end_time, sql): 170 """记录Sql执行超时日志""" 171 t = end_time - start_time 172 if (t) > 0.1: 173 content = ' '.join(('run time:', str(t), 's sql:', sql)) 174 log_helper.info(content)
测试代码
1 #!/usr/bin/evn python 2 # coding=utf-8 3 4 import unittest 5 from common import db_helper 6 from config import db_config 7 8 class DbHelperTest(unittest.TestCase): 9 """数据库操做包测试类""" 10 11 def setUp(self): 12 """初始化测试环境""" 13 print('------ini------') 14 15 def tearDown(self): 16 """清理测试环境""" 17 print('------clear------') 18 19 def test(self): 20 # 使用with方法,初始化数据库链接 21 with db_helper.PgHelper(db_config.DB, db_config.IS_OUTPUT_SQL) as db: 22 # 设置sql执行语句 23 sql = """insert into product (name, code) values (%s, %s) returning id""" 24 # 设置提交参数 25 vars = ('张三', '201807251234568') 26 # 生成sql语句,并打印到控制台 27 print(db.get_sql(sql, vars)) 28 29 db.execute('select * from product where id=1000') 30 db.execute('insert into product (name, code) values (%s, %s) returning id', ('张三', '201807251234568')) 31 db.commit() 32 33 if __name__ == '__main__': 34 unittest.main()
版权声明:本文原创发表于 博客园,做者为 AllEmpty 本文欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然视为侵权。
python开发QQ群:669058475(本群已满)、733466321(能够加2群) 做者博客:http://www.cnblogs.com/EmptyFS/