(转)SQLAlchemy入门和进阶

URL:https://zhuanlan.zhihu.com/p/27400862html

http://www.javashuo.com/article/p-dcqfqbpa-k.html---SQLAlchemy 教程 —— 基础入门篇python

目录:mysql

  1. SQLAlchemy 简介
  2. 横向对比
  3. 核心概念与入门
    1. 模型定义
    2. 复杂查询
    3. 基础性能
  4. 扩展与进阶
    1. 事件
    2. 反射
    3. Python3.x asyncio扩展
    4. 分片Session
    5. 自定义的列类型
    6. 混合(hybrid)属性
    7. 序列化Query
    8. Baked Query
    9. 多态与关系


(知乎没有自动目录和侧边栏悬浮呢。。惆怅)sql

在新团队里作的技术分享,过了一段时间,整理下来而且有了新的想法。彷佛入门级的教程在知乎更受欢迎?数据库

SQLAlchemy 简介

SQLAlchemy 是一个功能强大的Python ORM 工具包,口碑不错,社区活跃也较为开放
提供 全功能的SQL和ORM操做 本次附赠的文件(这里放不上来,也懒得放gayhub了,总之很简单的,单元测试多一些,一下午搞定):
connect.py :底层的数据库链接
orm.py :模型定义的样例
example_test.py :单元测试,实质上能够对应业务的具体使用
python3_test.py :展现Python3 asyncio下的SQLAlchemyexpress

分别创建python2/3的虚拟环境,而后安装对应的requirements.txt便可后端

不管什么语言,不管什么库,作一个ORM实现,至少应当实现彻底语义化的数据库操做,使得操做数据库表就像在操做对象。
完整的ORM应当能够彻底避免SQL拼接api

为何须要ORM

当时分享完毕以后,也确实不少同事表示仍是喜欢裸SQL,我后来也又在工做中看到了不少遗留代码的问题。我也正好趁浴室迷思 想了一下,为何我须要ORM呢?缓存

第一条来自一个定理:安全

一切由人直接来保证安全性的系统,就必定会出错

拼接SQL、把SQL作成模板、开始使用ORM、封装出DAO层,几乎是每一个项目的共识吧?
过往的项目中,由我第一手写的,都会第一时间加入ORM,毕竟也只是两三个小文件,一百行之内的事情(后续因为封装的增多,可能会到达数百行)

这段时间在写旧系统的小规模重构(定理2:一个好程序猿应当友好地帮前人擦好屁股,而不是靠从新制造一个新屁股实现),拼接字符串并无带来任何优势,反而引入了很是简单的注入漏洞,简单的设想这样一个列表API的场景:

  1. 根据请求参数控制对应的:过滤条件、排序方法、翻页
  2. 根据须要预取关联的表,JOIN并把对一对多的关系化为一个list

第一条,刚一上手,就发现满地的string format,翻页用了:

order_sql = "ORDER BY {} {}".format(order_by,direction)

毫无疑问的order_by=id%3Bselect+1%3B-- 就直接注入了

要解决这些在SQL拼接的问题,除了表单验证,毫无疑问须要作一个SQL字符转义,另外在能用SQL参数的地方,须要用参数(而后也得注意拼接时候参数的个数,是的,这里咱们的接口有另外一个BUG,参数数量没数对)

第二个功能点,想象一下在须要的地方额外加一句LEFT JOIN,而后对结果再作额外的解析

还有一些附属功能:单元测试如何建表?代码里遍地的硬编码表名如何解决?


本身不是不能实现,但本身来实现这些,就走上了发明ORM的老路,用一个成熟的、文档丰富的ORM,岂不美哉?

横向对比

简单的挑了三个

(知乎的表格彷佛智障,不插入表格了)

SQLAlchemy、Peewee、Django ORM

Django ORM一直就不是一个全功能的ORM,会发现你想写的SQL几乎没法经过ORM写出来,固然raw属于tan90,使用裸SQL不在咱们的考虑范围。Django 1.12后提供了一些subquery等各种丰富SQL操做,但这么新,估计还极少项目在这么新的版本

