42 - 数据库-orm-SQLAlchemy

1 ORM

Object-Relational Mapping,把关系数据库的表结构映射到对象上。使用面向对象的方式来操做数据库。python

orm

下面是一个关系模型与Python对象之间的映射关系:mysql

  • table --> class : 表映射为类
  • row --> object : 行映射为实例
  • column --> property : 字段映射为属性

2 sqlalchemy

SQLAlchemy是一个ORM框架。内部是使用了链接池来管理数据库链接。其自己只是作了关系映射,不能链接数据库,也不能执行sql语句,它在底层须要使用pymysql等模块来链接并执行sql语句,要使用sqlalchemy,那么须要先进行安装:nginx

pip3 install sqlalchemy

查看版本sql

In [1]: import sqlalchemy
In [2]: print(sqlalchemy.__version__)
1.3.1

3 基本使用

先来总结一下使用sqlalchemy框架操做数据库的通常流程:数据库

  1. 建立引擎(不一样类型数据库使用不一样的链接方式)
  2. 建立基类(类对象要继承,由于基类会利用元编程为咱们的子类绑定关于表的其余属性信息)
  3. 建立实体类(用来对应数据库中的表)
  4. 编写实体类属性(用来对应表中的字段/属性)
  5. 建立表(若是表不存在,则须要执行语句在数据库中建立出对应的表)
  6. 实例化(具体的一条record记录)
  7. 建立会话session(用于执行sql语句的链接)
  8. 使用会话执行SQL语句
  9. 关闭会话

3.1 建立链接

sqlalchemy 使用引擎管理数据库链接(DATABASE URLS),链接的通常格式为:django

dialect+driver://username:password@host:port/database
  • dialect:表示什么数据库(好比,mysql,sqlite,oracle等)
  • driver:用于链接数据库的模块(好比pymysql,mysqldb等)
  • username:链接数据库的用户名
  • password:链接数据库的密码
  • host: 数据库的主机地址
  • port: 数据库的端口
  • database: 要链接的数据库名称

pymysql模块是较长用于链接mysql的模块,使用pymysql的链接的语句为:编程

  • mysql+pymysql://dahl:123456@10.0.0.13:3306/test

建立引擎用于进行数据库的链接:create_engine(urls)安全

import sqlalchemy

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url,echo=True)
  • echo:引擎是否打印执行的sql语句,等于True时,表示打印,便于调试
  • max_overflow=5: 超过链接池大小外最多建立的链接
  • pool_size=1: 链接池大小
  • pool_timeout=30: 池中没有线程最多等待的时间(不然会报错)
  • pool_recycle=-1: 多久以后对线程池中的线程进行一次链接的回收(重置)

特别注意:建立引擎并不会立刻链接数据库,直到让数据库执行任务是才链接。session

3.1.1 利用链接池执行sql

sqlalchemy内部是原生支持链接池的,咱们能够仅仅利用它的链接池功能。经过engie的raw_connection方法就能够获取到一个链接,而后就能够执行sql语句了(基本上就是Pymysql+DBUtils的实现)

import threading
import time
import sqlalchemy
import pymysql

engine = sqlalchemy.create_engine(
    'mysql+pymysql://dahl:123456@10.0.0.13:3306/test',
    max_overflow=5,
    pool_size=1,
    pool_timeout=30,
    pool_recycle=-1
)

def func():
    conn = engine.raw_connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    time.sleep(2)
    cursor.execute('select * from employees;')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    conn.close()

if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=func).start()

3.1.2 利用session来执行sql

上面是经过连接池来执行sql的,其实也能够经过session来执行。

import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = sqlalchemy.create_engine(
    'mysql+pymysql://dahl:123456@10.0.0.13:3306/test',
    max_overflow=5,
    pool_size=1,
    pool_timeout=30,
    pool_recycle=-1
)

DBsession = sessionmaker(bind=engine)

def func():
    session = DBsession()
    cursor = session.execute('select * from employees;')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    session.close()

if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=func).start()

3.2 建立基类

        使用: sqlalchemy.ext.declarative.declarative_base 来构造声明性类定义的基类。由于sqlalchemy内部大量使用了元编程,为实例化的子类注入映射所需的属性,因此咱们定义的映射要继承自它(必须继承)

通常只须要一个这样的基类

Base = sqlalchemy.ext.declarative.declarative_base()
# 或者
from sqlalchemy.ext import declarative
Base = declarative.declarative_base()

