Essential_SQLAlchemy2th学习笔记之Core模块

SQL Expression Language对原生SQL语言进行了简单的封装
两大模块SQLAlchemy Core and ORM:html

  • Core:提供执行SQL Expression Language的接口python

  • ORMmysql

安装:SQLAlchemy及相关数据库驱动
pip install sqlalchemy pymysql sql

链接到数据库

数据库链接字符串格式:请参考这里数据库

mysql://username:password@hostname/database
postgresql://username:password@hostname/database
sqlite:////absolute/path/to/database
oracle://scott:tiger@127.0.0.1:1521/orcl

好比SQLite以下:json

from sqlalchemy import create_engine
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
engine3 = create_engine('sqlite:////home/cookiemonster/cookies.db')
engine4 = create_engine('sqlite:///c:\\Users\\cookiemonster\\cookies.db')

注意:create_engine函数返回以一个engine实例,可是不会当即获取数据库链接,直到在engine上进行操做如查询时才会去获取connection安全

关于MySQL空闲链接8小时自动关闭的解决方案:传入 pool_recycle=3600参数cookie

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip@mysql01.monster.internal/cookies', pool_recycle=3600)

create_engine其他的一些参数:oracle

  • echo:是否log打印执行的sql语句及其参数。默认为False函数

  • encoding:默认utf-8

  • isolation_level:隔离级别

  • pool_recycle:指定链接回收间隔,这对于MySQL链接的8小时机制特别重要。默认-1

获取链接

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip' \
'@mysql01.monster.internal/cookies', pool_recycle=3600)
connection = engine.connect()

Schema and Types

四种类型集合:
• Generic
• SQL standard
• Vendor specific
• User defined

SQLAlchemy定义了不少generic types以兼容不一样数据库。这些类型都定义在sqlalchemy.types模块中,为了方便也能够从sqlalchemy直接导入这些类型。
类型对应表以下:

SQLAlchemy Python SQL
BigInteger int BIGINT
Boolean bool BOOLEAN or SMALLINT
Date datetime.date DATE (SQLite: STRING)
DateTime datetime.datetime DATETIME (SQLite: STRING)
Enum str ENUM or VARCHAR
Float float or Decimal FLOAT or REAL
Integer int INTEGER
Interval datetime.timedelta INTERVAL or DATE from epoch
LargeBinary byte BLOB or BYTEA
Numeric decimal.Decimal NUMERIC or DECIMAL
Unicode unicode UNICODE or VARCHAR
Text str CLOB or TEXT
Time datetime.time DATETIME

若是这些类型不能知足你,好比有些数据库支持json类型,那么你须要用到sqlalchemy.dialects模块中对应数据库的类型。好比from sqlalchemy.dialects.postgresql import JSON

Metadata & Table & Column

Metadata为了快速访问数据库。能够看做是不少Table对象的集合,还有一些关于engin,connection的信息。能够经过MetaData.tables访问这些表对象字典
定义表对象以前须要先实例化Metadata:

from sqlalchemy import MetaData
metadata = MetaData()

Table对象构建以下:第一个参数为名称,第二个参数为Metadata对象,后续参数为Column对象. Column对象参数为,名称,类型,及其他等

from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
from datetime import datetime
from sqlalchemy import DateTime
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)

注意:这里default,onupdate属性是一个callable对象而不是直接值,好比datetime.now(),由于这样的话,就永远是这个值,而不是每一个实例实例化、更新时的时间了。
比较有用的就是onupdate,每次更新时都会调用该方法或函数。

键和约束(Keys and Constraints)
键和约束既能够像上面那样经过kwargs定义在Column中,也能够在以后经过对象添加。相关类定义在基础的 sqlalchemy模块中,好比最经常使用的三个:
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint

PrimaryKeyConstraint('user_id', name='user_pk'),它也支持同时定义多个造成联合主键。
UniqueConstraint('username', name='uix_username')
CheckConstraint('unit_cost >= 0.00', name='unit_cost_positive')

索引(Index)

from sqlalchemy import Index
Index('ix_cookies_cookie_name', 'cookie_name')