Peewee若是有兴趣能够后续继续使用来感觉一下,Peewee也是一个功能全面的ORM,star不少但开发没有SQLAlchemy活跃

 

核心概念与入门

官方文档
我老是在想为何团队里不少人会以为SQLAlchemy入门门槛高,我曾经也被困扰过,但回头一看会发现的概念实质比较简单。
官方文档的脉络不太清晰,要扫过一遍而且学以至用才能感觉获得。example很友好的!


回过头来看它的从教程到API的文档,会发现它的文档很是详细,学会它,除了学会了Python操做SQL的一个库,一样也能够学到从代码组织、各种Pythonic技巧到思想的不少东西


总的感觉是:上手还算容易,精通要花不少功夫,但确实还挺有趣的

先放一个表,待会咱们会继续讲

(再次损失一个表)

 

概念不多,而且很清晰,理解这些概念以后的后续使用时,基本能够感觉到:你能直觉想到的操做,还确实都有(好比subquery、复杂查询的构造)

 

模型定义

咱们来看看他如何完成模型定义:

# coding=utf-8 from __future__ import unicode_literals, absolute_import from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime ModelBase = declarative_base() #<-元类 class User(ModelBase): __tablename__ = "auth_user" id = Column(Integer, primary_key=True) date_joined = Column(DateTime) username = Column(String(length=30)) password = Column(String(length=128)) 

从这里能够看到,模型定义甚至与数据库是无关的,因此容许不一样的数据库后端,不一样类型拥有不一样的表现形式和建表语句

这里咱们能够看到它实现了 ORM与数据库链接的解耦,一些数据库后端不支持的数据类型,例如Numeric类型,在sqlite中不支持,不过SQLAlchemy也能作一些兼容使用普通浮点

Model 等同于数据库的一张表
Column 显然就是这张表的一列

PS: SQLAlchemy 1.2以后才支持comment注释,以在ddl产生建表SQL时写上comment属性,1.2还在beta版里,因此还不能用。。。我倒很好奇为毛这个feature这么不重要

with get_session() as session:
    session.add(User(username="asd", password="asd"))
    session.add(User(username="qwe", password="qwe"))
    session.commit()

session(会话)的概念,能够当作一个管理数据库持久链接的对象,在此下面是彻底透明的链接池和事务等东西

get_session底下configure能够控制auto_commit参数,= False时写操做默认都不放在事务里,SQLAlchemy默认为True

session.add函数将会把Model加入当前的持久空间(能够从session.dirty看到),直到commit时更新

with get_session() as session:
    # <class 'sqlalchemy.orm.query.Query'>
    session.query(User)

最简单的这个查询返回了一个Query对象
须要注意的是,这里只构造Query,事实上并无发送至数据库进行查询,只会在Query.get()、Query.all()、Query.one()以及Query.__iter__等具备“执行”语义的函数,才会真的去获取

Query :本质上是数据表的若干行

  1. 在查询状况的下,等同于SQL 中的 SELECT Syntax
  2. 在update函数的操做时,能够根据参数选择等同于直接UPDATE users SET xxx WHERE name=xxx或者先用SELECT 选出ID,再循环用UPDATE xxx WHERE id=xxx
  3. delete同上

以SQLAlchemy为表明的ORM基本都支持链式操做。
形如:

with get_session() as session:
    # <class 'sqlalchemy.orm.query.Query'>
    query = (session
             .query(User)
             .filter(User.username == "asd")
             .filter_by(username="asd")
             #上面两个都是添加where
             .join(Addreess)#使用ForeignKey
             .join(Addreess,Addreess.user_id==User.id)#使用显式声明
             .limit(10)
             .offset(0)
             )

全部Query支持的详情见Query API文档