3.3 建立实体类

现数据库存在以下表

CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(30) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

建立对应的实体类:

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()

class Student(Base):

    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String, default='Null')
    age = Column(Integer, default='Null')

print(Student.__dict__)

# Table('student', MetaData(bind=None),   这里没有绑定engine,因此是None
# Column('id', Integer(), table=<student>, primary_key=True, nullable=False), 
# Column('name', String(), table=<student>, default=ColumnDefault('Null')), 
# Column('age', Integer(), table=<student>, default=ColumnDefault('Null')), schema=None),
# Column('data',DateTime,default=datetime.datetime.now)  # 这里不能加括号
  • __tablename__: 代表,若是表已存在,则必须指定正确的表名
  • Column: 用于构建一个列对象,它的参数通常都和数据库中的列属性是对应的,主要有:
    • name: 数据库中表示的此列的名称
    • type:字段类型(好比String、Integer,这里是sqlalchemy包装的类型,对应的是数据库的varchar、int等),来自TypeEngine的子类
    • autoincrement:是否自增
    • default:默认值,能够是值,可调用对象或者类,当写入数据该字段没有指定时,调用。当是可调用对象的时候,建议不要加括号
    • doc:字段说明信息
    • key: 一个可选的字符串标识符,用于标识表上的此Column对象。
    • index: 是否启用索引
    • nullable: 是否能够为空
    • onupdate: 若是在更新语句的SET子句中不存在此列,则将在更新时调用该值
    • primary_key: 主键
    • server_default:它的值是 FetchedValue实例、字符串、Unicode 或者 text()实例,用做DDL语句中该列的default值

注意:Column和String、Integer等都来自于sqlalchemy下的方法,要么直接导入,要么就使用sqlalchemy.String来引用。

3.3.1 经常使用字段

类型名 python中类型 说明
Integer int 普通整数,通常是32位
SmallInteger int 取值范围小的整数,通常是16位
BigInteger int或long 不限制精度的整数
Float float 浮点数
Numeric decimal.Decimal 普通整数,通常是32位
String str 变长字符串
Text str 变长字符串,对较长或不限长度的字符串作了优化
Unicode unicode 变长Unicode字符串
UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串作了优化
Boolean bool 布尔值
Date datetime.date 时间
Time datetime.datetime 日期和时间
LargeBinary str 二进制文件

3.4 实例化

经过咱们构建的类,来实例化的对象,在未来就是数据库中的一条条记录。

student = Student()
student.name = 'daxin'
student.id = 1
student.age = 20
print(student)   # <1 daxin 20>

3.5 建立表

        咱们本身写的类都是继承自Base,每继承一次Base类,在Base类的metadata属性中就会记录当前子类,metadata提供了方法用于删除/建立表。若是数据库中已经存在对应的表,那么将不会继续建立

  • drop_all(bind=None, tables=None, checkfirst=True):删除metadata中记录的全部表
  • create_all(bind=None, tables=None, checkfirst=True):建立metadata中记录的全部表
Base = declarative.declarative_base()  
Base.metadata.create_all(bind=engine)  # 须要经过引擎去执行

# 下面是engine的echo为true时的输出信息
# 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine 
# CREATE TABLE hello (
#   id INTEGER NOT NULL AUTO_INCREMENT, 
#   name VARCHAR(24), 
#   age INTEGER, 
#   PRIMARY KEY (id)
# )
# 
# 
# 2019-03-16 16:53:45,922 INFO sqlalchemy.engine.base.Engine {}
# 2019-03-16 16:53:45,926 INFO sqlalchemy.engine.base.Engine COMMIT
# 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine 
# CREATE TABLE world (
#   id INTEGER NOT NULL AUTO_INCREMENT, 
#   name VARCHAR(24), 
#   age INTEGER, 
#   PRIMARY KEY (id)
# )
# 
# 
# 2019-03-16 16:53:45,927 INFO sqlalchemy.engine.base.Engine {}
# 2019-03-16 16:53:45,928 INFO sqlalchemy.engine.base.Engine COMMIT
  • 生产环境不多这样建立表,都是系统上线的时候由脚本生成。
  • 生产环境不多删除表,宁肯废除都不能删除。

注意:sqlalchemy 只能建立和删除表,不能修改表结构。只能手动的在数据库中修改而后在代码中添加便可。

