Object-Relational Mapping,把关系数据库的表结构映射到对象上。使用面向对象的方式来操做数据库。python
下面是一个关系模型与Python对象之间的映射关系:mysql
SQLAlchemy是一个ORM框架。内部是使用了链接池
来管理数据库链接。其自己只是作了关系映射,不能链接数据库,也不能执行sql语句,它在底层须要使用pymysql等模块来链接并执行sql语句,要使用sqlalchemy,那么须要先进行安装:nginx
pip3 install sqlalchemy
查看版本sql
In [1]: import sqlalchemy In [2]: print(sqlalchemy.__version__) 1.3.1
先来总结一下使用sqlalchemy框架操做数据库的通常流程:数据库
建立引擎
(不一样类型数据库使用不一样的链接方式)建立基类
(类对象要继承,由于基类会利用元编程为咱们的子类绑定关于表的其余属性信息)建立实体类
(用来对应数据库中的表)编写实体类属性
(用来对应表中的字段/属性)建立表
(若是表不存在,则须要执行语句在数据库中建立出对应的表)实例化
(具体的一条record记录)建立会话session
(用于执行sql语句的链接)使用会话执行SQL语句
关闭会话
sqlalchemy 使用引擎管理数据库链接(DATABASE URLS),链接的通常格式为:django
dialect+driver://username:password@host:port/database
dialect
:表示什么数据库(好比,mysql,sqlite,oracle等)driver
:用于链接数据库的模块(好比pymysql,mysqldb等)username
:链接数据库的用户名password
:链接数据库的密码host
: 数据库的主机地址port
: 数据库的端口database
: 要链接的数据库名称pymysql模块是较长用于链接mysql的模块,使用pymysql的链接的语句为:编程
mysql+pymysql://dahl:123456@10.0.0.13:3306/test
建立引擎用于进行数据库的链接:create_engine(urls)
安全
import sqlalchemy db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url,echo=True)
特别注意
:建立引擎并不会立刻链接数据库,直到让数据库执行任务是才链接。session
sqlalchemy内部是原生支持链接池的,咱们能够仅仅利用它的链接池功能。经过engie的raw_connection
方法就能够获取到一个链接,而后就能够执行sql语句了(基本上就是Pymysql+DBUtils的实现)
import threading import time import sqlalchemy import pymysql engine = sqlalchemy.create_engine( 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test', max_overflow=5, pool_size=1, pool_timeout=30, pool_recycle=-1 ) def func(): conn = engine.raw_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) time.sleep(2) cursor.execute('select * from employees;') res = cursor.fetchall() print(res) cursor.close() conn.close() if __name__ == '__main__': for i in range(10): threading.Thread(target=func).start()
上面是经过连接池来执行sql的,其实也能够经过session来执行。
import threading from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker engine = sqlalchemy.create_engine( 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test', max_overflow=5, pool_size=1, pool_timeout=30, pool_recycle=-1 ) DBsession = sessionmaker(bind=engine) def func(): session = DBsession() cursor = session.execute('select * from employees;') res = cursor.fetchall() print(res) cursor.close() session.close() if __name__ == '__main__': for i in range(10): threading.Thread(target=func).start()
使用: sqlalchemy.ext.declarative.declarative_base
来构造声明性类定义的基类。由于sqlalchemy内部大量使用了元编程,为实例化的子类注入映射所需的属性,因此咱们定义的映射要继承自它(必须继承)
通常只须要一个这样的基类
Base = sqlalchemy.ext.declarative.declarative_base() # 或者 from sqlalchemy.ext import declarative Base = declarative.declarative_base()
现数据库存在以下表
CREATE TABLE `student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(30) DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;
建立对应的实体类:
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String, default='Null') age = Column(Integer, default='Null') print(Student.__dict__) # Table('student', MetaData(bind=None), 这里没有绑定engine,因此是None # Column('id', Integer(), table=<student>, primary_key=True, nullable=False), # Column('name', String(), table=<student>, default=ColumnDefault('Null')), # Column('age', Integer(), table=<student>, default=ColumnDefault('Null')), schema=None), # Column('data',DateTime,default=datetime.datetime.now) # 这里不能加括号
type
:字段类型(好比String、Integer,这里是sqlalchemy包装的类型,对应的是数据库的varchar、int等),来自TypeEngine的子类autoincrement
:是否自增nullable
: 是否能够为空primary_key
: 主键注意:Column和String、Integer等都来自于sqlalchemy下的方法,要么直接导入,要么就使用sqlalchemy.String来引用。
类型名 | python中类型 | 说明 |
---|---|---|
Integer | int | 普通整数,通常是32位 |
SmallInteger | int | 取值范围小的整数,通常是16位 |
BigInteger | int或long | 不限制精度的整数 |
Float | float | 浮点数 |
Numeric | decimal.Decimal | 普通整数,通常是32位 |
String | str | 变长字符串 |
Text | str | 变长字符串,对较长或不限长度的字符串作了优化 |
Unicode | unicode | 变长Unicode字符串 |
UnicodeText | unicode | 变长Unicode字符串,对较长或不限长度的字符串作了优化 |
Boolean | bool | 布尔值 |
Date | datetime.date | 时间 |
Time | datetime.datetime | 日期和时间 |
LargeBinary | str | 二进制文件 |
经过咱们构建的类,来实例化的对象,在未来就是数据库中的一条条记录。
student = Student() student.name = 'daxin' student.id = 1 student.age = 20 print(student) # <1 daxin 20>
咱们本身写的类都是继承自Base,每继承一次Base类,在Base类的metadata属性中就会记录当前子类,metadata提供了方法用于删除/建立表。若是数据库中已经存在对应的表,那么将不会继续建立
Base = declarative.declarative_base() Base.metadata.create_all(bind=engine) # 须要经过引擎去执行 # 下面是engine的echo为true时的输出信息 # 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine # CREATE TABLE hello ( # id INTEGER NOT NULL AUTO_INCREMENT, # name VARCHAR(24), # age INTEGER, # PRIMARY KEY (id) # ) # # # 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine {} # 2019-03-16 16:53:45,926 INFO sqlalchemy.engine.base.Engine COMMIT # 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine # CREATE TABLE world ( # id INTEGER NOT NULL AUTO_INCREMENT, # name VARCHAR(24), # age INTEGER, # PRIMARY KEY (id) # ) # # # 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine {} # 2019-03-16 16:53:45,928 INFO sqlalchemy.engine.base.Engine COMMIT
注意:sqlalchemy 只能建立和删除表,不能修改表结构。只能手动的在数据库中修改而后在代码中添加便可。
在一个会话中操做数据库,绘画创建在链接上,链接被引擎管理,当第一次使用数据库时,从引擎维护的链接池中取出一个链接使用。
from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) session = Session() # 实例化一个session对象
scoped_session
是sqlalchemy提供的线程安全的session,利用的是ThreadLocal实现的。
方法 | 含义 |
---|---|
add() | 增长一个对象 |
add_all() | 增长多个对象,类型为可迭代 |
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age ) daxin = Student(id=12, name='daxin', age=20) session.add(daxin) dachenzi = Student(id=13,name='dachenzi', age=21) xiaobai = Student(id=14,name='xiaobai', age=22) session.add_all((dachenzi,xiaobai)) # 新增三条数据 session.commit()
须要注意的是下面的状况:
daxin = Student(name='daxin') daxin.age = 40 session.add(daxin) # 1 session.commit() daxin.age = 20 session.add(daxin) # 2 session.commit() daxin.age = 10 session.add_all([daxin, daxin, daxin, daxin]) # 3 session.commit() # <30 daxin 10>
结果生成个1条数据,<30 daxin 10>,为何呢?因为id属于自增列,咱们在执行#1时,id是没有固定下来的。
执行时,因为都是daxin的,id,name,age都没有改变,因此只会执行1条语句
当engine的echo等于true时,看到具体的sql语句,一切就很明白了。
使用session的query方法进行简单查询,格式为:
session.query(student)
: 等同于select * from student;session.query(student).get(2)
: 等同于select * from student where id = 2,这里的get方法只能主键查询std_list = session.query(Student) print(std_list) # SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age FROM student print(type(std_list)) # <class 'sqlalchemy.orm.query.Query'>
这里直接打印并不会结果,由于它太懒了,你不迭代它,它就不会真的去数据库查询。
std_list = session.query(Student) for std in std_list: print(std) # 经过get来过滤主键,是能够直接执行返回结果的。 std_list = session.query(Student).get(30) print(std_list)
修改的数据的流程分为两步:
std = session.query(Student).get(30) print(std) # <30 daxin 200> std.age = 1000 session.add(std) session.commit() std = session.query(Student).get(30) print(std) # <30 daxin 1000> # 或者 std = session.query(Student).filter(Student.id > 10).update({'name':'daxin'}) std.commit()
大部分ORM是都是这样,必须先查才能改。
在原有数据的基础上批量修改,好比在全部名称后面添加特定的后缀
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # 必须为synchronize_session=False session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") # 必须synchronize_session="evaluate",表示要进行计算
使用session.delete来删除数据
std = session.query(Student).get(30) session.delete(std) session.commit() # 提交删除操做 std = session.query(Student).get(30) print(std) # None
当咱们建立一条数据,或是从数据库中获取一条数据时,数据自己是存在一个状态属性的,用来标识当前数据是否持久化,或者其余状态,在sqlalchemy中,inspect(obj)
能够用来窥探数据(obj)的状态
daxin = Student(name='daxin') daxin.age = 10000 state = sqlalchemy.inspect(daxin) print(state) # <sqlalchemy.orm.state.InstanceState object at 0x00000186EEC38EB8> std = session.query(Student).get(28) state = sqlalchemy.inspect(std) print(state) # <sqlalchemy.orm.state.InstanceState object at 0x00000186EFDCDA20>
咱们看到,inspect返回的是一个InstanceState对象。这个对象有如下几个属性:
key:key是多少
InstanceState对象,能够经过sqlalchemy.orm.state导入
具体的状态信息与含义以下:
状态 | 说明 |
---|---|
transient |
实体类还没有加入到session中,同时并无保存到数据库中 |
pending |
transient的实体被加入到session中,状态切换到pending,但它尚未被flsh到数据库中 |
persistent |
session中的实体对象对应着数据库中的真实记录。pending状态在提交成功后能够变成persistent状态,或者查询成功返回的实体也是persistent状态 |
deleted |
实体被删除且已经flush但未commit完成。事物提交成功了,实体变成detached,事物失败返回persistent状态 |
detached |
删除成功的实体进入这个状态 |
因此数据的状态变化以下:
commit()之后,变为detached,提交失败,回退到persistent状态
flush()方法,主动把改变应用到数据库中去
删除、修改操做,须要对应一个真实存在的数据,也就是说数据的状态是persistent才行。当使用add语句新增信息时,若是这个对象已经添加过数据库了,那么它的状态会变为persistent,若是对persistent的数据进行修改继续提交的话,那么使用的将会是update语句而非insert。这也是前面为啥屡次对一个数据进行add,提交了屡次只会插入1次的缘由。
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age ) def getstate(state: InstanceState): print(""" session_id={} transient={} _attached={} pending={} persistent={} deleted={} detached={} """.format(state.session_id, state.transient, state._attached, state.pending, state.persistent, state.deleted, state.detached)) daxin = Student(name='daxin') daxin.age = 10000 state = sqlalchemy.inspect(daxin) getstate(state) # session_id=None transient=True _attached=False pending=False persistent=False deleted=False detached=False session.add(daxin) getstate(state) # session_id=1 transient=False _attached=True pending=True persistent=False deleted=False detached=False session.commit() getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False std = session.query(Student).get(28) state = sqlalchemy.inspect(std) getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False std = session.query(Student).get(31) session.delete(std) state = sqlalchemy.inspect(std) getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False session.flush() getstate(state) # session_id=1 transient=False _attached=True pending=False persistent=False deleted=True detached=False session.commit() getstate(state) # session_id=None transient=False _attached=False pending=False persistent=False deleted=False detached=True
在数据库的字段类型中,好比性别字段,咱们可能会限制数据来源为M(male),F(Female),这个时候字段类型能够是枚举的,可是在sqlalchemy中,原生的字段类型没有枚举类型,那么就须要借助enum类了。
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker import enum db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class GenderEnum(enum.Enum): M = 'M' F = 'F' class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') gender = Column(sqlalchemy.Enum(GenderEnum),nullable=False) def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age )
用起来很麻烦,因此建议性别使用1或者0来存储,显示的时候作对于转换便可
使用filter方法进行条件过滤查询:
session.query(student).filter(student.id > 10)
:至关于select * from student where student.id > 10
同时还存在一个filter_by,它的不一样之处在于括号中的不是表达式,而是参数。好比:filter(user.id == 10) -> filter_by(user.id = 10)
where条件中的关系:
AND
(与) 对应 and_
OR
(或) 对应 or_
not
(非) 对应 not_
in
对应字段的 in_
not in
对应字段的 notin_
like
对应 字段的like方法not like
对应 字段的notlike方法想要使用与或非,须要先行导入
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer from sqlalchemy.orm import sessionmaker from sqlalchemy import and_, or_, not_ db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True, nullable=False, autoincrement=True) name = Column(String(24), default='Null') age = Column(Integer, default='Null') def __repr__(self): return '<{} {} {}>'.format( self.id, self.name, self.age ) def getresulte(stds): for std in stds: print(std)
条件判断之AND
(&,and_)
# and std_list = session.query(Student).filter(and_(Student.id < 30, Student.id > 27)) getresulte(std_list) std_list = session.query(Student).filter(Student.id < 30).filter(Student.id > 27) # filter返回的仍是一个结果集,因此还能够继续使用filter进行过滤 getresulte(std_list) std_list = session.query(Student).filter(Student.id < 30, Student.age > 100) # 多个条件一块儿写,也是and的关系 getresulte(std_list) std_list = session.query(Student).filter((Student.name == 'daxin') & (Student.age > 28)) getresulte(std_list)
条件判断之OR
(or_,|)
# or std_list = session.query(Student).filter(or_(Student.id > 27, Student.age < 50)) getresulte(std_list) std_list = session.query(Student).filter((Student.id > 27) | (Student.age < 50 )) getresulte(std_list)
条件判断之NOT
(not_,~)
# not std_list = session.query(Student).filter(not_(Student.id == 32)) getresulte(std_list) std_list = session.query(Student).filter(~(Student.id == 32)) getresulte(std_list)
like
和in
及not in
# like std_list = session.query(Student).filter(Student.name.like('da%')) getresulte(std_list) # not like std_list = session.query(Student).filter(Student.name.notlike('da%')) getresulte(std_list) # in std_list = session.query(Student).filter(Student.age.in_([10,30,50])) getresulte(std_list) # not in std_list = session.query(Student).filter(Student.age.notin_([10,30,50])) getresulte(std_list)
补充:
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all() # 传参的方式查询 r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all() # 用的很少
# 升序 std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age) getresulte(std_list) # 降序 std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc()) getresulte(std_list)
# limit std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc()).limit(3).offset(2) getresulte(std_list) # select * from Student where name like 'da%' order_by age desc limit 3 offset 2
# count std_list = session.query(Student) print(std_list.count()) # first std_list = session.query(Student).first() print(std_list)
first本质上就是limit。
sqlalchemy一样提供了聚合方法,使用sqlalchemy.func来调用
func提供的方法有
# max std_list = session.query(Student.name,sqlalchemy.func.max(Student.age)) # select name,max(age) from Student; getresulte(std_list) # count std_list = session.query(Student.name,sqlalchemy.func.count(Student.id)).group_by(Student.name) # SELECT name, count(id) FROM student GROUP BY name getresulte(std_list)
sqlalchemy提供ForeignKey用来进行外键关联,它的格式为:
sqlalchemy.ForeignKey(表名.字段名,ondelete='更新规则') # 若是填写映射后的class,那么能够直接写:类.字段 # 若是填写数据库中的表,那么须要使用引号:'数据库表名.字段名'
更新规则和删除规则,可选项以下:
CASCADE
:级联删除,删除被关联数据时,从表关联的数据所有删除。SET NULL
:从父表删除或更新行,会设置子表中的外键列为NULL,但必须保证子表没有指定 NOT NULL,也就是说子表的字段能够为NULL才行。RESTRICT
:若是从父表删除主键,若是子表引用了,则拒绝对父表的删除或更新操做。(保护数据)NO ACTION
:表中SQL的关键字,在MySQL中与RESTRICT相同。拒绝对父表的删除或更新操做。现有以下关系表
CREATE TABLE `departments` ( `dept_no` char(4) NOT NULL, `dept_name` varchar(40) NOT NULL, PRIMARY KEY (`dept_no`), UNIQUE KEY `dept_name` (`dept_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `employees` ( `emp_no` int(11) NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `dept_emp` ( `emp_no` int(11) NOT NULL, `dept_no` char(4) NOT NULL, `from_date` date NOT NULL, `to_date` date NOT NULL, PRIMARY KEY (`emp_no`,`dept_no`), KEY `dept_no` (`dept_no`), CONSTRAINT `dept_emp_ibfk_1` FOREIGN KEY (`emp_no`) REFERENCES `employees` (`emp_no`) ON DELETE CASCADE, CONSTRAINT `dept_emp_ibfk_2` FOREIGN KEY (`dept_no`) REFERENCES `departments` (`dept_no`) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
建立对应的映射实体类
import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy import Column, String, Integer, Date from sqlalchemy.orm import sessionmaker import enum db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test' engine = sqlalchemy.create_engine(db_url, echo=True) Base = declarative.declarative_base() DBSession = sessionmaker(bind=engine) session = DBSession() class Departments(Base): __tablename__ = 'departments' dept_no = Column(String(4), nullable=False, primary_key=True) dept_name = Column(String(40), nullable=False, unique=True) def __repr__(self): return '<{} {} {}>'.format(self.__class__.__name__, self.dept_no, self.dept_name) class GenderEnum(enum.Enum): M = 'M' F = 'F' class Employees(Base): __tablename__ = 'employees' emp_no = Column(Integer, nullable=False, primary_key=True) birth_date = Column(Date, nullable=False) first_name = Column(String(14), nullable=False) last_name = Column(String(16), nullable=False) gender = Column(sqlalchemy.Enum(GenderEnum), nullable=False) hire_date = Column(Date, nullable=False) def __repr__(self): return '<{} {} {} {} {} {}>'.format( self.__class__.__name__, self.emp_no, self.birth_date, self.first_name, self.last_name, self.gender, self.hire_date ) class Dept_emp(Base): __tablename__ = 'dept_emp' emp_no = Column(Integer, sqlalchemy.ForeignKey(Employees.emp_no, ondelete='CASCADE'), primary_key=True, ) dept_no = Column(String(4), sqlalchemy.ForeignKey(Departments.dept_no, ondelete='CASCADE'), nullable=False) from_date = Column(Date, nullable=False) to_date = Column(Date, nullable=False) def __repr__(self): return '<{} emp_no={} dept_no={}>'.format( self.__class__.__name__, self.emp_no, self.dept_no ) def getres(emps): for emp in emps: print(emp)
查询10010员工所在的部门编号及员工信息
emps = session.query(Employees, Dept_emp).filter(and_(Employees.emp_no == Dept_emp.emp_no, Employees.emp_no == '10010')).all() getres(emps) emps = session.query(Employees, Dept_emp).filter(Employees.emp_no == Dept_emp.emp_no).filter(Employees.emp_no == '10010').all() getres(emps) # 至关于:select * from employees,dept_emp where employees.emp_no = dept_emp.emp_no and employees.emp_no = '10010';
使用join()关键字来进行连表查询,其isouter参数用于指定join的类型,默认状况下使用的是inner join,当isouter=True时,就表示是left join(right join没有实现,须要本身交换前面表的顺序)
# join链接 std_list = session.query(Employees).join(Dept_emp,Employees.emp_no == Dept_emp.emp_no,isouter=True).filter(Employees.emp_no == '10010') getres(std_list) std_list = session.query(Employees).join(Dept_emp).filter((Employees.emp_no == Dept_emp.emp_no) & (Employees.emp_no == '10010')) getres(std_list) # 若是query中,仅列出一个代表,至关于 # select employees.* from employees inner join dept_emp on employees.emp_no = dept_emp.emp_no where employees.emp_no = '10010';
一对可能是一种表与表的关系,在orm中建立和使用方式有一写特色,这里单独描述
经过ForeignKey来建立一对多关系,须要注意的是它内部须要填写对应的真实的表名和字段(非映射的类对象)
class Deptment(base): __tablename__ = 'deptment' id = Column(Integer, autoincrement=True, primary_key=True) dep_name = Column(String(32), nullable=False) class User(base): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(String(32), nullable=False) dept_id = Column(Integer, ForeignKey('deptment.id')) # CREATE TABLE `user` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `name` varchar(32) NOT NULL, # `dept_id` int(11) DEFAULT NULL, # PRIMARY KEY (`id`), # KEY `dept_id` (`dept_id`), # CONSTRAINT `user_ibfk_1` FOREIGN KEY (`dept_id`) REFERENCES `deptment` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; # CREATE TABLE `deptment` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `dep_name` varchar(32) NOT NULL, # PRIMARY KEY (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
添加部门数据时,只能使用id来插入,当插入的dept_id在deptment表中不存在时,会直接爆出异常
# 批量添加部门数据 session.add_all([ Deptment(dep_name='运维部'), Deptment(dep_name='开发部'), Deptment(dep_name='产品部'), Deptment(dep_name='测试部')] ) # 添加用户数据 session.add_all([ User(name='daxin', dept_id=1), User(name='dachenzi', dept_id=2), User(name='dahl', dept_id=3)] ) session.commit()
在表中使用relationship来建立一个关联的对象,便于查询(和django的一对多隐含的对象相同,但在sqlalchemy中必须经过relationship才能够生成),并不会生成新的字段,仅仅是产生一个关联关系。
注意relationship对象不能做为条件直接进行表查询:session.query(User.name,User.deptment.dept_name) 这样是不行的。
relationship(obj,backref='')
下面是一个例子:
class Deptment(base): __tablename__ = 'deptment' id = Column(Integer, autoincrement=True, primary_key=True) dep_name = Column(String(32), nullable=False) class User(base): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(String(32), nullable=False) dept_id = Column(Integer, ForeignKey('deptment.id')) deptment = relationship(Deptment, backref='user') # 建立relationship关系 # 正向查(经过User查deptment) std = session.query(User).filter(User.id == 1).first() print(std.name) print(std.deptment.dep_name) # 直接经过deptment对象,读取Deptment表中对于的信息 # 反向查(经过depement查User) dep = session.query(Deptment).filter(Deptment.id == 1).first() print(dep.user[0].name) # 反向查,经过user获取到的数据是一个列表
relationship,就是经过在一个映射类中增长一个属性,该属性用于表示链接关系,能够在结果中,访问该属性来访问关联的表信息
存在relationship的映射关系时,咱们添加数据时,就能够经过relationship使用对象来添加关联数据了
dep = session.query(Deptment).filter(Deptment.id == 3).first() session.add_all([ User(name='hello', deptment=dep), User(name='world', deptment=dep)] ) session.commit()
直接建立新的部门对象也是能够的
user = User(name='dahlhin',deptment=Deptment(dep_name='管理员')) session.add(user) session.commit()
和django不一样sqlalchemy的第三张表须要手动建立。
模拟主机与业务线的归属问题。
这是一个典型的多对多关系,须要额外一张关系表来记录。
class Service2host(base): __tablename__ = 'service_to_host' id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True) s_id = Column(Integer, ForeignKey('service.id')) h_id = Column(Integer, ForeignKey('host.id')) __table_args__ = ( # 联合惟一索引 UniqueConstraint('s_id', 'h_id', name='serivce_to_host'), ) # 业务id和主机id不能重复,这里建立惟一索引来约束 class Service(base): __tablename__ = 'service' id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True) ser_name = Column(String(16), nullable=False) class Host(base): __tablename__ = 'host' id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True) hostname = Column(String(16), nullable=False) datetime = Column(DateTime, default=datetime.datetime.now, nullable=False) # CREATE TABLE `host` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `hostname` varchar(16) NOT NULL, # `datetime` date NOT NULL, # PRIMARY KEY (`id`), # UNIQUE KEY `id` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; # CREATE TABLE `service` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `ser_name` varchar(16) NOT NULL, # PRIMARY KEY (`id`), # UNIQUE KEY `id` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; # CREATE TABLE `service_to_host` ( # `id` int(11) NOT NULL AUTO_INCREMENT, # `s_id` int(11) DEFAULT NULL, # `h_id` int(11) DEFAULT NULL, # PRIMARY KEY (`id`), # UNIQUE KEY `id` (`id`), # UNIQUE KEY `serivce_to_host` (`s_id`,`h_id`), # KEY `h_id` (`h_id`), # CONSTRAINT `service_to_host_ibfk_1` FOREIGN KEY (`s_id`) REFERENCES `service` (`id`), # CONSTRAINT `service_to_host_ibfk_2` FOREIGN KEY (`h_id`) REFERENCES `host` (`id`) # ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
若是没有relationship关联对象,那么咱们将须要分别操做三张表,使用relationship将会大大简化这个过程,那么在Service中建立关系:
hosts = relationship('host', secondary='service_to_host', backref='services')
建立后:
# 添加一个业务,并建立几台主机 session.add( Service(ser_name='运营', hosts=[ Host(hostname='openstack'), Host(hostname='nginx') ]) ) # 添加一个主机,并关联几个业务线 service = session.query(Service).filter(or_(Service.ser_name == '运营', Service.ser_name == '运维')).all() session.add( Host(hostname='Tomcat', services=service) ) # 查询一个业务线下都有哪些主机 service = session.query(Service).filter(Service.ser_name == '运营').first() for host in service.hosts: print(host.id, host.hostname) # 查询一台主机都归属哪些业务线 host = session.query(Host).filter(Host.hostname == 'openstack').first() for servcie in host.services: print(service.id, service.ser_name)
当咱们启动多线程来执行数据库操做时,每一个线程都会用到session来执行sql语句,通常状况下,咱们会在线程内不建立新的session来执行sql语句,下面是一个利用session完成原生sql的执行。
import threading from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker engine = create_engine( 'mysql+pymysql://dahl:123456@10.0.0.10:3306/test', max_overflow=5, pool_size=1, pool_timeout=30, pool_recycle=-1 ) DBsession = sessionmaker(bind=engine) def func(): session = DBsession() cursor = session.execute('show tables') res = cursor.fetchall() print(res) cursor.close() session.close() if __name__ == '__main__': for i in range(10): threading.Thread(target=func).start()
这里会产生一个问题,每一个线程启动时都会建立一个session,用完又会关闭,这样太麻烦了,这里可使用scoped_session对象来完成,它相似与threading.Local的实现方式。做用是,当线程调用scoped_session对象的功能时,好比各类sql查询,在其内部会为每一个线程建立一个新的session对象,让它来使用。
# 1 引入scoped_session from sqlalchemy.orm import scoped_session # 2 全局建立session对象 session = scoped_session(DBsession) # 3 多进程内直接使用 def func(): cursor = session.execute('show tables') res = cursor.fetchall() print(res) cursor.close() session.remove() # 使用完毕须要remove
不须要关闭,直接使用scoped_session对象的remove方法便可。
实现过程:
源码以下:
# scoping def instrument(name): def do(self, *args, **kwargs): return getattr(self.registry(), name)(*args, **kwargs) return do for meth in Session.public_methods: setattr(scoped_session, meth, instrument(meth)) # registry是ThreadLocalRegistry对象 self.registry = ThreadLocalRegistry(session_factory) # ThreadLocalRegistry对象内部经过threading.local实现的 def __init__(self, createfunc): self.createfunc = createfunc # Session对象 self.registry = threading.local() # registry()其实调用的就是__call__方法 def __call__(self): try: return self.registry.value except AttributeError: val = self.registry.value = self.createfunc() # 第一次执行,就会实例化一个Session对象 return val
当使用join语句连表查询时,不免会碰到两个表重名的字段,这里就可使用label
来对字段进行别名显示。
stds = session.query(User.name.label('username'), User.id, Deptment.dep_name).join(Deptment).all() for std in stds: print(std.username, std.id, std.dep_name)