sqlalchemy 是一款Python语言写的ORM框架, 该框架创建在数据库API基础之上。python
sqlalchemy 自己没法操做数据库,必须已第三方插件为基础,Dialect用于和数据API进行交流,根据不通的的配置调用不通的数据库API,从而实现对数据库的操做。mysql
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/mybase", max_overflow=5) BaseModel = declarative_base() class Blog(BaseModel): __tablename__ = 'blog' id = Column(CHAR(32), primary_key=True) title = Column(String(64), server_default='', nullable=False) text = Column(String, server_default='', nullable=False) user = Column(CHAR(32), index=True, server_default='', nullable=False) create = Column(BIGINT, index=True, server_default='0', nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(CHAR(32), primary_key=True) name = Column(String(32), server_default='', nullable=False) username = Column(String(32), index=True, server_default='', nullable=False) password = Column(String(64), server_default='', nullable=False) def init_db(): BaseModel.metadata.create_all(Engine) # 建立数据库及表 def drop_db(): BaseModel.metadata.drop_all(Engine) # 删除数据库及表 if __name__ == '__main__': #init_db() drop_db() #BaseModel.metadata.tables['user'].create(Engine, checkfirst=True) #BaseModel.metadata.tables['user'].drop(Engine, checkfirst=False) pass
session = Session() session.add(User(id=uuid.uuid4().hex)) session.add(Blog(id=uuid.uuid4().hex)) session.add_all([ User(id=uuid.uuid4().hex), Blog(id=uuid.uuid4().hex) ]) session.commit()
查询的结果, 有几种不一样的类型, 这个须要注意, 像是:sql
session.query(User).filter_by(username='abc').all() session.query(User).filter(User.username=='abc').all() session.query(Blog).filter(Blog.create >= 0).all() session.query(Blog).filter(Blog.create >= 0).first() session.query(Blog).filter(Blog.create >= 0 | Blog.title == 'A').first() session.query(Blog).filter(Blog.create >= 0 & Blog.title == 'A').first() session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar() session.query(User).filter(User.username == 'abc').scalar() session.query(User.id).filter(User.username == 'abc').scalar() session.query(Blog.id).filter(Blog.create >= 0).all() session.query(Blog.id).filter(Blog.create >= 0).all()[0].id dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all()) session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title session.query(User.id).order_by('id desc').all() session.query(User.id).order_by('id').first() session.query(User.id).order_by(User.id).first() session.query(User.id).order_by(-User.id).first() session.query('id', 'username').select_from(User).all() session.query(User).get('16e19a64d5874c308421e1a835b01c69')
session.query(Blog, User).filter(Blog.user == User.id).first().User.username session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id session.query(Blog.id, User.id, User.username).filter(Blog.user == User.id).first().keys()
from sqlalchemy import or_, not_ session.query(User).filter(or_(User.id == '', User.id == '16e19a64d5874c308421e1a835b01c69')).all() session.query(User).filter(not_(User.id == '16e19a64d5874c308421e1a835b01c69')).all() session.query(User).filter(User.id.in_(['16e19a64d5874c308421e1a835b01c69'])).all() session.query(User).filter(User.id.like('16e19a%')).all() session.query(User).filter(User.id.startswith('16e19a')).all() dir(User.id)
from sqlalchemy import func session.query(func.count('1')).select_from(User).scalar() session.query(func.count('1'), func.max(User.username)).select_from(User).first() session.query(func.count('1')).select_from(User).scalar() session.query(func.md5(User.username)).select_from(User).all() session.query(func.current_timestamp()).scalar() session.query(User).count()
两种修改方式数据库
session.query(User).filter(User.username == 'abc').update({'name': '123'}) session.commit() user = session.query(User).filter_by(username='abc').scalar() user.name = '111' session.commit()
若是涉及对属性原值的引用, 则要考虑 synchronize_session
这个参数.session
'evaluate'
默认值, 会同时修改当前 session 中的对象属性.'fetch'
修改前, 会先经过 select
查询条目的值.‘False’
不修改当前 session 中的对象属性.在默认状况下, 由于会有修改当前会话中的对象属性, 因此若是语句中有 SQL 函数, 或者"原值引用", 那是没法完成的操做, 天然也会报错, 好比:oracle
from sqlalchemy import func session.query(User).update({User.name: func.trim('123 ')}) # 使用了函数 session.query(User).update({User.name: User.name + 'x'}) #使用了原值引用
# 以上两种状况都会报错
这种状况下, 就不能要求 SQLAlchemy 修改当前 session 的对象属性了, 而是直接进行数据库的交互, 无论当前会话值(将synchronize_session值设置为False):app
session.query(User).update({User.name: User.name + 'x'}, synchronize_session=False)
是否修改当前会话的对象属性, 涉及到当前会话的状态. 若是当前会话过时, 那么在获取相关对象的属性值时, SQLAlchemy 会自动做一次数据库查询, 以便获取正确的值:框架
user = session.query(User).filter_by(username='abc').scalar() print user.name session.query(User).update({User.name: 'new'}, synchronize_session=fetch) print user.name session.commit() print user.name
执行了 update
以后, 虽然相关对象的实际的属性值已变动, 可是当前会话中的对象属性值并无改变. 直到 session.commit()
以后, 当前会话变成"过时"状态, 再次获取 user.name
时, SQLAlchemy 经过 user
的 id
属性, 从新去数据库查询了新值. 函数
synchronize_session
设置成 'fetch'
不会有这样的问题, 由于在作 update
时已经修改了当前会话中的对象了.fetch
无论 synchronize_session
的行为如何, commit
以后 session
都会过时, 再次获取相关对象值时, 都会从新做一次查询.
好好体会上边的话。
删除一样有像修改同样的 synchronize_session
参数的问题, 影响当前会话的状态.
session.query(User).filter_by(username='abc').delete() user = session.query(User).filter_by(username='abc').first() session.delete(user)
sqlalchemy 默认状况下的join 是内链接
r = session.query(Blog, User).join(User, Blog.user == User.id).all() for blog, user in r: print blog.id, blog.user, user.id
from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import String, Integer, CHAR, BIGINT
class Blog(BaseModel):
__tablename__ = 'blog'
id = Column(BIGINT, primary_key=True, autoincrement=True)
title = Column(String(64), server_default='', nullable=False)
text = Column(String, server_default='', nullable=False)
user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)
create = Column(BIGINT, index=True, server_default='0', nullable=False)
user_obj = relationship('User')
class User(BaseModel):
__tablename__ = 'user'
id = Column(BIGINT, primary_key=True, autoincrement=True)
name = Column(String(32), server_default='', nullable=False)
username = Column(String(32), index=True, server_default='', nullable=True)
password = Column(String(64), server_default='', nullable=False)
blog_list = relationship('Blog', order_by='Blog.create')
添加数据
session = Session() user = User(name='first', username=u'新的') session.add(user) session.flush() blog = Blog(title=u'第一个', user=user.id) session.add(blog) session.commit()
session.flush()
是进行数据库交互, 可是事务并无提交. 进行数据库交互以后, user.id
才有值.
定义了外键, 对查询来讲, 并无影响. 外键只是单纯的一条约束而已. 固然, 能够在外键上定义一些关联的事件操做, 好比当外键条目被删除时, 字段置成 null
, 或者关联条目也被删除等
获取数据
session = Session() print session.query(Blog).get(1).user_obj print session.query(User).get(1).blog_list
对于 一对多 的关系, 使用 any()
函数查询:
user = session.query(User).filter(User.blogs.any(Blog.title == u'A')).first()
反之, 若是是 多对一 的关系, 则使用 has()
函数查询:
blog = session.query(Blog).filter(Blog.user_obj.has(User.name == u'XX')).first()
上面的关系定义, 对应的属性是实际查询出的实例列表, 当条目数多的时候, 这样可能会有问题. 好比用户名下有成千上万的文章, 一次全取出就太暴力了. 关系对应的属性能够定义成一个 Query
class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) blog_list = relationship('Blog', order_by='Blog.create', lazy="dynamic")
这样就能自由控制了
session.query(User).get(1).blog_list.all() session.query(User).get(1).blog_list.filter(Blog.title == 'abc').first()
二、关系的表现形式
关系在对象属性中的表现, 默认是列表, 可是, 这不是惟一的形式. 根据须要, 能够做成 dictionary , set 或者其它你须要的对象.
class Blog(BaseModel): __tablename__ = 'blog' id = Column(Integer, autoincrement=True, primary_key=True) title = Column(Unicode(32), server_default='') user = Column(Integer, ForeignKey('user.id'), index=True) user_obj = relationship('User') class User(BaseModel): __tablename__ = 'user' id = Column(Integer, autoincrement=True, primary_key=True) name = Column(Unicode(32), server_default='') blogs = relationship('Blog')
对于上面的两个模型:
user = session.query(User).first() print user.blogs
user = User(name=u'XX') session.add_all([Blog(title=u'A', user_obj=user), Blog(title=u'B', user_obj=user)]) session.commit() user = session.query(User).first() print user.blogs
如今 user.blogs
是一个列表. 咱们能够在 relationship()
调用时经过 collection_class
参数指定一个类, 来从新定义关系的表现形式:
set, 集合
blogs = relationship('Blog', collection_class=set) #InstrumentedSet([<__main__.Blog object at 0x1a58710>, <__main__.Blog object at 0x1a587d0>])
attribute_mapped_collection , 字典, 键值从属性取:
from sqlalchemy.orm.collections import attribute_mapped_collection blogs = relationship('Blog', collection_class=attribute_mapped_collection('title')) #{u'A': <__main__.Blog object at 0x20ed810>, u'B': <__main__.Blog object at 0x20ed8d0>}
mapped_collection , 字典, 键值自定义:
from sqlalchemy.orm.collections import mapped_collection blogs = relationship('Blog', collection_class=mapped_collection(lambda blog: blog.title.lower())) #{u'a': <__main__.Blog object at 0x1de4890>, u'b': <__main__.Blog object at 0x1de4950>}