python 使用sqlalchemy进行数据库操做

sqlalchemy是python下一个著名的数据库orm库,能够方便地进行数据表建立、数据增删改查等操做html

最详细的教程,见官方:https://docs.sqlalchemy.orgpython

 

这里列举一些经常使用操做:mysql

1、建立数据表

代码以及相关的注释:git

import datetime import sqlalchemy as db from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base # 为了使用MySQL支持的类型,如unsigned bigint
import sqlalchemy.dialects.mysql as mysql # 建立对象的基类:
Base = declarative_base() # 定义User对象, 尽可能展现一些建立选项:
class User(Base): # 表的名字:
    __tablename__ = 't_user'
    # 建立表的扩展设置,这里设置表的字符集为utf8
    __table_args__ = ( # 联合惟一索引
        db.UniqueConstraint('id', 'name', name='uix_id_name'), # 联合索引
        db.Index('sex_createtime', 'sex', 'createtime'), # 设置引擎和字符集,注意:这个map只能放到元组的最后,不然会报错
 { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8', } ) # 表的结构字段定义
    # id字段,类型为unsigned bigint,定义为主键,自增,db.BigInteger不支持unsigned,因此使用mysql特别支持的类型
    # id = db.Column(db.BigInteger(), primary_key=True, autoincrement=True)
    id = db.Column(mysql.BIGINT(unsigned=True), primary_key=True, autoincrement=True) # name字段,定义为varchar(20),默认容许空,带注释
    name = db.Column(db.String(20), comment='姓名') # phone字段,定义为varchar(11),不容许空,有惟一限制,带注释
    phone = db.Column(db.String(11), nullable=False, unique=True, comment='电话') # score, float类型,带索引, 容许空
    score = db.Column(db.Float, index=True, comment='成绩') # sex,性别, int类型, 默认值为1
    sex = db.Column(db.Integer, default=1, comment='性别,1-男;2-女') # createtime, datetime类型, 不容许空
    createtime = db.Column(db.DateTime, nullable=False, comment='建立时间') # modifytime, datetime类型, 带索引, 默认使用datetime.now()生成当前时间,注意,不带()
    modifytime = db.Column(db.DateTime, default=datetime.datetime.now, comment='修改时间') # 只是用于打印对象
    def __str__(self): return "(" + ', '.join(['%s:%s' % item for item in self.__dict__.items()]) + ")"


# 初始化数据库链接: # 链接字符串模式:数据库类型+链接库+用户名+密码+主机,字符编码,是否打印建表细节 # 其中,链接库是当前用于操做数据库的库,对于python2.7,通常使用MysqlDb,对于Python3,通常使用pymysql # 链接的例子如:create_engine("mysql+pymysql://cai:123@localhost/test?charset=utf8", echo=True)
engine = db.create_engine('mysql+pymysql://user:pass@localhost:3306/test') # 删除现有的表,谨慎决定是否须要这样操做
Base.metadata.drop_all(engine) # 建立表
Base.metadata.create_all(engine)

 

在mysql中生成的表结构以下:github

CREATE TABLE `t_user` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(20) DEFAULT NULL COMMENT '姓名', `phone` varchar(11) NOT NULL COMMENT '电话', `score` float DEFAULT NULL COMMENT '成绩', `sex` int(11) DEFAULT NULL COMMENT '性别,1-男;2-女', `createtime` datetime NOT NULL COMMENT '建立时间', `modifytime` datetime DEFAULT NULL COMMENT '修改时间', PRIMARY KEY (`id`), UNIQUE KEY `phone` (`phone`), UNIQUE KEY `uix_id_name` (`id`,`name`), KEY `sex_createtime` (`sex`,`createtime`), KEY `ix_t_user_score` (`score`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

最经常使用的SQLAlchemy Column类型sql

类型名 Python类型 说 明
Integer int 普通整数,通常是 32 位
SmallInteger int 取值范围小的整数,通常是 16 位
BigInteger int 或 long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 定点数
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串作了优化
Unicode unicode 变长 Unicode 字符串
UnicodeText unicode 变长 Unicode 字符串,对较长或不限长度的字符串作了优化
Boolean bool 布尔值
Date datetime.date 日期
Time datetime.time 时间
DateTime datetime.datetime 日期和时间
Interval datetime.timedelta 时间间隔
Enum str 一组字符串
PickleType 任何 Python 对象 自动使用 Pickle 序列化
LargeBinary str 二进制文件

最常使用的SQLAlchemy列选项数据库

选项名 说 明
primary_key 若是设为 True ,这列就是表的主键
unique 若是设为 True ,这列不容许出现重复的值
index 若是设为 True ,为这列建立索引,提高查询效率
nullable 若是设为 True ,这列容许使用空值;若是设为 False ,这列不容许使用空值
default 为这列定义默认值

 

mysql的特别定义列类型选项express

BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR


2、数据的增删改查

对数据操做以前,须要定义一个会话类型,并在操做数据时,生成一个会话实例进行操做session

# 定义一个会话类型,用来进行数据操做(engine是前面步骤生成的引擎对象)
DBSession = sessionmaker(bind=engine) #实例一个会话
session = DBSession() # 数据的操做,如session.add(..) # 这一句不能少,不然不会提交
session.commit() session.close()

 

2.1 新增数据

# 插入一条数据
user1 = User(id=1,name="小明", phone="13111223344", score=98.2, sex=1, createtime=datetime.datetime.now()) session.add(user1) # 插入多条数据
users = [ User(id=2,name="小芳", phone="13111223345", score=83.1, sex=2, createtime=datetime.datetime.now()), User(id=3,name="小李", phone="13111223346", score=100, sex=2, createtime=datetime.datetime.now()), User(id=4,name="小牛", phone="13111223347", score=62.5, sex=1, createtime=datetime.datetime.now()), ] session.add_all(users) # 非orm方式,特色是快,可定制,如支持ON DUPLICATE KEY UPDATE
session.execute(User.__table__.insert(), [ {"id":5,"name":"小五", "phone":"13111223348", "score":72.5, "sex":1, "createtime":datetime.datetime.now()}, ] )

 

如何实现ON DUPLICATE KEY UPDATE?app

sqlalchemy不支持ON DUPLICATE KEY UPDATE, 能够本身实现一个。

基本的实现方式:

from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.expression import Insert @compiles(Insert) def append_string(insert, compiler, **kw): s = compiler.visit_insert(insert, **kw) if 'append_string' in insert.kwargs: return s + " " + insert.kwargs['append_string'] return s my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)

在实际使用中,这种使用方式显得比较粗糙,通常来讲,能够枚举对象的字段,并对每一个字段设置xxx=VALUES(xxx)进行更新。

 

2.2 查询、更新和删除

三者的操做直接合并来讲了

query = session.query(User) print query # 显示SQL 语句
print query.statement # 同上
for user in query: # 遍历时查询
    print user.name print query.all() # 返回的是一个相似列表的对象
print query.first().name # 记录不存在时,first() 会返回 None # print query.one().name # 不存在,或有多行记录时会抛出异常
print query.filter(User.id == 2).first().name print query.get(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # 支持字符串
 query2 = session.query(User.name) print query2.all() # 每行是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
print query2.order_by(User.name).all() print query2.order_by('name').all() print query2.order_by(User.name.desc()).all() print query2.order_by('name desc').all() print session.query(User.id).order_by(User.name.desc(), User.id).all() print query2.filter(User.id == 1).scalar() # 若是有记录,返回第一条记录的第一个元素
print session.query('id').select_from(User).filter('id = 1').scalar() print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 屡次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a') print query3.scalar() print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in
 query4 = session.query(User.id) print query4.filter(User.name == None).scalar() print query4.filter('name is null').scalar() print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all() print query4.count() print session.query(func.count('*')).select_from(User).scalar() print session.query(func.count('1')).select_from(User).scalar() print session.query(func.count(User.id)).scalar() print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,所以不须要指定表
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 能够用 limit() 限制 count() 的返回数
print session.query(func.sum(User.id)).scalar() print session.query(func.now()).scalar() # func 后能够跟任意函数名,只要该数据库支持
print session.query(func.current_timestamp()).scalar() print session.query(func.md5(User.name)).filter(User.id == 1).scalar() query.filter(User.id == 1).update({User.name: 'c'}) user = query.get(1) print user.name user.name = 'd' session.flush() # 写数据库,但并不提交
print query.get(1).name session.delete(user) session.flush() print query.get(1) session.rollback() print query.get(1).name query.filter(User.id == 1).delete() session.commit() print query.get(1)

 

3、一些进阶操做

如何批量插入大批数据?
可使用非 ORM 的方式:

session.execute( User.__table__.insert(), [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)] )
session.commit()

上面我批量插入了 10000 条记录,半秒内就执行完了;而 ORM 方式会花掉很长时间。

 

如何让执行的 SQL 语句增长前缀?

使用 query 对象的 prefix_with() 方法:

session.query(User.name).prefix_with('HIGH_PRIORITY').all() session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

 

如何替换一个已有主键的记录?
使用 session.merge() 方法替代 session.add(),其实就是 SELECT + UPDATE:

user = User(id=1, name='ooxx') session.merge(user) session.commit()

或者使用 MySQL 的 INSERT … ON DUPLICATE KEY UPDATE,须要用到 @compiles 装饰器,有点难懂,本身看吧:《SQLAlchemy ON DUPLICATE KEY UPDATE》 和 sqlalchemy_mysql_ext

 

如何使用无符号整数?
可使用 MySQL 的方言:

如何使用无符号整数? 可使用 MySQL 的方言:

 

模型的属性名须要和表的字段名不同怎么办?
开发时遇到过一个奇怪的需求,有个其余系统的表里包含了一个“from”字段,这在 Python 里是关键字,因而只能这样处理了:

from_ = Column('from', CHAR(10))

 

如何获取字段的长度?
Column 会生成一个很复杂的对象,想获取长度比较麻烦,这里以 User.name 为例:

User.name.property.columns[0].type.length

 

如何指定使用 InnoDB,以及使用 UTF-8 编码?
最简单的方式就是修改数据库的默认配置。若是非要在代码里指定的话,能够这样:

class User(BaseModel): __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8' }

 

MySQL 5.5 开始支持存储 4 字节的 UTF-8 编码的字符了,iOS 里自带的 emoji(如 � 字符)就属于这种。

若是是对表来设置的话,能够把上面代码中的 utf8 改为 utf8mb4,DB_CONNECT_STRING 里的 charset 也这样更改。
若是对库或字段来设置,则仍是本身写 SQL 语句比较方便,具体细节可参考《How to support full Unicode in MySQL databases》
不建议全用 utf8mb4 代替 utf8,由于前者更慢,索引会占用更多空间。

如何设置外键约束?

from random import randint from sqlalchemy import ForeignKey class User(BaseModel): __tablename__ = 'user' id = Column(Integer, primary_key=True) age = Column(Integer) class Friendship(BaseModel): __tablename__ = 'friendship' id = Column(Integer, primary_key=True) user_id1 = Column(Integer, ForeignKey('user.id')) user_id2 = Column(Integer, ForeignKey('user.id')) for i in xrange(100): session.add(User(age=randint(1, 100))) session.flush() # 或 session.commit(),执行完后,user 对象的 id 属性才能够访问(由于 id 是自增的)
 
for i in xrange(100): session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100))) session.commit() session.query(User).filter(User.age < 50).delete()

