SQLALchemy
是Python
中的一款优秀的ORM
框架,它能够做用于任何第三方Web
框架,如flask
,tornado
等框架。html
SQLALchemy
相较于DjangoORM
来讲更加的贴近原生SQL
语句,所以学习难度较低。python
组成部分 | 描述 |
---|---|
Engine | 框架引擎 |
Connection Pooling | 数据库连接池 |
Dialect | 数据库DB API种类 |
Schema/Types | 架构&类型 |
SQL Exprression Language | SQL表达式语言 |
下载SQLALchemy
模块:mysql
pip install sqlalchemy
值得注意的是SQLALchemy
必须依赖其余操纵数据的模块,Dialect
用于和数据API
进行交流,根据配置文件的不一样调用不一样的数据库API
,从而实现对数据库的操做,如:sql
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
SQLALchemy
中不容许修改表结构,若是修改表结构则须要删除旧表,再建立新表:数据库
#!/usr/bin/env python # -*- coding:utf-8 -*- import datetime from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint, Index from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base # 基础类 Base = declarative_base() # 建立引擎 engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/db1?charset=utf8", # "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8", # 有密码时 max_overflow=0, # 超过链接池大小外最多建立的链接 pool_size=5, # 链接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,不然报错 pool_recycle=-1 # 多久以后对线程池中的线程进行一次链接的回收(重置) ) class Users(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) age = Column(Integer,nullable=False) phone = Column(String(11)) addr = Column(String(64), nullable=True) create_time = Column(DateTime, default=datetime.datetime.now) # 必定不要加括号 __table_args__ = ( UniqueConstraint("id", "name"), # 建立联合惟一 可指定name给个别名 Index("phone", "addr", unique=True), # 建立联合惟一索引 可指定name给个别名 ) def __str__(self): return "object:<id:%s name:%s>" % (self.id, self.name) def create_tb(): """ 建立表 :return: """ Base.metadata.create_all(engine) def drop_tb(): """ 删除表 :return: """ Base.metadata.drop_all(engine) if __name__ == '__main__': drop_tb() create_tb()
表建立好以后,开始连接库。flask
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session)
新增单条记录:session
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) user_obj = Users(name="user001", phone="15125352333",age=23, addr="China") session.add(user_obj) # 提交 session.commit() # 关闭连接(可以使用session.remove()) session.close()
修改记录:架构
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 修更名字 session.query(Users).filter_by(id=1).update({"name": "USER001"}) # 修改年龄,使用+号,默认为"fetch",表明只容许int类型使用+号 session.query(Users).filter_by(id=1).update({"age": Users.age + 1},synchronize_session="fetch") # 修改地址,使用+号,因为是字符类型,因此要修改synchronize_session=False session.query(Users).filter_by(id=1).update({"addr":Users.addr + "BeiJing"},synchronize_session=False) # 提交 session.commit() # 关闭连接 session.close()
删除案例:oracle
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) session.query(Users).filter_by(id=2).delete() # 提交 session.commit() # 关闭连接 session.close()
批量增长:框架
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 批量增长 session.add_all([ Users(name="user002",age=21,phone="13269867233",addr="ShangHai"), Users(name="user003",age=18,phone="13269867234",addr="GuangZhou"), Users(name="user003",age=24,phone="13269867235",addr="ChongQing"), ]) # 提交 session.commit() # 关闭连接 session.close()
基本查询:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 查询 # -- 查全部 -- result_01 = session.query(Users).all() # -- 过滤 -- result_02 = session.query(Users).filter(Users.name == "USER001").all() # Python表达式的形式过滤 result_03 = session.query(Users).filter_by(name="user002").all() # ORM形式过滤 result_04 = session.query(Users).filter_by(name="user003").first() # ORM形式过滤 取第一个 print(result_01) # [<models.Users>,<models.Users>,<models.Users>] print(result_02) print(result_03) print(result_04) # object:<id:3 name:user003> 经过__str__拿到结果 # 提交 session.commit() # 关闭连接 session.close()
条件查询:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 只拿某字段 result_00 = session.query(Users.name,Users.age).first() print(result_00) # and(用逗号或者用and_) result_01 = session.query(Users).filter( Users.id > 1,Users.age < 23).all() print(result_01) from sqlalchemy import and_ result_02 = session.query(Users).filter(and_( Users.id > 1,Users.age < 23)).all() print(result_02) # or from sqlalchemy import or_ result_03 = session.query(Users).filter(or_(Users.id > 3,Users.age < 23)).all() print(result_03) # and与or的组合使用 result_04 = session.query(Users).filter(or_( Users.id > 1, and_(Users.id > 2, Users.age < 24) )).all() print(result_04) # 范围 result_05 = session.query(Users).filter(Users.age.between(18,24)).all() print(result_05) # 包含 result_06 = session.query(Users).filter(Users.age.in_([18,21,24])).all() print(result_06) # 取反 ~ result_07 = session.query(Users).filter(~Users.age.in_([18,21,24])).all() print(result_07) # 通配符 result_08 = session.query(Users).filter(Users.name.like("us%")).all() print(result_08) # 分页 result_09 = session.query(Users).all()[0:1] print(result_09) # 排序 result_10 = session.query(Users).order_by(Users.id.desc()).all() # 倒序 print(result_10) result_11 = session.query(Users).order_by(Users.id.asc()).all() # 正序 print(result_11) # 分组 result_12 = session.query(Users).group_by(Users.id).all() print(result_12) # 聚合函数 from sqlalchemy.sql import func result_13 = session.query( func.max(Users.age), func.sum(Users.age), func.min(Users.age), ).group_by(Users.name).having(func.max(Users.age > 12)).all() print(result_13) # 提交 session.commit() # 关闭连接 session.close()
首先是创建一对多的关系,使用relationship
作逻辑一对多,不会在物理表中建立关系,可是能够经过该字段进行增删改查:
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base # 基础类 Base = declarative_base() # 建立引擎 engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/db1?charset=utf8", # "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8", # 有密码时 max_overflow=0, # 超过链接池大小外最多建立的链接 pool_size=5, # 链接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,不然报错 pool_recycle=-1 # 多久以后对线程池中的线程进行一次链接的回收(重置) ) class Classes(Base): __tablename__ = "classes" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) class Students(Base): __tablename__ = "students" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) # 真实约束字段:避免脏数据写入,在物理表中会建立真实字段关系 # 可选级联操做:CASCADE,DELETE、RESTRICT fk_class = Column(Integer, ForeignKey("classes.id",ondelete="CASCADE",onupdate="CASCADE")) # 逻辑关系字段:不会在真实物理表中建立字段,可是能够经过该逻辑字段进行增删改查 # backref:反向查询的名字 re_class = relationship("Classes",backref="students") def create_tb(): """ 建立表 :return: """ Base.metadata.create_all(engine) def drop_tb(): """ 删除表 :return: """ Base.metadata.drop_all(engine) if __name__ == '__main__': drop_tb() create_tb()
经过逻辑字段进行增长:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) session.add_all( [ Students(name="学生01", re_class=Classes(name="一年级一班")), # 自动填入fk_class Students(name="学生02", re_class=Classes(name="一年级二班")), ] ) # 提交 session.commit() # 关闭连接 session.close()
多对多也使用relationship
作逻辑多对多,不会在物理表中建立关系,可是能够经过该字段进行增删改查。
使用relationship
时,传入指定手动生成的第三张表,表明这是多对多关系:
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship # 基础类 Base = declarative_base() # 建立引擎 engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/db1?charset=utf8", # "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8", # 有密码时 max_overflow=0, # 超过链接池大小外最多建立的链接 pool_size=5, # 链接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,不然报错 pool_recycle=-1 # 多久以后对线程池中的线程进行一次链接的回收(重置) ) class Classes(Base): __tablename__ = "classes" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) class Students(Base): __tablename__ = "students" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) # 可选级联操做:CASCADE,DELETE、RESTRICT fk_class = Column(Integer, ForeignKey("classes.id", ondelete="CASCADE", onupdate="CASCADE")) # 逻辑关系字段:不会在真实物理表中建立字段,可是能够经过该逻辑字段进行增删改查 # backref:反向查询的名字 re_class = relationship("Classes", backref="students") class Teachers(Base): __tablename__ = "teachers" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) # 逻辑字段M2M:指定第三张表,secondary参数为__tablename__,反向查询为teachers re_class = relationship("Classes", secondary="teachersm2mclasses", backref="teachers") class TeachersM2mClasses(Base): __tablename__ = "teachersm2mclasses" id = Column(Integer, primary_key=True) teacher_id = Column(Integer, ForeignKey("teachers.id")) class_id = Column(Integer, ForeignKey("classes.id")) __table_args__ = ( UniqueConstraint("teacher_id", "class_id"), # 建立联合惟一 可指定name给个别名 ) def create_tb(): """ 建立表 :return: """ Base.metadata.create_all(engine) def drop_tb(): """ 删除表 :return: """ Base.metadata.drop_all(engine) if __name__ == '__main__': drop_tb() create_tb()
用一个列表,将班级的记录对象放进去,你能够用多种增长方式,使用逻辑字段添加或本身操纵第三张表:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) session.add_all( [ Teachers(name="老师01",re_class=[ session.query(Classes).filter_by(id=1).first() ]), Teachers(name="老师02",re_class=[ session.query(Classes).filter_by(id=1).first() ]), Teachers(name="老师03",re_class=[ session.query(Classes).filter_by(id=2).first() ]), ] ) # 提交 session.commit() # 关闭连接 session.close()
组合查询将两张表用笛卡尔积的效果显现出来:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 必须用filter,获取所有也是,不可使用all由于他会返回一个list,list不具有union_all # 使用filter返回的对象是:<class 'sqlalchemy.orm.query.Query'> # 而且query中必须单拿某一个字段,若是不指定字段就直接返回对象 s = session.query(Students.name).filter() t = session.query(Teachers.name).filter() c = session.query(Classes.name).filter() ret = s.union_all(t).union_all(c).all() # 用列表显示 print(ret) # [('学生01',), ('学生02',), ('老师01',), ('老师02',), ('老师03',), ('一年级一班',), ('一年级二班',)] # 提交 session.commit() # 关闭连接 session.close()
使用join
进行连表查询:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 手动指定条件查询 result = session.query(Students.name, Classes.name).filter(Students.id == Classes.id).all() for i in result: print(i) # 链接查询,同上,内部自动指定 Students.fk_class == Classes.id 的条件 result = session.query(Students.name, Classes.name).join(Classes).all() # 至关于:result = session.query(Students.name,Classes.name).join(Classes, Students.fk_class == Classes.id).all() for i in result: print(i) # 左连接查询,即便有同窗没有班级也拿出来 result = session.query(Students.name, Classes.name).join(Classes, isouter=True).all() for i in result: print(i) # 若是想查看有哪些班级没有同窗,就换一个位置 result = session.query(Students.name, Classes.name).join(Students, isouter=True).all() for i in result: print(i) # 三表查询,须要本身指定条件 result = session.query(Teachers.name, Classes.name, TeachersM2mClasses.id) \ .join(Teachers, TeachersM2mClasses.teacher_id == Teachers.id) \ .join(Classes, TeachersM2mClasses.class_id == Classes.id) \ .filter() # 查看原生语句 print(result) for i in result: print(i) # 提交 session.commit() # 关闭连接 session.close()
上面是使用join
进行的连表查询,其实也可使用逻辑字段relationship
查询。
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 正向:查看第一个老师都在哪些班级(经过逻辑字段的名字) result = session.query(Teachers).first() # result.re_class是一个列表,存了有关该老师所在的班级 <class 'sqlalchemy.orm.collections.InstrumentedList'> for class_obj in result.re_class: # 查看其全部的班级 print(class_obj.name) # 反向:查看第一个班级下都有哪些老师,都有哪些学生(经过逻辑字段中的backref参数进行反向查询) result = session.query(Classes).first() # 看老师 for teacher_obj in result.teachers: print(teacher_obj.name) # 看学生 for student_obj in result.students: print(student_obj.name) # 提交 session.commit() # 关闭连接 session.close()
使用逻辑字段relationship
可拥有一些方法执行增删改。
因为逻辑字段是一个相似列表的存在,因此列表的方法都能用。
print(type(老师对象.班级列表)) # <class 'sqlalchemy.orm.collections.InstrumentedList'>
好比使用extend
方法增长老师的班级:
# 给老师增长班级 result = session.query(Teachers).first() # extend方法: result.re_class.extend([ Classes(name="三年级一班",), Classes(name="三年级二班",), ])
使用remove
方法删除老师的班级:
# 减小老师所在的班级 result = session.query(Teachers).first() # 待删除的班级对象,集合查找比较快 delete_class_set = { session.query(Classes).filter_by(id=7).first(), session.query(Classes).filter_by(id=8).first(), } # 循换老师所在的班级 # remove方法: for class_obj in result.re_class: if class_obj in delete_class_set: result.re_class.remove(class_obj)
使用clear
清空老师所对应的班级:
# 拿出一个老师 result = session.query(Teachers).first() result.re_class.clear()
若是一条查询语句是以filter
结尾,则返回结果对象的__str__
方法中都是SQL
语句:
result = session.query(Teachers).filter() print(result) # SELECT teachers.id AS teachers_id, teachers.name AS teachers_name # FROM teachers
若是是all
结尾,返回的就是一个列表,first
结尾也是一个列表:
result = session.query(Teachers).all() print(result) # [<models.Teachers object at 0x00000178EB0B5550>, <models.Teachers object at 0x00000178EB0B5518>, <models.Teachers object at 0x00000178EB0B5048>]
执行原生SQL
:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 经过Session绑定引擎和数据库创建关系 Session = sessionmaker(bind=engine) # 建立连接池,使用session便可为当前线程拿出一个连接对象。内部采用threading.local进行隔离 session = scoped_session(Session) cursor = session.execute(r"select * from students where id <= (:num)",params={"num":2}) print(cursor.fetchall()) # 提交 session.commit() # 关闭连接 session.close()