这个定义须要放置在Table构造器中。也能够在以后定义,好比
Index('ix_test', mytable.c.cookie_sku, mytable.c.cookie_name))

关联关系和外键约束(Relationships and ForeignKeyConstraints)

from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)

注意:这里ForeignKey用的是字符串参数(这些字符串对应的是数据库中的表名.列名),而非引用。这样隔离了模块间相互依赖
咱们也可使用:
ForeignKeyConstraint(['order_id'], ['orders.order_id'])

建立或持久化表模式(Persisting the Tables)
经过示例代码咱们知道全部的Table定义,以及额外的模式定义都会与一个metadata对象关联。咱们能够经过这个metadata对象来建立表:

metadata.create_all(engine)

注意:默认状况下create_all不会从新建立已有表,因此它能够安全地屡次调用,并且也很是友好地与数据库迁移库如Ablembic集成而不须要你进行额外手动编码。

本节代码完整以下:

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
DateTime, ForeignKey, create_engine)
metadata = MetaData()
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('customer_number', Integer(), autoincrement=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)
)
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id'))
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)

SQLAlchemy-Core模块

插入数据:

ins = cookies.insert().values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)
print(str(ins))

固然你也能够这么作:

from sqlalchemy import insert
ins = insert(cookies).values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)

上述编译成预编译语句以下:

INSERT INTO cookies
(cookie_name, cookie_recipe_url, cookie_sku, quantity, unit_cost)
VALUES
(:cookie_name, :cookie_recipe_url, :cookie_sku, :quantity, :unit_cost)

实际过程会是以下ins对象内部会调用compile()方法编译成上述语句,而后将参数存储到ins.compile().params字典中。
接下来咱们经过前面获取的connection对象执行statement:

result = connection.execute(ins)

固然你也能够这么查询:

ins = cookies.insert()
result = connection.execute(
ins,
cookie_name='dark chocolate chip',
cookie_recipe_url='http://some.aweso.me/cookie/recipe_dark.html',
cookie_sku='CC02',
quantity='1',
unit_cost='0.75'
)
result.inserted_primary_key

批量插入:

inventory_list = [
{
'cookie_name': 'peanut butter',
'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
'cookie_sku': 'PB01',
'quantity': '24',
'unit_cost': '0.25'
},
{
'cookie_name': 'oatmeal raisin',
'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
'cookie_sku': 'EWW01',
'quantity': '100',
'unit_cost': '1.00'
}
]
result = connection.execute(ins, inventory_list)

注意:必定要确保全部字典参数拥有相同的keys

查询

from sqlalchemy.sql import select
s = select([cookies])
rp = connection.execute(s)
results = rp.fetchall()

固然咱们也可使用字符串来代替:

s = select("""SELECT cookies.cookie_id, cookies.cookie_name,
cookies.cookie_recipe_url, cookies.cookie_sku, cookies.quantity,
cookies.unit_cost FROM cookies""")

connection.execute返回的rp变量是一个ResultProxy对象(它是DBAPI中cursor对象的封装)。

咱们也能够这样写:

from sqlalchemy.sql import select
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()

ResultProxy使得查询结果能够经过index,name,or Column object访问列数据。例如:

first_row = results[0]
first_row[1] #游标列索引从1开始,by index
first_row.cookie_name # by name
first_row[cookies.c.cookie_name] #by Column object.

你也能够迭代ResultProxy,以下:

rp = connection.execute(s)
for record in rp:
print(record.cookie_name)

ResultProxy其他可用来获取结果集的方法

  • first()

  • fetchone()

  • fetchall()

  • scalar():Returns a single value if a query results in a single record with one column.

  • keys() 获取列名

关于选择ResultProxy上述的方法的建议:
一、使用first()而不是fetchone()来获取单条记录,由于fetchone()调用以后仍然保留着打开的connections共后续使用,若是不当心的话很容易引发问题。
二、使用迭代方式获取全部结果,而不是fetchall(),更加省内存。
三、使用scalar()获取单行单列结果时须要注意,若是返回多于一行,它会抛出异常。

控制返回列的数目

s = select([cookies.c.cookie_name, cookies.c.quantity])
rp = connection.execute(s)
print(rp.keys())
result = rp.first()