3.6 建立会话Session

        在一个会话中操做数据库,绘画创建在链接上,链接被引擎管理,当第一次使用数据库时,从引擎维护的链接池中取出一个链接使用。

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine) 
session = Session()  # 实例化一个session对象
  • session对象线程不安全。因此不一样线程应该使用不一样的session对象
  • Session类和engine有一个就好了。

scoped_session是sqlalchemy提供的线程安全的session,利用的是ThreadLocal实现的。

3.7 数据操做

3.7.1 增长数据

方法 含义
add() 增长一个对象
add_all() 增长多个对象,类型为可迭代
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

daxin = Student(id=12, name='daxin', age=20)
session.add(daxin)
dachenzi = Student(id=13,name='dachenzi', age=21)
xiaobai = Student(id=14,name='xiaobai', age=22)
session.add_all((dachenzi,xiaobai))   # 新增三条数据
session.commit()

须要注意的是下面的状况:

daxin = Student(name='daxin')
daxin.age = 40
session.add(daxin)  # 1
session.commit()   
daxin.age = 20  
session.add(daxin)  # 2
session.commit()
daxin.age = 10
session.add_all([daxin, daxin, daxin, daxin]) # 3
session.commit()  
# <30 daxin 10>

结果生成个1条数据,<30 daxin 10>,为何呢?因为id属于自增列,咱们在执行#1时,id是没有固定下来的。

  1. 执行后,commit,则daxin的id就固定下来了。
  2. 执行时,因为id没变,因此并不会新增数据,而是使用update语句更新了age字段
  3. 执行时,因为都是daxin的,id,name,age都没有改变,因此只会执行1条语句

    当engine的echo等于true时,看到具体的sql语句,一切就很明白了。

3.7.2 简单查询

使用session的query方法进行简单查询,格式为:

  • session.query(student): 等同于select * from student;
  • session.query(student).get(2): 等同于select * from student where id = 2,这里的get方法只能主键查询
std_list = session.query(Student)
print(std_list)        # SELECT student.id AS student_id, student.name AS student_name, student.age AS student_age FROM student
print(type(std_list))  # <class 'sqlalchemy.orm.query.Query'>

这里直接打印并不会结果,由于它太懒了,你不迭代它,它就不会真的去数据库查询。

std_list = session.query(Student)
for std in std_list:
    print(std)

# 经过get来过滤主键,是能够直接执行返回结果的。
std_list = session.query(Student).get(30)
print(std_list)

3.7.3 修改数据

修改的数据的流程分为两步:

  1. 查找匹配的数据
  2. 修改后,提交
std = session.query(Student).get(30)
print(std)  # <30 daxin 200>
std.age = 1000
session.add(std)
session.commit()
std = session.query(Student).get(30)
print(std)  # <30 daxin 1000>

# 或者
std = session.query(Student).filter(Student.id > 10).update({'name':'daxin'})
std.commit()

大部分ORM是都是这样,必须先查才能改。

在原有数据的基础上批量修改,好比在全部名称后面添加特定的后缀

session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)  # 必须为synchronize_session=False
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")  # 必须synchronize_session="evaluate",表示要进行计算

3.7.4 删除数据(不建议)

使用session.delete来删除数据

std = session.query(Student).get(30)
session.delete(std)
session.commit()   # 提交删除操做
std = session.query(Student).get(30)
print(std)  # None

3.7.5 状态

当咱们建立一条数据,或是从数据库中获取一条数据时,数据自己是存在一个状态属性的,用来标识当前数据是否持久化,或者其余状态,在sqlalchemy中,inspect(obj)能够用来窥探数据(obj)的状态

daxin = Student(name='daxin')
daxin.age = 10000
state = sqlalchemy.inspect(daxin)
print(state)  # <sqlalchemy.orm.state.InstanceState object at 0x00000186EEC38EB8>
std = session.query(Student).get(28)
state = sqlalchemy.inspect(std)
print(state)  # <sqlalchemy.orm.state.InstanceState object at 0x00000186EFDCDA20>

咱们看到,inspect返回的是一个InstanceState对象。这个对象有如下几个属性:

  • session_id:获取数据时的session ID,若是是自建数据未持久化ID为None
  • _attached: 是不是附加的(数据已经加载(已add),就差提交的数据库了)
  • transient:是不是临时数据(临时建立,尚未提交(尚未add))
  • pending: 是不是待定的数据
  • persistent: 是不是持久的
  • deleted: 是否已删除
  • detached:是否被分离
  • key:key是多少

    InstanceState对象,能够经过sqlalchemy.orm.state导入