上面也涉及到一个特别有意思的filter函数:User.username == "asd" ,其实是SQLAlchemy重载了Column上的各类运算符 __eq__、__ge__,返回了一个BinaryExpression对象,看起来就更加符合直觉上的语义

复杂查询

基于Query的subquery

with get_session() as session:
    # <class 'sqlalchemy.orm.query.Query'>
    query = (session
            .query(User.id)
            .filter(User.username == "asd")
            .filter_by(username="asd")
            .limit(10)
            )
    subquery = query.subquery()
    query2 = session.query(User).filter(
        User.id.in_(subquery)
    )
    print query2#<-打印展开成的SQL,此处没有SQL查询

理解了Query、Column的概念,也很容易自行构造出这样的SQL

全部在Column级别上的使用 详见Column API文档

上面咱们提到了直接对Query进行的删除:

with get_session() as session:
    query = (session
             .query(User)
             .filter(User.username == "asd")
             .filter_by(username="asd")
             .join(Addreess)
             .join(Addreess,Addreess.user_id=User.id)
             .limit(10)
             .delete()#<-这里
             )

另外,由于Model也能够被放进session里,而后删除的,和插入是个反向操做:

with get_session() as session:
    instance = session.query(User).get(1)
    session.delete(instance)
    #下一句执行:DELETE FROM auth_user WHERE auth_user.id = ?
    session.commit()

改首先是上述Query中所说的update方法:

with get_session() as session:
            # get by id
    query = (session
             .query(User)
             .filter_by(id=1)
             .update({"username": 
                     User.username + "a"},
                     synchronize_session=False)
                     )

而后是在Model级别的方法:

with get_session() as session:
            # get by id
    user = (session
            .query(User)
            .get(1)
            )
    user.password = "zxcv"
    # UPDATE auth_user SET password=?
    # WHERE auth_user.id = ?
    session.commit()

在对Model的属性进行修改的时候,session会获得修改对应的内容,下次commit即会提交SQL
这里留个思考题:若是对一、同一对象的同一属性进行修改,二、同一对象的不一样属性进行修改 ,最终会有几个SQL被发出? 若是你来实现这样的功能,你会从哪里下手?

基础性能

SQLAlchemy性能

比较了十万条记录插入的性能

 

另外不要以为比sqlite 裸SQL慢三倍很慢,注意这个量级,实际项目中会发现慢查询、不规范操做(例如for循环里放查询)的危害比引入ORM的这点开销打多了

 

总结

到这再贴上面那个概念表,应该就能比较好的理解了

在用裸SQL能够解决的场景下,上述的SQLAlchemy入门部分就足以掌控场景,完成全部的增删查改API需求(甚至自动生成代码的需求),自动生成真是偷懒无止境。。不过发明新的DSL嘛,能不作就不作。。

 

扩展与进阶

从过往的经验来看,SQLAlchemy以优雅的直觉实现了诸多接口,并保留了良好的可扩展性,这里抛砖引玉一些有趣的特性

事件

应用层的触发器(trigger),支持:

  1. ConnectionEvents 包括Connection和Engine(链接后进行一些自检操做)
  2. DDLEvents 模型增删查改事件
  3. DialectEvents 不一样种类的数据库的事件
  4. PoolEvents 链接池事件,链接的检出和回收等

上面的性能测试里就使用了两种事件

from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging

logging.basicConfig()
logger = logging.getLogger("myapp.sqltime")
logger.setLevel(logging.DEBUG)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())
    logger.debug("Start Query: %s", statement)

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement,
                        parameters, context, executemany):
    total = time.time() - conn.info['query_start_time'].pop(-1)
    logger.debug("Query Complete!")
    logger.debug("Total Time: %f", total)

反射

现有项目或者别人的代码里若是已经用其余的方式写好了表定义,不想再定义Model了,想用SQLAlchemy直接使用对应的数据库表
查文档关键字:Automap

from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine

Base = automap_base()