排序

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
rp = connection.execute(s)
for cookie in rp:
print('{} - {}'.format(cookie.quantity, cookie.cookie_name))

#倒序desc
from sqlalchemy import desc
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(desc(cookies.c.quantity))

限制返回结果集的条数

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
print([result.cookie_name for result in rp])

内置SQL函数

在sqlalchemy.sql.func模块中

#sum
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())

#count
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.count_1) #字段名是自动生成的,<func_name>_<position>,能够设置别名的,看下面

#设置别名
s = select([func.count(cookies.c.cookie_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)

过滤

#where
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
record = rp.first()
print(record.items()) #调用row对象的items()方法。

#like
s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
    print(record.cookie_name)

能够在where中使用的子句元素

  • between(cleft, cright)

  • concat(column_two) Concatenate column with column_two

  • distinct()

  • in_([list])

  • is_(None) Find where the column is None (commonly used for Null checks with None)

  • contains(string) Find where the column has string in it (case-sensitive)

  • endswith(string) Find where the column ends with string (case-sensitive)

  • like(string) Find where the column is like string (case-sensitive)

  • startswith(string) Find where the column begins with string (case-sensitive)

  • ilike(string) Find where the column is like string (this is not case-sensitive)

固然还包括一系列的notxxx方法,好比notin_(),惟一的例外是isnot()

操做符

  • +,-,*,/,%

  • ==,!=,<,>,<=,>=

  • AND,OR,NOT,因为python关键字的缘由,使用and_(),or_(),not_()来代替

+号还能够用于字符串拼接:

s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku])
for row in connection.execute(s):
print(row)
from sqlalchemy import cast
s = select([cookies.c.cookie_name,
    cast((cookies.c.quantity * cookies.c.unit_cost),
        Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
    print('{} - {}'.format(row.cookie_name, row.inv_cost))

注意:cast是另一个函数,容许咱们进行类型转换,上述转换是将数字转换为货币形式,和
print('{} - {:.2f}'.format(row.cookie_name, row.inv_cost)).这个行为一致。

from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    and_(
        cookies.c.quantity > 23,
        cookies.c.unit_cost < 0.40
    )
)
for row in connection.execute(s):
    print(row.cookie_name)


from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    or_(
        cookies.c.quantity.between(10, 50),
        cookies.c.cookie_name.contains('chip')
    )
)
for row in connection.execute(s):
    print(row.cookie_name)

update

from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
print(result.rowcount)
s = select([cookies]).where(cookies.c.cookie_name == "chocolate chip")
result = connection.execute(s).first()
for key in result.keys():
    print('{:>20}: {}'.format(key, result[key]))

delete

from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)
print(result.rowcount)

s = select([cookies]).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(s).fetchall()
print(len(result))

joins

join(),outerjoin()函数,select_from()函数

columns = [orders.c.order_id, users.c.username, users.c.phone,
           cookies.c.cookie_name, line_items.c.quantity,
           line_items.c.extended_cost]
cookiemon_orders = select(columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(
    line_items).join(cookies)).where(users.c.username ==
                                     'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
for row in result:
    print(row)

最终产生的SQL语句以下:

SELECT orders.order_id, users.username, users.phone, cookies.cookie_name,
line_items.quantity, line_items.extended_cost FROM users JOIN orders ON
users.user_id = orders.user_id JOIN line_items ON orders.order_id =
line_items.order_id JOIN cookies ON cookies.cookie_id = line_items.cookie_id
WHERE users.username = :username_1

outerjoin

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)

表别名函数alias()

>>> manager = employee_table.alias('mgr')
>>> stmt = select([employee_table.c.name],
            ... and_(employee_table.c.manager_id==manager.c.id,
            ... manager.c.name=='Fred'))
>>> print(stmt)
SELECT employee.name
FROM employee, employee AS mgr
WHERE employee.manager_id = mgr.id AND mgr.name = ?

分组

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)

chaining