执行这段代码时,你应该会遇到一个错误:

sqlalchemy.exc.IntegrityError: (IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,)

缘由是删除 user 表的数据,可能会致使 friendship 的外键不指向一个真实存在的记录。在默认状况下,MySQL 会拒绝这种操做,也就是 RESTRICT。InnoDB 还容许指定 ON DELETE 为 CASCADE 和 SET NULL,前者会删除 friendship 中无效的记录,后者会将这些记录的外键设为 NULL。
除了删除,还有可能更改主键,这也会致使 friendship 的外键失效。因而相应的就有 ON UPDATE 了。其中 CASCADE 变成了更新相应的外键,而不是删除。
而在 SQLAlchemy 中是这样处理的:

class Friendship(BaseModel): __tablename__ = 'friendship' id = Column(Integer, primary_key=True) user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE')) user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))

 

如何链接表?

from sqlalchemy import distinct from sqlalchemy.orm import aliased Friend = aliased(User, name='Friend') print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() # 全部有朋友的用户
print session.query(distinct(User.id)).join(Friendship, User.id == Friendship.user_id1).all() # 全部有朋友的用户(去掉重复的)
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).distinct().all() # 同上
print session.query(Friendship.user_id2).join(User, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 全部被别人当成朋友的用户
print session.query(Friendship.user_id2).select_from(User).join(Friendship, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 同上,join 的方向相反,但由于不是 STRAIGHT_JOIN,因此 MySQL 能够本身选择顺序
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).all() # 用户及其朋友
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).filter(User.id < 10).all() # id 小于 10 的用户及其朋友
print session.query(User.id, Friend.id).join(Friendship, User.id == Friendship.user_id1).join(Friend, Friend.id == Friendship.user_id2).all() # 两次 join,因为使用到相同的表,所以须要别名
print session.query(User.id, Friendship.user_id2).outerjoin(Friendship, User.id == Friendship.user_id1).all() # 用户及其朋友(无朋友则为 None,使用左链接)