具体的状态信息与含义以下:

状态 说明
transient 实体类还没有加入到session中,同时并无保存到数据库中
pending transient的实体被加入到session中,状态切换到pending,但它尚未被flsh到数据库中
persistent session中的实体对象对应着数据库中的真实记录。pending状态在提交成功后能够变成persistent状态,或者查询成功返回的实体也是persistent状态
deleted 实体被删除且已经flush但未commit完成。事物提交成功了,实体变成detached,事物失败返回persistent状态
detached 删除成功的实体进入这个状态

因此数据的状态变化以下:

  1. 新建一个实体,状态是transient临时的
  2. add()之后,状态变为pending
  3. commit()之后,状态变为persistent
  4. 查询成功返回的实体状态也是persistent状态
  5. delete()并flush()之后,状态变为deleted
  6. commit()之后,变为detached,提交失败,回退到persistent状态

    flush()方法,主动把改变应用到数据库中去

        删除、修改操做,须要对应一个真实存在的数据,也就是说数据的状态是persistent才行。当使用add语句新增信息时,若是这个对象已经添加过数据库了,那么它的状态会变为persistent,若是对persistent的数据进行修改继续提交的话,那么使用的将会是update语句而非insert。这也是前面为啥屡次对一个数据进行add,提交了屡次只会插入1次的缘由。

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

def getstate(state: InstanceState):
    print("""
    session_id={} transient={} _attached={} pending={} persistent={} deleted={} detached={}
    """.format(state.session_id, state.transient, state._attached, state.pending, state.persistent, state.deleted,
               state.detached))

daxin = Student(name='daxin')
daxin.age = 10000
state = sqlalchemy.inspect(daxin)
getstate(state)  # session_id=None transient=True _attached=False pending=False persistent=False deleted=False detached=False

session.add(daxin)
getstate(state)  # session_id=1 transient=False _attached=True pending=True persistent=False deleted=False detached=False

session.commit()
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False

std = session.query(Student).get(28)
state = sqlalchemy.inspect(std)
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False

std = session.query(Student).get(31)
session.delete(std)
state = sqlalchemy.inspect(std)
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=True deleted=False detached=False

session.flush()
getstate(state)  # session_id=1 transient=False _attached=True pending=False persistent=False deleted=True detached=False

session.commit()
getstate(state)  # session_id=None transient=False _attached=False pending=False persistent=False deleted=False detached=True

3.7.6 枚举字段

        在数据库的字段类型中,好比性别字段,咱们可能会限制数据来源为M(male),F(Female),这个时候字段类型能够是枚举的,可是在sqlalchemy中,原生的字段类型没有枚举类型,那么就须要借助enum类了。

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker
import enum

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class GenderEnum(enum.Enum):
    M = 'M'
    F = 'F'

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')
    gender = Column(sqlalchemy.Enum(GenderEnum),nullable=False)

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

用起来很麻烦,因此建议性别使用1或者0来存储,显示的时候作对于转换便可

3.7.7 复杂查询

3.7.7.1 where条件查询

使用filter方法进行条件过滤查询:

  • session.query(student).filter(student.id > 10):至关于select * from student where student.id > 10

    同时还存在一个filter_by,它的不一样之处在于括号中的不是表达式,而是参数。好比:filter(user.id == 10) -> filter_by(user.id = 10)

where条件中的关系:

  • AND(与) 对应 and_
  • OR(或) 对应 or_
  • not(非) 对应 not_
  • in 对应字段的 in_
  • not in 对应字段的 notin_
  • like 对应 字段的like方法
  • not like 对应 字段的notlike方法

想要使用与或非,须要先行导入

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import sessionmaker
from sqlalchemy import and_, or_, not_

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Student(Base):
    __tablename__ = 'student'

    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(24), default='Null')
    age = Column(Integer, default='Null')

    def __repr__(self):
        return '<{} {} {}>'.format(
            self.id, self.name, self.age
        )

def getresulte(stds):
    for std in stds:
        print(std)

条件判断之AND(&,and_)

# and
std_list = session.query(Student).filter(and_(Student.id < 30, Student.id > 27))
getresulte(std_list)

std_list = session.query(Student).filter(Student.id < 30).filter(Student.id > 27)   # filter返回的仍是一个结果集,因此还能够继续使用filter进行过滤
getresulte(std_list)

std_list = session.query(Student).filter(Student.id < 30, Student.age > 100)  # 多个条件一块儿写,也是and的关系
getresulte(std_list)