# engine, suppose it has two tables 'user' and 'address' set up
engine = create_engine("sqlite:///mydatabase.db")

# reflect the tables
Base.prepare(engine, reflect=True)
tables = Base.classes#<-load tables

User = Base.classes.user
Address = Base.classes.address
# rudimentary relationships are produced
session.add(Address(email_address="foo@bar.com", user=User(name="foo")))
session.commit()

# collection-based relationships are by default named
# "<classname>_collection"
print (u1.address_collection)

扩展阅读:DeferredReflection

我以前在一些OLAP应用 用来作数据分析时用到过。。

Python3.x asyncio扩展

16年12月 Python3.6进入稳按期,同时也标志着Python3.4和3.5中的asyncio模块进入稳按期

SQLAlchemy对asyncio的支持在于,它实质上能够在engine层进行扩展,同时扩展Engine、Connection、Transaction、Context 代码量约400行

Strategies for creating new instances of Engine types. These are semi-private implementation classes which provide the underlying behavior for the "strategy" keyword argument available on :func:~sqlalchemy.engine.create_engine. Current available options are plain, threadlocal, and mock. New strategies can be added via new EngineStrategy classes. """

形如:

from sqlalchemy.engine.strategies import DefaultEngineStrategy
from .engine import AsyncioEngine
ASYNCIO_STRATEGY = '_asyncio'

class AsyncioEngineStrategy(DefaultEngineStrategy):
    name = ASYNCIO_STRATEGY
    engine_cls = AsyncioEngine
AsyncioEngineStrategy()  


async def main():
    engine = create_engine(
        # In-memory sqlite database cannot be accessed from different
        # threads, use file.
        'sqlite:///test.db', strategy=ASYNCIO_STRATEGY
    )

    metadata = MetaData()
    users = Table(
        'users', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', Text),
    )

    # Create the table
    await engine.execute(CreateTable(users))

    conn = await engine.connect()

另外提一嘴的是:asyncio不是银弹,会致使应用层压力直接传给DB,会掩盖应用的SQL写的烂的问题

分片Session

读写分离是当数据库压力到达必定阶段时,由应用层进行的拆分数据库压力的措施
实现一种主从分离的Session:

  1. 最简单的方案是直接扩展Session类get_bind方法

get_bind(mapper=None, clause=None)
Return a “bind” to which this Session is bound. Note that the “mapper” argument is usually present when Session.get_bind() is called via an ORM operation such as a Session.query(), each individual INSERT/UPDATE/DELETE operation within a Session.flush(), call, etc.

  1. 也可使用sqlalchemy.ext.horizontal_shard模块中已经实现好的ShardedSession

Parameters:

  • shard_chooser – A callable which, passed a Mapper, a mapped instance, and possibly a SQL clause, returns a shard ID. This id may be based off of the attributes present within the object, or on some round-robin scheme.
  • id_chooser – A callable, passed a query and a tuple of identity values, which should return a list of shard ids where the ID might reside.
  • query_chooser – For a given Query, returns the list of shard_ids where the query should be issued.
  • shards – A dictionary of string shard names to Engine objects.

容许根据model或者SQL条件、ID选择具体的数据库链接。一个未经验证的脑洞:由于shards是Engine的dict,那么是否容许在异构数据库之间使用Shard?这样会带来什么样的优缺点?

自定义的列类型

好久好久之前作的功能了,想象一个这样的场景:

  • Postgresql支持IP/CIDR的存储,本质上是使用4*8bit=32bit的int存储
  • Mysql此时并无这样简单的IP存储 如何对其进行扩展?

自定义实现的列类型实质上须要:

  1. 指定在某种数据库方言下的存储类型,例如Mysql下使用int
  2. 实现两个方法:从数据库中取出来一个python对象和把Python对象放入数据库
  3. 按需须要实现:支持一些操做符(例如==,in_)
from sqlalchemy import types
class MyIPType(types.TypeDecorator):
    impl = types.Integer


    def process_bind_param(self, value, dialect):
        #from python to database
        if dialect=="mysql":
            pass
        return #....

    def process_result_value(self, value, dialect):
        #from database to python object
        return #...

咱们也能够在awesome-sqlalchemy中找到一些有趣的类型扩展

混合(hybrid)属性

咱们常见使用Python的property修饰器来构造一个复杂属性,SQLAlchemy中,这个混合属性的做用也相似,不只能够用于得到对应的值,也能够用于Query时的链式操做

定义一个Model后,能够在各种增删查改中用到这个混合属性。混合属性 混合在:既是一个Python属性,也是一个能够放入数据库查询的属性

class Interval(Base): __tablename__ = 'interval' id = Column(Integer, primary_key=True) start = Column(Integer, nullable=False) end = Column(Integer, nullable=False) def __init__(self, start, end): self.start = start self.end = end @hybrid_property def length(self): return self.end - self.start #下面这个写着玩的。。 @length.setter def length(self, value): self._value = value >>> i1 = Interval(5, 10) >>> i1.length 5 >>> print Session().query(Interval).filter_by(length=5) SELECT interval.id AS interval_id, interval.start AS interval_start, interval."end" AS interval_end FROM interval WHERE interval."end" - interval.start = :param_1 

上述还有一个写着玩儿的setter,hybrid_property支持:

  1. comparator 扩展Interval.length在各类比较符(><=)的行为
  2. deleter/setter 顾名思义
  3. expression 能够扩展最后展开的SQL表达式,例如展开成SUM(xxx):
from sqlalchemy.orm import func
    #下面这个写着玩的。。
    @length.expression
    def length(self, expr):
        return func.sum(self.end, expr)

序列化Query

提供一个接口,以序列化和反序列化Query,用于跨系统、微服务的场景

from sqlalchemy.ext.serializer import loads, dumps
metadata = MetaData(bind=some_engine)
Session = scoped_session(sessionmaker())

# ... define mappers

query = Session.query(User).
    filter(User.somedata=='foo').order_by(User.sortkey)

# pickle the query
serialized = dumps(query)

# unpickle.  Pass in metadata + scoped_session 
# 上面提到过的 query和Session其实是密不可分的
query2 = loads(serialized, metadata, Session)

print query2.all()

这个作起来其实就很是带感了,微服务之间的必要条件就是各类dump,结合一下celery,实现一个去中心的HTTP服务也是不在话下

Baked Query

缓存从Query生成的SQL,以减小生成时间,其实是个应用层面的存储过程、View

from sqlalchemy.ext import baked bakery = baked.bakery()#<-建立了一个LRU from sqlalchemy import bindparam def search_for_user(session, username, email=None): baked_query = bakery(lambda session: session.query(User)) baked_query += lambda q: q.filter(User.name == bindparam('username')) baked_query += lambda q: q.order_by(User.id) if email: baked_query += lambda q: q.filter(User.email == bindparam('email')) result = baked_query(session).params(username=username, email=email).all() return result

上面说到了SQLAlchemy展开成SQL的性能问题,真的特别担心的话,再来一个缓存绑定参数如何?

多态和关系

使用多个模型,但实际上只是操做一张数据库表 此处基本略,以前写过一篇文章了:这儿

class Employee(Base):  
    __tablename__ = 'employee'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    type = Column(String(50))

    __mapper_args__ = {
        'polymorphic_identity':'employee',
        'polymorphic_on':type
    }

这里定义了雇员Employee 模型,指定type字段为多态所在字段,而且对于这个模型,当type字段为'employee'时,即为一个雇员

一对1、一对多、多对多的关系和自动收集成collection,这里不会细说,relationship函数的各类参数留待你们游玩。

关系间的收集有多种lazy方式,能够选择在父类读取时直接JOIN或者Subquery,也能够在须要的时候使用Query.option设置。提及来的篇幅会更长,我投个懒,你们去读文档吧~hfgl

相关文章
相关标签/搜索