介绍
SQLAlchemy是一个基于Python实现的ORM框架。该框架创建在 DB API之上,使用关系对象映射进行数据库操做,简言之即是:将类和对象转换成SQL,而后使用数据API执行SQL并获取执行结果,并把获取的结果转为python对象。其中发sql到mysql服务器,从mysql服务器拿结果都是借助其余工具来完成的,例如pymysql.html
- Engine,框架的引擎
- Connection Pooling ,数据库链接池
- Dialect,选择链接数据库的DB API种类
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
SQLAlchemy自己没法操做数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不一样调用不一样的数据库API,从而实现对数据库的操做,如:python
shell
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]mysql
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>sql
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]docker
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
单表
单表的建立
import datetime
import time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer, String, Date数据库
from sqlalchemy.orm import sessionmakerdjango
Base = declarative_base()flask
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
encoding='utf8',
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)bash
class User(Base):
# tablename 字段必须有,不然会报错
tablename = 'user'
# 不一样于django model 会自动加主键,sqlalchemy须要手动加主键
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 时间类型的default的默认值须要使用datetime.date.today(), 可是使用flask-sqlalchemy的时候使用datetime.date.today
date = Column(Date, default=datetime.date.today())
def create_table():
# 建立全部的表,表若是存在也不会重复建立,只会建立新的表,并且sqlalchemy默认不支持修改表结构
# 要想和django orm同样能修改表结构并反映到数据库须要借助第三方组件
Base.metadata.create_all(engine)
def drop_table():
# 删除全部的表
Base.metadata.drop_all(engine)
单表的增删改查
# 增长
# user = User(name='jack')
# session.add(user)
# session.commit()
# session.close()
# # 增长多条
# user_list = [User(name='a'), User(name='b'), User(name='c')]
# session.add_all(user_list)
# session.commit()
# 查
# result 是一个列表,里面存放着对象
# result = session.query(User).all()
# for item in result:
# print(item.name)
# 查询最后加all() 获得的是一个存放对象的列表,不加all() 经过print 打印出的是sql语句
# 可是结果还是一个可迭代的对象,只不过对象的__str__ 返回的是sql语句,迭代的时候里面的对象
# 是一个类元组的对象,可使用下标取值,也能够经过对象的`.`方式取值
# result = session.query(User.name, User.date).filter(User.id>3)
# for item in result:
# print(item[0], item.date)
# 条件查询
from sqlalchemy import and_, or_,func
## 逻辑查询
r0 = session.query(User).filter(User.id.in_([1, 2]))
r1 = session.query(User).filter(~User.id.in_([1, 2]))
r2 = session.query(User).filter(User.name.startswith('j'), User.id>2)
r3 = session.query(User).filter(
or_(
User.id>3,
and_(User.name=='jack', User.id<2)
)
)
## 通配符
r4 = session.query(User).filter(User.name.like('%j'))
r5 = session.query(User).filter(~User.name.like('%j'))
## limit 和django orm 同样都是经过索引来限制
r6 = session.query(User)[0:4]
## 排序, 排序通常是倒数第二的位置,倒数第一是limit
r7 = session.query(User).order_by(User.id.desc())
## 分组和聚合
r8 = session.query(func.max(User.id)).group_by(User.name).all()
# 改, 获得的结果是收到影响的记录条数
# r9 = session.query(User).filter(User.id==2).update({'name': User.name + User.name.concat('hh')}, synchronize_session=False)
# 删除
session.query(User).delete()
## 子查询
session.query(User).filter(User.id.in_(session.query(User.id).filter(User.name.startswith('j'))))session.commit()
# 这边的close并非真实的关闭链接,而是完成终止事务和清除工做
session.close()
连表
两张表
建立表
import datetime
import time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer, String, Date
from sqlalchemy import ForeignKey
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
encoding='utf8',
max_overflow=0,
pool_size=5,
pool_timeout=20,
pool_recycle=-1
)
class User(Base):
# tablename 字段必须有,不然会报错
tablename = 'user'
# 不一样于django model 会自动加主键,sqlalchemy须要手动加主键
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 时间类型的default的默认值须要使用datetime.date.today(), 可是使用flask-sqlalchemy的时候使用datetime.date.today
date = Column(Date, default=datetime.date.today())
# 由于外键的sh设置更偏向于数据库底层,因此这里使用了表名,而不是类名
depart_id = <span class="hljs-type">Column</span>(<span class="hljs-type">Integer</span>, <span class="hljs-type">ForeignKey</span>('<span class="hljs-title">department</span>.<span class="hljs-title">id'</span>))
# 由于外键的sh设置更偏向于数据库底层,因此这里使用了表名,而不是类名
depart_id = <span class="hljs-type">Column</span>(<span class="hljs-type">Integer</span>, <span class="hljs-type">ForeignKey</span>('<span class="hljs-title">department</span>.<span class="hljs-title">id'</span>))class Department(Base):
tablename = 'department'
id = Column(Integer, primary_key=True)
# 默认的nullable 是True
title = Column(String(32), nullable=False)
查询
# 默认根据在类里面定义的外键进行on, 此时获得的结果是[(userobj, departmnetobj),()] 这种形式,默认是inner join
r1 = session.query(User, Department).join(Department).all()
r2 = session.query(User.name, Department.title).join(Department, Department.id==User.depart_id).all()
有了 isouter 参数,inner join 就变成 left join
r3 = session.query(User.name, Department.title).join(Department, Department.id==User.depart_id, isouter=True).all()
relationship
如今问题来了,想要查name是jack所属的部门名,两种方式
- 分两次sql查询
user = session.query(User).filter(User.name == 'jack').first()
title = session.query(Department.title).filter(Department.id == user.depart_id).first().title
- 一次连表查询
r1 = session.query(Department.title).join(User).filter(User.name == 'jack').first().title
print(r1)
这样的方式在python代码的级别貌似没有django的方便,django 的 orm 拿到一个对象obj, obj.deaprtment.title 就能拿到结果。sqlalchemy也有相似功能,经过relationship来实现。
# 注意,导入的是relationship,而不是relationships
from sqlalchemy.orm import relationship
class Department(Base):
__tablename__ = 'department'
id = Column(Integer, primary_key=True)
# 默认的nullable 是True
title = Column(String(32), nullable=False)
<span class="hljs-meta"># 若是backref 的那张表和这张表是一对一关系,加上一个uselist=False参数就行</span>
user = relationship(<span class="hljs-string">"User"</span>, backref=<span class="hljs-comment">'department')</span>
class User(Base):
# tablename 字段必须有,不然会报错
tablename = 'user'
# 不一样于django model 会自动加主键,sqlalchemy须要手动加主键
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 时间类型的default的默认值须要使用datetime.date.today(), 可是使用flask-sqlalchemy的时候使用datetime.date.today
date = Column(Date, default=datetime.date.today())
<span class="hljs-meta"># 由于外键的sh设置更偏向于数据库底层,因此这里使用了表名,而不是类名</span>
depart_id = Column(<span class="hljs-built_in">Integer</span>, ForeignKey(<span class="hljs-comment">'department.id'))</span>
<span class="hljs-meta"># 神奇的一点是,SQLAlchemy会根据关系的对应状况自动给关系相关属性的类型</span>
<span class="hljs-meta"># 好比这里的Department下面的user自动是一个list类型,而User因为设定了外键的缘故</span>
<span class="hljs-meta"># 一个user最多只能应对一个用户,因此自动识别成一个非列表类型</span>
<span class="hljs-meta"># 这样写两个relationship比较麻烦,在设置了外键的一边使用relationship,而且加上backref参数</span>
<span class="hljs-meta"># department = relationship("Department")</span>
session_factory = sessionmaker(engine)
session = session_factory()
user = session.query(User).first()
print(user.department)
<span class="hljs-meta"># 若是backref 的那张表和这张表是一对一关系,加上一个uselist=False参数就行</span>
user = relationship(<span class="hljs-string">"User"</span>, backref=<span class="hljs-comment">'department')</span><span class="hljs-meta"># 由于外键的sh设置更偏向于数据库底层,因此这里使用了表名,而不是类名</span>
depart_id = Column(<span class="hljs-built_in">Integer</span>, ForeignKey(<span class="hljs-comment">'department.id'))</span>
<span class="hljs-meta"># 神奇的一点是,SQLAlchemy会根据关系的对应状况自动给关系相关属性的类型</span>
<span class="hljs-meta"># 好比这里的Department下面的user自动是一个list类型,而User因为设定了外键的缘故</span>
<span class="hljs-meta"># 一个user最多只能应对一个用户,因此自动识别成一个非列表类型</span>
<span class="hljs-meta"># 这样写两个relationship比较麻烦,在设置了外键的一边使用relationship,而且加上backref参数</span>
<span class="hljs-meta"># department = relationship("Department")</span>department = session.query(Department).first()
print(department.user)
有了relationship,不只查询方便,增长数据也更方便。
# 增长一个用户ppp,并新建这个用户的部门叫IT
## 方式一
# d = Department(title='IT')
# session.add(d)
# session.commit() # 只有commit以后才能取d的id
#
# session.add(User(name='ppp', depart_id=d.id))
# session.commit()
## 方式二
# session.add(User(name='ppp', department=Department(title='IT')))
# session.commit()
# 增长一个部门xx,并在部门里添加员工:aa/bb/cc
# session.add(Department(title='xx', users=[User(name='aa'), User(name='bb'),User(name='cc')]))
# session.commit()
三张表
建立表
结果是每5个一块儿打印
在全局建立一个特殊的session,各个线程去使用这个特殊的session
scoped_session 这个类还真是神奇,名字居然还不是大写,并且原先的session有的,这个类实例化的对象也会有。咱们第一反应是继承,其实它也不是继承。它的实现原理是这样的
执行导入语句的的时候,点进去看源码发现执行了一个scoping.py的文件。