std_list = session.query(Student).filter((Student.name == 'daxin') & (Student.age > 28))
getresulte(std_list)

条件判断之OR(or_,|)

# or
std_list = session.query(Student).filter(or_(Student.id > 27, Student.age < 50))
getresulte(std_list)

std_list = session.query(Student).filter((Student.id > 27) | (Student.age < 50 ))
getresulte(std_list)

条件判断之NOT(not_,~)

# not
std_list = session.query(Student).filter(not_(Student.id == 32))
getresulte(std_list)

std_list = session.query(Student).filter(~(Student.id == 32))
getresulte(std_list)

likeinnot in

# like
std_list = session.query(Student).filter(Student.name.like('da%'))
getresulte(std_list)

# not like
std_list = session.query(Student).filter(Student.name.notlike('da%'))
getresulte(std_list)

# in
std_list = session.query(Student).filter(Student.age.in_([10,30,50]))
getresulte(std_list)

# not in
std_list = session.query(Student).filter(Student.age.notin_([10,30,50]))
getresulte(std_list)

补充:

r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()  # 传参的方式查询
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()  # 用的很少

更多

3.7.7.2 排序

  • order_by:排序
  • asc: 升序(默认)
  • desc: 降序
# 升序
std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age)
getresulte(std_list)

# 降序
std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc())
getresulte(std_list)

3.7.7.3 分页(偏移量)

  • limit: 显示结果的条目数
  • offset:偏移量
# limit
std_list = session.query(Student).filter(Student.name.like('da%')).order_by(Student.age.desc()).limit(3).offset(2)
getresulte(std_list)
# select * from Student where name like 'da%' order_by age desc limit 3 offset 2

3.7.7.4 消费方法

  • count(): 获取总条数(仅仅针对结果集,不能all之后再count)
  • all(): 取全部行(默认)(列表)
  • first():取首行
  • one():有且只有一行,多行则抛出异常
  • delete():对查询出来的数据直接进行删除
  • scalar(): 第一条记录的第一个元素
# count
std_list = session.query(Student)
print(std_list.count())

# first
std_list = session.query(Student).first()
print(std_list)

first本质上就是limit。

3.7.7.5 分组及聚合方法

sqlalchemy一样提供了聚合方法,使用sqlalchemy.func来调用

  • group_by:分组显示

func提供的方法有

  • max:求最大值
  • min:求最小值
  • avg:求平均值
  • count:聚合(通常和分组连用)
# max
std_list = session.query(Student.name,sqlalchemy.func.max(Student.age))  # select name,max(age) from Student;
getresulte(std_list)

# count
std_list = session.query(Student.name,sqlalchemy.func.count(Student.id)).group_by(Student.name)  # SELECT name, count(id)  FROM student GROUP BY name 
getresulte(std_list)

3.7.7.6 关联查询

sqlalchemy提供ForeignKey用来进行外键关联,它的格式为:

sqlalchemy.ForeignKey(表名.字段名,ondelete='更新规则')

# 若是填写映射后的class,那么能够直接写:类.字段
# 若是填写数据库中的表,那么须要使用引号:'数据库表名.字段名'

更新规则和删除规则,可选项以下:

  • CASCADE:级联删除,删除被关联数据时,从表关联的数据所有删除。
  • SET NULL:从父表删除或更新行,会设置子表中的外键列为NULL,但必须保证子表没有指定 NOT NULL,也就是说子表的字段能够为NULL才行。
  • RESTRICT:若是从父表删除主键,若是子表引用了,则拒绝对父表的删除或更新操做。(保护数据)
  • NO ACTION:表中SQL的关键字,在MySQL中与RESTRICT相同。拒绝对父表的删除或更新操做。

现有以下关系表