def get_orders_by_customer(cust_name, shipped=None, details=False):
    columns = [orders.c.order_id, users.c.username, users.c.phone]
    joins = users.join(orders)
    if details:
        columns.extend([cookies.c.cookie_name, line_items.c.quantity,
            line_items.c.extended_cost])
        joins = joins.join(line_items).join(cookies)
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(joins)

    cust_orders = cust_orders.where(users.c.username == cust_name)
    if shipped is not None:
        cust_orders = cust_orders.where(orders.c.shipped == shipped)
    result = connection.execute(cust_orders).fetchall()
    return result

执行原生SQL

返回的仍是ResultProxy对象
一、彻底采用原始SQL

result = connection.execute("select * from orders").fetchall()
print(result)

二、部分采用原始SQL,text()函数

from sqlalchemy import text
stmt = select([users]).where(text("username='cookiemon'"))
print(connection.execute(stmt).fetchall())

异常

SQLALchemy定义了不少异常。咱们经过关心:AttributeErrors,IntegrityErrors.等
为了进行相关试验与说明,请先执行下面这些语句

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        DateTime, ForeignKey, Boolean, create_engine,
                        CheckConstraint)
metadata = MetaData()
cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                37
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('cookie_sku', String(55)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2)),
                CheckConstraint('quantity > 0', name='quantity_positive')
                )
users = Table('users', metadata,
              Column('user_id', Integer(), primary_key=True),
              Column('username', String(15), nullable=False, unique=True),
              Column('email_address', String(255), nullable=False),
              Column('phone', String(20), nullable=False),
              Column('password', String(25), nullable=False),
              Column('created_on', DateTime(), default=datetime.now),
              Column('updated_on', DateTime(),
                     default=datetime.now, onupdate=datetime.now)
              )
orders = Table('orders', metadata,
               Column('order_id', Integer()),
               Column('user_id', ForeignKey('users.user_id')),
               Column('shipped', Boolean(), default=False)
               )
line_items = Table('line_items', metadata,
                   Column('line_items_id', Integer(), primary_key=True),
                   Column('order_id', ForeignKey('orders.order_id')),
                   Column('cookie_id', ForeignKey('cookies.cookie_id')),
                   Column('quantity', Integer()),
                   Column('extended_cost', Numeric(12, 2))
                   )
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
connection = engine.connect()
from sqlalchemy import select, insert
ins = insert(users).values(
username="cookiemon",
email_address="mon@cookie.com",
phone="111-111-1111",
password="password"
)
result = connection.execute(ins)
s = select([users.c.username])
results = connection.execute(s)
for result in results:
print(result.username)
print(result.password) #此处包AttributeError异常

在违反约束的状况下会出现IntegrityError异常。好比违反惟一性约束等。

s = select([users.c.username])
connection.execute(s).fetchall()
[(u'cookiemon',)]
ins = insert(users).values(
    username="cookiemon",
    email_address="damon@cookie.com",
    phone="111-111-1111",
    password="password"
)
result = connection.execute(ins) #此处报IntegrityError, UNIQUE constraint failed: users.username
#异常处理
try:
    result = connection.execute(ins)
except IntegrityError as error:
    print(error.orig.message, error.params)

全部的SQLAlchemy异常处理方式都是上面那种思路,经过[SQLAlchemyError](http://docs.sqlal
chemy.org/en/latest/core/exceptions.html)能够获取到的信息由以下:

  • orig :The DBAPI exception object.

  • params:The parameter list being used when this exception occurred.

  • statement :The string SQL statement being invoked when this exception occurred.

事务Transactions

from sqlalchemy.exc import IntegrityError


def ship_it(order_id):
    s = select([line_items.c.cookie_id, line_items.c.quantity])
    s = s.where(line_items.c.order_id == order_id)
    transaction = connection.begin() #开启事务
    cookies_to_ship = connection.execute(s).fetchall()
    try:
        for cookie in cookies_to_ship:
            u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
            u = u.values(quantity=cookies.c.quantity - cookie.quantity)
            result = connection.execute(u)
        u = update(orders).where(orders.c.order_id == order_id)
        u = u.values(shipped=True)
        result = connection.execute(u)
        print("Shipped order ID: {}".format(order_id))
        transaction.commit() #提交事务
    except IntegrityError as error:
        transaction.rollback()   #事务回滚
        print(error)
相关文章
相关标签/搜索