最终self.registry()就是session_factory() 对象,并且是线程隔离的,每一个线程有本身的会话对象
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer, String, Date
from sqlalchemy import ForeignKey, UniqueConstraint, Index
class Student(Base): __tablename__ = 'student' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) course_list = relationship('<span class="hljs-type">Course</span>', <span class="hljs-title">secondary</span>='<span class="hljs-title">student2course'</span>, <span class="hljs-title">backref</span>='<span class="hljs-title">student_list'</span>)
class Course(Base):
tablename = 'course'
id = Column(Integer, primary_key=True)
title = Column(String(32), index=True, nullable=False)
class Student2Course(Base):
tablename = 'student2course'
id = Column(Integer, primary_key=True, autoincrement=True)
student_id = Column(Integer, ForeignKey('student.id'))
course_id = Column(Integer, ForeignKey('course.id'))
__table_args__ = (
<span class="hljs-type">UniqueConstraint</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">uix_stu_cou'</span>), # 联合惟一索引
# <span class="hljs-type">Index</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">stu_cou'</span>), # 联合索引
)</span></code></pre>
查询
查询方式和只有两张表的状况相似,例如查询jack选择的全部课
# obj = session.query(Student).filter(Student.name=='jack').first()
# for item in obj.course_list:
# print(item.title)
建立一个课程,建立2学生,两个学生选新建立的课程
# obj = Course(title='英语')
# obj.student_list = [Student(name='haha'),Student(name='hehe')]
#
# session.add(obj)
# session.commit()
执行原生sql
方式一
# 查询
# cursor = session.execute('select * from users')
# 拿到的结果是一个ResultProxy对象,ResultProxy对象里套着类元组的对象,这些对象能够经过下标取值,也能够经过对象.属性的方式取值
# result = cursor.fetchall()
# 添加
cursor = session.execute('INSERT( INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
session.commit()
print(cursor.lastrowid)
方式二
import pymysql
conn = engine.raw_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(
"select * from user"
)
result = cursor.fetchall()
# 结果是一个列表,列表里面套着的对象就是原生的字典对象
print(result)
cursor.close()
conn.close()
多线程状况下的sqlalchemy
在每一个线程内部建立session并关闭session
course_list = relationship('<span class="hljs-type">Course</span>', <span class="hljs-title">secondary</span>='<span class="hljs-title">student2course'</span>, <span class="hljs-title">backref</span>='<span class="hljs-title">student_list'</span>)__table_args__ = (
<span class="hljs-type">UniqueConstraint</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">uix_stu_cou'</span>), # 联合惟一索引
# <span class="hljs-type">Index</span>('<span class="hljs-title">student_id'</span>, '<span class="hljs-title">course_id'</span>, <span class="hljs-title">name</span>='<span class="hljs-title">stu_cou'</span>), # 联合索引
)</span></code></pre># obj = session.query(Student).filter(Student.name=='jack').first()
# for item in obj.course_list:
# print(item.title)# obj = Course(title='英语')
# obj.student_list = [Student(name='haha'),Student(name='hehe')]
#
# session.add(obj)
# session.commit()# 查询
# cursor = session.execute('select * from users')
# 拿到的结果是一个ResultProxy对象,ResultProxy对象里套着类元组的对象,这些对象能够经过下标取值,也能够经过对象.属性的方式取值
# result = cursor.fetchall()
# 添加
cursor = session.execute('INSERT( INTO users(name) VALUES(:value)', params={"value": 'wupeiqi'})
session.commit()
print(cursor.lastrowid)import pymysql
conn = engine.raw_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(
"select * from user"
)
result = cursor.fetchall()
# 结果是一个列表,列表里面套着的对象就是原生的字典对象
print(result)
cursor.close()
conn.close()session_factory = sessionmaker(engine) def task(i):
# 建立一个会话对象,没错仅仅是建立一个对象这么简单
session = session_factory()
# 执行query语句的时候才会真真去拿链接去执行sql语句,若是没有close那么没有空闲链接就会等待
result = session.execute('select * from user where id=14')
for i in result:
print(i.name)
time.sleep(1)
# 必需要close,这里的close能够理解为关闭会话,把连接放回链接池
# 若是注释掉这一句代码,程序会报错QueuePool limit of size 5 overflow 0 reached, connection timed out, timeout 20
session.close()
if name == 'main':
for i in range(10):
t = Thread(target=task, args=(i,))
t.start()
session_factory = sessionmaker(engine) def task(i):
# 建立一个会话对象,没错仅仅是建立一个对象这么简单
session = session_factory()
# 执行query语句的时候才会真真去拿链接去执行sql语句,若是没有close那么没有空闲链接就会等待
result = session.execute('select * from user where id=14')
for i in result:
print(i.name)
time.sleep(1)
# 必需要close,这里的close能够理解为关闭会话,把连接放回链接池
# 若是注释掉这一句代码,程序会报错QueuePool limit of size 5 overflow 0 reached, connection timed out, timeout 20
session.close()
if name == 'main':
for i in range(10):
t = Thread(target=task, args=(i,))
t.start()from sqlalchemy.orm import scoped_session
session_factory = sessionmaker(engine)
session = scoped_session(session_factory)
def task(i):
result = session.execute('select * from user where id=14')
for i in result:
print(i.name)
time.sleep(1)
session.remove()
session.remove()if name == 'main':
for i in range(10):
t = Thread(target=task, args=(i,))
t.start()from sqlalchemy.orm import scoped_session