前言
- 基于原生模块:pymysql
- 推荐教程
- 小工具结构
- DatabaseUtil([databaseName])
- getConnection()
- query(sql)
- save(sql)
- delete(sql)
- closeResources(cursor,connection)
源码
import pymysql # 导入mysql模块
## 数据库操纵工具
class DatabaseUtil:
    """Small convenience wrapper around pymysql for one-shot SQL statements.

    Every public operation opens a fresh connection, runs a single
    statement, and closes the cursor and the connection again before
    returning.

    Note on ``pymysql.connect(...)``: the ``unix_socket`` argument is only
    meant for socket-file connections. This utility connects by host/IP, so
    ``unix_socket='/tmp/mysql.sock'`` was replaced by ``port=3306`` (passing
    ``unix_socket`` produced "module 'socket' has no attribute 'AF_UNIX'"
    for the original author).
    """

    # Label used as a prefix in every diagnostic message printed below.
    className = "DatabaseUtil"

    def __init__(self, database='default_db_name', host='xxxx.com.cn',
                 port=3306, user='root', password='root',
                 charset='utf8mb4', cursorclass=None):
        """Store the connection settings; no connection is opened here.

        Args:
            database: Name of the schema to connect to.
            host: Server host name or IP address.
            port: Server TCP port.
            user: Login name.
            password: Login password.
            charset: Connection character set.
            cursorclass: Cursor class handed to ``pymysql.connect``.
                ``None`` selects ``pymysql.cursors.DictCursor``, so rows
                are returned as dictionaries.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.charset = charset
        # Resolved lazily here (not in the signature) so the default stays
        # DictCursor without evaluating pymysql at class-definition time.
        if cursorclass is None:
            cursorclass = pymysql.cursors.DictCursor
        self.cursorclass = cursorclass

    def getConnection(self):
        """Open and return a new pymysql connection using the stored settings.

        Raises:
            Exception: whatever ``pymysql.connect`` raises; the failure is
                announced on stdout first and then re-raised unchanged.
        """
        try:
            return pymysql.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   password=self.password,
                                   database=self.database,
                                   charset=self.charset,
                                   cursorclass=self.cursorclass)
        except Exception:
            # pymysql.connect signals failure by raising, never by
            # returning None, so the message has to be printed here.
            print("[", self.className, ".getConnection] Fail to load and get database Connection!")
            raise

    def query(self, sql, params=None):
        """Run a SELECT statement and return all of its rows.

        Args:
            sql: The statement text. Use ``%s`` / ``%(name)s`` placeholders
                together with ``params`` instead of formatting values into
                the string.
            params: Optional tuple, list or dict of values bound to the
                placeholders by the driver.

        Returns:
            The rows as returned by ``cursor.fetchall()`` (a list of dicts
            with the default ``DictCursor``), or ``None`` when no row
            matched or the statement failed.
        """
        print("[", self.className, ".query] SQL:", sql)
        connection = self.getConnection()
        cursor = None
        results = None
        try:
            cursor = connection.cursor()
            cursor.execute(sql, params)
            rows = cursor.fetchall()
            if rows:
                results = rows
            else:
                print("[", self.className, ".query] Not found any data from database!")
        except Exception as e:
            # Best effort: report the problem and fall through to None.
            print("[", self.className, ".query] ", e)
            print("[", self.className, ".query] Error: unable to fetch data!")
        finally:
            # Runs on every path, including "no rows" and errors.
            self.closeResources(cursor, connection)
        return results

    def save(self, sql, params=None):
        """Run an INSERT or UPDATE statement and commit it.

        Args:
            sql: The statement text, optionally with placeholders.
            params: Optional values bound to the placeholders.

        Returns:
            The affected-row count reported by ``cursor.execute``, or
            ``-100`` when the statement or the commit failed (the
            transaction is rolled back in that case).
        """
        return self._executeWrite(sql, params, "save")

    def delete(self, sql, params=None):
        """Run a DELETE statement and commit it.

        Args:
            sql: The statement text, optionally with placeholders.
            params: Optional values bound to the placeholders.

        Returns:
            The affected-row count reported by ``cursor.execute``, or
            ``-100`` when the statement or the commit failed (the
            transaction is rolled back in that case).
        """
        return self._executeWrite(sql, params, "delete")

    def _executeWrite(self, sql, params, label):
        """Execute one data-modifying statement, commit, and report the outcome.

        ``label`` is the public method name used in the printed messages.
        """
        print("[", self.className, "." + label + "] SQL:", sql)
        connection = self.getConnection()
        cursor = None
        result = -100  # sentinel meaning "did not complete"
        try:
            cursor = connection.cursor()
            result = cursor.execute(sql, params)
            connection.commit()
        except Exception as e:
            # Reset the sentinel: execute() may have succeeded before
            # commit() failed, and that count must not be reported.
            result = -100
            print("[", self.className, "." + label + "] ", e)
            try:
                connection.rollback()
            except Exception as rollback_error:
                print("[", self.className, "." + label + "] ", rollback_error)
        finally:
            self.closeResources(cursor, connection)
        # "Failed!" is also printed when the statement ran but touched no row.
        if result >= 1:
            print("[", self.className, "." + label + "] Done!")
        else:
            print("[", self.className, "." + label + "] Failed!")
        return result

    def closeResources(self, cursor, connection):
        """Close the cursor and then the connection, each independently.

        ``None`` is skipped, and an error while closing one resource is
        printed without preventing the other from being closed.
        """
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                print("[", self.className, ".closeResources] ", e)
Demo
# Create the helper used by all examples below (connects to the default database).
dbUtil = DatabaseUtil()
# Insert / update
# sql = "insert into tb_url values('5','WAITING_RESOLVE','http://test.for.python.mysql.cn','article/author','resolver_class_bean:Unknown')"
# print(dbUtil.save(sql))
# Delete
# for i in range(2, 5):  # ids in the half-open range [2, 5)
#     sql = "delete from tb_url where pk_url_id='%d'" % (i)
#     dbUtil.delete(sql)
# Query
sql = "select * from tb_url where pk_url_id='%s'" % ('5')
print(dbUtil.query(sql))
//output
[ DatabaseUtil .query] SQL: select * from tb_url where pk_url_id='5'
[{'pk_url_id': 5, 'state': 'WAITING_RESOLVE', 'url': 'http://test.for.python.mysql.cn', 'resolve_type': 'article/author', 'resolver_class_bean': 'resolver_class_bean:Unknown'}]