这里我没提到 relationship,虽然它看上去很方便,但须要学习的内容实在太多,还要考虑不少性能上的问题,因此干脆本身 join 吧。


为何没法删除 in 操做查询出来的记录?

session.query(User).filter(User.id.in_((1, 2, 3))).delete()

抛出这样的异常:

sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter.

但这样是没问题的:

session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete()

搜了下找到《Sqlalchemy delete subquery》这个问题,提到了 delete 的一个注意点:删除记录时,默认会尝试删除 session 中符合条件的对象,而 in 操做估计还不支持,因而就出错了。解决办法就是删除时不进行同步,而后再让 session 里的全部实体都过时:

session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False) session.commit() # or session.expire_all()

此外,update 操做也有一样的参数,若是后面马上提交了,那么加上 synchronize_session=False 参数会更快。

 

如何对一个字段进行自增操做
最简单的办法就是获取时加上写锁:

user = session.query(User).with_lockmode('update').get(1) user.age += 1 session.commit()

若是不想多一次读的话,这样写也是能够的:

session.query(User).filter(User.id == 1).update({ User.age: User.age + 1 }) session.commit() # 其实字段之间也能够作运算:
session.query(User).filter(User.id == 1).update({ User.age: User.age + User.id })

 

 

参考资料:

https://blog.csdn.net/tastelife/article/details/25218895

https://www.cnblogs.com/Oliver.net/p/7345647.html

https://www.cnblogs.com/Orangeorchard/p/8097547.html

https://www.runoob.com/python3/python3-mysql.html

相关文章
相关标签/搜索