CREATE TABLE `departments` (
  `dept_no` char(4) NOT NULL,
  `dept_name` varchar(40) NOT NULL,
  PRIMARY KEY (`dept_no`),
  UNIQUE KEY `dept_name` (`dept_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `employees` (
  `emp_no` int(11) NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `dept_emp` (
  `emp_no` int(11) NOT NULL,
  `dept_no` char(4) NOT NULL,
  `from_date` date NOT NULL,
  `to_date` date NOT NULL,
  PRIMARY KEY (`emp_no`,`dept_no`),
  KEY `dept_no` (`dept_no`),
  CONSTRAINT `dept_emp_ibfk_1` FOREIGN KEY (`emp_no`) REFERENCES `employees` (`emp_no`) ON DELETE CASCADE,
  CONSTRAINT `dept_emp_ibfk_2` FOREIGN KEY (`dept_no`) REFERENCES `departments` (`dept_no`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

建立对应的映射实体类

import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy import Column, String, Integer, Date
from sqlalchemy.orm import sessionmaker
import enum

db_url = 'mysql+pymysql://dahl:123456@10.0.0.13:3306/test'
engine = sqlalchemy.create_engine(db_url, echo=True)

Base = declarative.declarative_base()
DBSession = sessionmaker(bind=engine)
session = DBSession()

class Departments(Base):
    __tablename__ = 'departments'
    dept_no = Column(String(4), nullable=False, primary_key=True)
    dept_name = Column(String(40), nullable=False, unique=True)

    def __repr__(self):
        return '<{} {} {}>'.format(self.__class__.__name__, self.dept_no, self.dept_name)

class GenderEnum(enum.Enum):
    M = 'M'
    F = 'F'

class Employees(Base):
    __tablename__ = 'employees'
    emp_no = Column(Integer, nullable=False, primary_key=True)
    birth_date = Column(Date, nullable=False)
    first_name = Column(String(14), nullable=False)
    last_name = Column(String(16), nullable=False)
    gender = Column(sqlalchemy.Enum(GenderEnum), nullable=False)
    hire_date = Column(Date, nullable=False)

    def __repr__(self):
        return '<{} {} {} {} {} {}>'.format(
            self.__class__.__name__, self.emp_no, self.birth_date, self.first_name, self.last_name, self.gender,
            self.hire_date
        )

class Dept_emp(Base):
    __tablename__ = 'dept_emp'
    emp_no = Column(Integer, sqlalchemy.ForeignKey(Employees.emp_no, ondelete='CASCADE'), primary_key=True, )
    dept_no = Column(String(4), sqlalchemy.ForeignKey(Departments.dept_no, ondelete='CASCADE'), nullable=False)
    from_date = Column(Date, nullable=False)
    to_date = Column(Date, nullable=False)

    def __repr__(self):
        return '<{} emp_no={} dept_no={}>'.format(
            self.__class__.__name__, self.emp_no, self.dept_no
        )

def getres(emps):
    for emp in emps:
        print(emp)

查询10010员工所在的部门编号及员工信息

隐式链接
emps = session.query(Employees, Dept_emp).filter(and_(Employees.emp_no == Dept_emp.emp_no, Employees.emp_no == '10010')).all()
getres(emps)

emps = session.query(Employees, Dept_emp).filter(Employees.emp_no == Dept_emp.emp_no).filter(Employees.emp_no == '10010').all()
getres(emps)


# 至关于:select * from employees,dept_emp where employees.emp_no = dept_emp.emp_no and employees.emp_no = '10010';
join链接

使用join()关键字来进行连表查询,其isouter参数用于指定join的类型,默认状况下使用的是inner join,当isouter=True时,就表示是left join(right join没有实现,须要本身交换前面表的顺序)

# join链接
std_list = session.query(Employees).join(Dept_emp,Employees.emp_no == Dept_emp.emp_no,isouter=True).filter(Employees.emp_no == '10010')
getres(std_list)

std_list = session.query(Employees).join(Dept_emp).filter((Employees.emp_no == Dept_emp.emp_no) & (Employees.emp_no == '10010'))
getres(std_list)

# 若是query中,仅列出一个代表,至关于
# select employees.* from employees inner join dept_emp on employees.emp_no = dept_emp.emp_no where employees.emp_no = '10010';

4 一对多关系

一对可能是一种表与表的关系,在orm中建立和使用方式有一写特色,这里单独描述

4.1 建立关系表

经过ForeignKey来建立一对多关系,须要注意的是它内部须要填写对应的真实的表名和字段(非映射的类对象)

class Deptment(base):

    __tablename__ = 'deptment'

    id = Column(Integer, autoincrement=True, primary_key=True)
    dep_name = Column(String(32), nullable=False)

class User(base):

    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(32), nullable=False)
    dept_id = Column(Integer, ForeignKey('deptment.id'))

# CREATE TABLE `user` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `name` varchar(32) NOT NULL,
#   `dept_id` int(11) DEFAULT NULL,
#   PRIMARY KEY (`id`),
#   KEY `dept_id` (`dept_id`),
#   CONSTRAINT `user_ibfk_1` FOREIGN KEY (`dept_id`) REFERENCES `deptment` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

# CREATE TABLE `deptment` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `dep_name` varchar(32) NOT NULL,
#   PRIMARY KEY (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4.2 添加数据

添加部门数据时,只能使用id来插入,当插入的dept_id在deptment表中不存在时,会直接爆出异常

# 批量添加部门数据
session.add_all([
    Deptment(dep_name='运维部'),
    Deptment(dep_name='开发部'),
    Deptment(dep_name='产品部'),
    Deptment(dep_name='测试部')]
)

# 添加用户数据
session.add_all([
    User(name='daxin', dept_id=1),
    User(name='dachenzi', dept_id=2),
    User(name='dahl', dept_id=3)]
)
session.commit()

4.3 relationship

        在表中使用relationship来建立一个关联的对象,便于查询(和django的一对多隐含的对象相同,但在sqlalchemy中必须经过relationship才能够生成),并不会生成新的字段,仅仅是产生一个关联关系。

注意relationship对象不能做为条件直接进行表查询:session.query(User.name,User.deptment.dept_name) 这样是不行的。

relationship(obj,backref='')
  • obj:表示要关联的ORM对象
  • backref:表示在对方插入一个关键字,用于反向关联relationship所在的表对象的自己。

下面是一个例子:

class Deptment(base):
    __tablename__ = 'deptment'

    id = Column(Integer, autoincrement=True, primary_key=True)
    dep_name = Column(String(32), nullable=False)

class User(base):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(32), nullable=False)
    dept_id = Column(Integer, ForeignKey('deptment.id'))
 
    deptment = relationship(Deptment, backref='user')   # 建立relationship关系

# 正向查(经过User查deptment)
std = session.query(User).filter(User.id == 1).first()
print(std.name)
print(std.deptment.dep_name)  # 直接经过deptment对象,读取Deptment表中对于的信息

# 反向查(经过depement查User)
dep = session.query(Deptment).filter(Deptment.id == 1).first()
print(dep.user[0].name)   # 反向查,经过user获取到的数据是一个列表

relationship,就是经过在一个映射类中增长一个属性,该属性用于表示链接关系,能够在结果中,访问该属性来访问关联的表信息

4.4 经过relationship添加数据

存在relationship的映射关系时,咱们添加数据时,就能够经过relationship使用对象来添加关联数据了

dep = session.query(Deptment).filter(Deptment.id == 3).first()
session.add_all([
    User(name='hello', deptment=dep),
    User(name='world', deptment=dep)]
)
session.commit()

直接建立新的部门对象也是能够的

user = User(name='dahlhin',deptment=Deptment(dep_name='管理员'))
session.add(user)
session.commit()

5 多对多关系

和django不一样sqlalchemy的第三张表须要手动建立。

5.1 建立多对多关系

模拟主机与业务线的归属问题。

  • 主机能够属于多个业务线
  • 一个业务线能够包含多个主机

这是一个典型的多对多关系,须要额外一张关系表来记录。

class Service2host(base):
    __tablename__ = 'service_to_host'
    id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True)
    s_id = Column(Integer, ForeignKey('service.id'))
    h_id = Column(Integer, ForeignKey('host.id'))

    __table_args__ = (
        # 联合惟一索引
        UniqueConstraint('s_id', 'h_id', name='serivce_to_host'),
    )  # 业务id和主机id不能重复,这里建立惟一索引来约束


class Service(base):
    __tablename__ = 'service'
    id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True)
    ser_name = Column(String(16), nullable=False)


class Host(base):
    __tablename__ = 'host'

    id = Column(Integer, primary_key=True, nullable=False, unique=True, autoincrement=True)
    hostname = Column(String(16), nullable=False)
    datetime = Column(DateTime, default=datetime.datetime.now, nullable=False)

# CREATE TABLE `host` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `hostname` varchar(16) NOT NULL,
#   `datetime` date NOT NULL,
#   PRIMARY KEY (`id`),
#   UNIQUE KEY `id` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


# CREATE TABLE `service` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `ser_name` varchar(16) NOT NULL,
#   PRIMARY KEY (`id`),
#   UNIQUE KEY `id` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


# CREATE TABLE `service_to_host` (
#   `id` int(11) NOT NULL AUTO_INCREMENT,
#   `s_id` int(11) DEFAULT NULL,
#   `h_id` int(11) DEFAULT NULL,
#   PRIMARY KEY (`id`),
#   UNIQUE KEY `id` (`id`),
#   UNIQUE KEY `serivce_to_host` (`s_id`,`h_id`),
#   KEY `h_id` (`h_id`),
#   CONSTRAINT `service_to_host_ibfk_1` FOREIGN KEY (`s_id`) REFERENCES `service` (`id`),
#   CONSTRAINT `service_to_host_ibfk_2` FOREIGN KEY (`h_id`) REFERENCES `host` (`id`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

5.2 经过relationship操做数据

若是没有relationship关联对象,那么咱们将须要分别操做三张表,使用relationship将会大大简化这个过程,那么在Service中建立关系:

hosts = relationship('host', secondary='service_to_host', backref='services')

建立后:

  • 在Service中存在一个hosts属性,对应host表
  • 在Host中被动注入一个services属性,对应service表
  • secondary='service_to_host' 表示经过第三张表来关联关系
# 添加一个业务,并建立几台主机
session.add(
    Service(ser_name='运营', hosts=[
        Host(hostname='openstack'),
        Host(hostname='nginx')
    ])
)

# 添加一个主机,并关联几个业务线
service = session.query(Service).filter(or_(Service.ser_name == '运营', Service.ser_name == '运维')).all()
session.add(
    Host(hostname='Tomcat', services=service)
)

# 查询一个业务线下都有哪些主机
service = session.query(Service).filter(Service.ser_name == '运营').first()
for host in service.hosts:
    print(host.id, host.hostname)

# 查询一台主机都归属哪些业务线
host = session.query(Host).filter(Host.hostname == 'openstack').first()
for servcie in host.services:
    print(service.id, service.ser_name)

6 scoped_session(推荐)

当咱们启动多线程来执行数据库操做时,每一个线程都会用到session来执行sql语句,通常状况下,咱们会在线程内不建立新的session来执行sql语句,下面是一个利用session完成原生sql的执行。

import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    'mysql+pymysql://dahl:123456@10.0.0.10:3306/test',
    max_overflow=5,
    pool_size=1,
    pool_timeout=30,
    pool_recycle=-1
)

DBsession = sessionmaker(bind=engine)

def func():
    session = DBsession()
    cursor = session.execute('show tables')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    session.close()

if __name__ == '__main__':
    for i in range(10):
        threading.Thread(target=func).start()

这里会产生一个问题,每一个线程启动时都会建立一个session,用完又会关闭,这样太麻烦了,这里可使用scoped_session对象来完成,它相似与threading.Local的实现方式。做用是,当线程调用scoped_session对象的功能时,好比各类sql查询,在其内部会为每一个线程建立一个新的session对象,让它来使用。

# 1 引入scoped_session
from sqlalchemy.orm import scoped_session

# 2 全局建立session对象
session = scoped_session(DBsession)

# 3 多进程内直接使用
def func():
    cursor = session.execute('show tables')
    res = cursor.fetchall()
    print(res)
    cursor.close()
    session.remove()   # 使用完毕须要remove

不须要关闭,直接使用scoped_session对象的remove方法便可。

实现过程:

  1. scoped_session是一个类
  2. 它内部并无实现全部的Session类的方法
  3. 它只是在内部,把传入的session对象的属性,进行反射获取,并绑定在本身身上。
  4. self.registry() 就是为每一个线程建立的Session对象,其内部使用的就是threading.local实现的。

源码以下:

# scoping
def instrument(name):
    def do(self, *args, **kwargs):
        return getattr(self.registry(), name)(*args, **kwargs)
    return do

for meth in Session.public_methods:
    setattr(scoped_session, meth, instrument(meth))

# registry是ThreadLocalRegistry对象
self.registry = ThreadLocalRegistry(session_factory)


# ThreadLocalRegistry对象内部经过threading.local实现的
    def __init__(self, createfunc):
        self.createfunc = createfunc  # Session对象
        self.registry = threading.local()

    
    # registry()其实调用的就是__call__方法
    def __call__(self):
        try:
            return self.registry.value
        except AttributeError:
            val = self.registry.value = self.createfunc()   # 第一次执行,就会实例化一个Session对象
            return val

7 别名

当使用join语句连表查询时,不免会碰到两个表重名的字段,这里就可使用label来对字段进行别名显示。

stds = session.query(User.name.label('username'), User.id, Deptment.dep_name).join(Deptment).all()
for std in stds:
    print(std.username, std.id, std.dep_name)
相关文章
相关标签/搜索